telemetry_subscribers/
file_exporter.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::{FutureExt, future::BoxFuture};
5use opentelemetry::trace::TraceError;
6use opentelemetry_proto::{
7    tonic::collector::trace::v1::ExportTraceServiceRequest,
8    transform::{
9        common::tonic::ResourceAttributesWithSchema,
10        trace::tonic::group_spans_by_resource_and_scope,
11    },
12};
13use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
14use prost::Message;
15use std::fs::OpenOptions;
16use std::io::Write;
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19
/// A cloneable handle to an optional, already-open output file.
///
/// All clones share one slot (an `Arc<Mutex<..>>`), so re-pointing or clearing the
/// file through any clone is observed by every other clone.
#[derive(Clone)]
pub(crate) struct CachedOpenFile {
    /// `None` when no output file is configured; otherwise the path that was opened
    /// together with its open handle.
    inner: Arc<Mutex<Option<(PathBuf, std::fs::File)>>>,
}

impl std::fmt::Debug for CachedOpenFile {
    /// Prints only the type name; the shared slot is neither locked nor inspected.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachedOpenFile").finish()
    }
}

impl CachedOpenFile {
    /// Opens `path` in append mode, creating the file if it does not exist yet.
    ///
    /// # Errors
    /// Returns the I/O error reported when opening the file.
    pub fn open_file(path: &Path) -> std::io::Result<std::fs::File> {
        OpenOptions::new().append(true).create(true).open(path)
    }

    /// Creates a handle, opening `file_path` when one is given.
    ///
    /// With `None` the handle starts without a file.
    ///
    /// # Errors
    /// Returns the I/O error from [`Self::open_file`] if the given path cannot be opened.
    pub fn new<P: AsRef<Path>>(file_path: Option<P>) -> std::io::Result<Self> {
        let inner = match file_path {
            Some(file_path) => {
                let file_path = file_path.as_ref();
                let file = Self::open_file(file_path)?;
                Some((file_path.to_owned(), file))
            }
            None => None,
        };
        Ok(Self {
            inner: Arc::new(Mutex::new(inner)),
        })
    }

    /// Points the shared slot at `file_path`.
    ///
    /// If the cached path already equals `file_path` (compared as paths, without
    /// touching the filesystem) nothing is reopened. Otherwise the new file is opened
    /// first and the cached entry is replaced only on success, so a failed open
    /// leaves the previous entry in place.
    ///
    /// # Errors
    /// Returns the I/O error from [`Self::open_file`] if the new path cannot be opened.
    pub fn update_path<P: AsRef<Path>>(&self, file_path: P) -> std::io::Result<()> {
        let file_path = file_path.as_ref();
        let mut inner = self.lock_inner();

        // Compare against the borrowed path so the common "same path" case does not
        // allocate a `PathBuf` just to throw it away.
        let unchanged = inner
            .as_ref()
            .is_some_and(|(current, _)| current.as_path() == file_path);
        if unchanged {
            return Ok(());
        }

        let file = Self::open_file(file_path)?;
        *inner = Some((file_path.to_owned(), file));
        Ok(())
    }

    /// Drops the cached path and file handle, leaving the slot empty.
    pub fn clear_path(&self) {
        self.lock_inner().take();
    }

    /// Runs `f` with mutable access to the cached file, or with `None` when no file
    /// is configured, and returns whatever `f` returns.
    ///
    /// The lock is held for the whole call to `f`, so `f` must not call back into
    /// this handle (or one of its clones).
    fn with_file(
        &self,
        f: impl FnOnce(Option<&mut std::fs::File>) -> std::io::Result<()>,
    ) -> std::io::Result<()> {
        let mut inner = self.lock_inner();
        f(inner.as_mut().map(|(_, file)| file))
    }

    /// Locks the shared slot, recovering the guard if the mutex was poisoned.
    ///
    /// `with_file` runs caller code under the lock, so a panic there poisons the
    /// mutex. The slot is only ever replaced wholesale (`*inner = ..` / `take()`), so
    /// it cannot be observed half-updated; at worst the panicking callback left a
    /// partial record in the file. Recovering keeps one such panic from turning every
    /// later call on any clone into a panic as well.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, Option<(PathBuf, std::fs::File)>> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
75
76#[derive(Debug)]
77pub(crate) struct FileExporter {
78    pub cached_open_file: CachedOpenFile,
79    resource: ResourceAttributesWithSchema,
80}
81
82impl FileExporter {
83    pub fn new(file_path: Option<PathBuf>) -> std::io::Result<Self> {
84        Ok(Self {
85            cached_open_file: CachedOpenFile::new(file_path)?,
86            resource: ResourceAttributesWithSchema::default(),
87        })
88    }
89}
90
91impl SpanExporter for FileExporter {
92    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
93        let cached_open_file = self.cached_open_file.clone();
94        let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
95        async move {
96            cached_open_file
97                .with_file(|maybe_file| {
98                    if let Some(file) = maybe_file {
99                        let request = ExportTraceServiceRequest { resource_spans };
100
101                        let buf = request.encode_length_delimited_to_vec();
102
103                        file.write_all(&buf)
104                    } else {
105                        Ok(())
106                    }
107                })
108                .map_err(|e| TraceError::Other(e.into()))
109        }
110        .boxed()
111    }
112}