telemetry_subscribers/
file_exporter.rs1use futures::{FutureExt, future::BoxFuture};
5use opentelemetry::trace::TraceError;
6use opentelemetry_proto::{
7 tonic::collector::trace::v1::ExportTraceServiceRequest,
8 transform::{
9 common::tonic::ResourceAttributesWithSchema,
10 trace::tonic::group_spans_by_resource_and_scope,
11 },
12};
13use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
14use prost::Message;
15use std::fs::OpenOptions;
16use std::io::Write;
17use std::path::{Path, PathBuf};
18use std::sync::{Arc, Mutex};
19
20#[derive(Clone)]
21pub(crate) struct CachedOpenFile {
22 inner: Arc<Mutex<Option<(PathBuf, std::fs::File)>>>,
23}
24
25impl std::fmt::Debug for CachedOpenFile {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("CachedOpenFile").finish()
28 }
29}
30
31impl CachedOpenFile {
32 pub fn open_file(path: &Path) -> std::io::Result<std::fs::File> {
33 OpenOptions::new().append(true).create(true).open(path)
34 }
35
36 pub fn new<P: AsRef<Path>>(file_path: Option<P>) -> std::io::Result<Self> {
37 let inner = if let Some(file_path) = file_path {
38 let file_path = file_path.as_ref();
39 let file = Self::open_file(file_path)?;
40 Some((file_path.to_owned(), file))
41 } else {
42 None
43 };
44 Ok(Self {
45 inner: Arc::new(Mutex::new(inner)),
46 })
47 }
48
49 pub fn update_path<P: AsRef<Path>>(&self, file_path: P) -> std::io::Result<()> {
50 let mut inner = self.inner.lock().unwrap();
51 let file_path = file_path.as_ref().to_owned();
52
53 if let Some((old_file_path, _)) = &*inner
54 && old_file_path == &file_path
55 {
56 return Ok(());
57 }
58
59 let file = Self::open_file(file_path.as_path())?;
60 *inner = Some((file_path, file));
61 Ok(())
62 }
63
64 pub fn clear_path(&self) {
65 self.inner.lock().unwrap().take();
66 }
67
68 fn with_file(
69 &self,
70 f: impl FnOnce(Option<&mut std::fs::File>) -> std::io::Result<()>,
71 ) -> std::io::Result<()> {
72 f(self.inner.lock().unwrap().as_mut().map(|(_, file)| file))
73 }
74}
75
76#[derive(Debug)]
77pub(crate) struct FileExporter {
78 pub cached_open_file: CachedOpenFile,
79 resource: ResourceAttributesWithSchema,
80}
81
82impl FileExporter {
83 pub fn new(file_path: Option<PathBuf>) -> std::io::Result<Self> {
84 Ok(Self {
85 cached_open_file: CachedOpenFile::new(file_path)?,
86 resource: ResourceAttributesWithSchema::default(),
87 })
88 }
89}
90
91impl SpanExporter for FileExporter {
92 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
93 let cached_open_file = self.cached_open_file.clone();
94 let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);
95 async move {
96 cached_open_file
97 .with_file(|maybe_file| {
98 if let Some(file) = maybe_file {
99 let request = ExportTraceServiceRequest { resource_spans };
100
101 let buf = request.encode_length_delimited_to_vec();
102
103 file.write_all(&buf)
104 } else {
105 Ok(())
106 }
107 })
108 .map_err(|e| TraceError::Other(e.into()))
109 }
110 .boxed()
111 }
112}