1use atomic_float::AtomicF64;
5use crossterm::tty::IsTty;
6use once_cell::sync::Lazy;
7use opentelemetry::{
8 Context, KeyValue,
9 trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
10};
11use opentelemetry_otlp::WithExportConfig;
12use opentelemetry_sdk::trace::Sampler;
13use opentelemetry_sdk::{
14 self, Resource, runtime,
15 trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
16};
17use span_latency_prom::PrometheusSpanLatencyLayer;
18use std::path::PathBuf;
19use std::time::Duration;
20use std::{
21 env,
22 io::{Write, stderr},
23 str::FromStr,
24 sync::{Arc, Mutex, atomic::Ordering},
25};
26use tracing::metadata::LevelFilter;
27use tracing::{Level, error, info};
28use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
29use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};
30
31use crate::file_exporter::{CachedOpenFile, FileExporter};
32
33mod file_exporter;
34pub mod span_latency_prom;
35
/// Boxed, thread-safe error type returned by the fallible handle operations in this file.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
38
/// Settings consumed by [`TelemetryConfig::init`] to assemble the tracing subscriber.
///
/// Note that the derived `Default` leaves every field at its zero value, whereas
/// [`TelemetryConfig::new`] enables `panic_hook` and sets `sample_rate` to `1.0`.
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Install an OpenTelemetry tracing layer in `init` (exporting to a file or an OTLP endpoint).
    pub enable_otlp_tracing: bool,
    /// Add a `console_subscriber` layer in `init`.
    pub tokio_console: bool,
    /// Emit log events as JSON instead of the default text format.
    pub json_log_output: bool,
    /// File-name prefix for a daily-rolling log file; when `None`, logs go to stderr.
    pub log_file: Option<String>,
    /// Log filter directives; `init` uses `"info"` when unset. Only used when no filter can be
    /// built from the default filter environment variable.
    pub log_string: Option<String>,
    /// Most verbose level of spans handed to the Prometheus span-latency layer; `init` uses
    /// `INFO` when unset.
    pub span_level: Option<Level>,
    /// Install a panic hook that reports panics through `tracing::error!`.
    pub panic_hook: bool,
    /// With the panic hook installed, exit the process (status 12) once a panic was reported.
    pub crash_on_panic: bool,
    /// Registry for the span-latency layer; no such layer is added when `None`.
    pub prom_registry: Option<prometheus::Registry>,
    /// Trace sampling ratio; clamped to `0.0001..=1.0` when the sampler is built.
    pub sample_rate: f64,
    /// Targets that get a `<target>=trace` directive appended to the log filter.
    pub trace_target: Option<Vec<String>>,
    /// Targets logged at `info` by an extra minimal formatter (no time, target or level), which is
    /// only added in text mode when logging to a terminal on stderr.
    pub user_info_target: Vec<String>,
}
70
/// Guard returned by [`TelemetryConfig::init`]; keep it alive for as long as telemetry is needed.
///
/// Dropping it clears the global telemetry configuration and drops the resources it owns.
#[must_use]
// The fields are never read; they are stored only so that they are dropped with the guard.
#[allow(dead_code)]
pub struct TelemetryGuards {
    /// Guard paired with the non-blocking log writer created in `init`.
    worker_guard: WorkerGuard,
    /// Tracer provider of the file-exporter trace pipeline, when one was configured.
    provider: Option<TracerProvider>,
}
77
78impl TelemetryGuards {
79 fn new(
80 config: TelemetryConfig,
81 worker_guard: WorkerGuard,
82 provider: Option<TracerProvider>,
83 ) -> Self {
84 set_global_telemetry_config(config);
85 Self {
86 worker_guard,
87 provider,
88 }
89 }
90}
91
impl Drop for TelemetryGuards {
    /// Clears the global telemetry configuration that was registered when the guard was built.
    fn drop(&mut self) {
        clear_global_telemetry_config();
    }
}
97
98#[derive(Clone, Debug)]
99pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);
100
101impl FilterHandle {
102 pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
103 let filter = EnvFilter::try_new(directives)?;
104 self.0.reload(filter)?;
105 Ok(())
106 }
107
108 pub fn get(&self) -> Result<String, BoxError> {
109 self.0
110 .with_current(|filter| filter.to_string())
111 .map_err(Into::into)
112 }
113}
114
/// Runtime controls for the subscriber installed by [`TelemetryConfig::init`].
pub struct TracingHandle {
    /// Reload handle for the log filter.
    log: FilterHandle,
    /// Reload handle for the trace filter; `None` when tracing was not enabled in `init`.
    trace: Option<FilterHandle>,
    /// Shared handle to the file that the file exporter writes spans to.
    file_output: CachedOpenFile,
    /// Sampler whose rate can be adjusted after initialization.
    sampler: SamplingFilter,
}
121
122impl TracingHandle {
123 pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
124 self.log.update(directives)
125 }
126
127 pub fn get_log(&self) -> Result<String, BoxError> {
128 self.log.get()
129 }
130
131 pub fn update_sampling_rate(&self, sample_rate: f64) {
132 self.sampler.update_sampling_rate(sample_rate);
133 }
134
135 pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
136 let trace_path = PathBuf::from_str(trace_file.as_ref())?;
137 self.file_output.update_path(trace_path)?;
138 Ok(())
139 }
140
141 pub fn update_trace_filter<S: AsRef<str>>(
142 &self,
143 directives: S,
144 duration: Duration,
145 ) -> Result<(), BoxError> {
146 if let Some(trace) = &self.trace {
147 let res = trace.update(directives);
148 let trace = trace.clone();
150 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
151 tokio::spawn(async move {
152 tokio::time::sleep(duration).await;
153 if let Err(e) = trace.update(trace_filter_env) {
154 error!("failed to reset trace filter: {}", e);
155 }
156 });
157 res
158 } else {
159 info!("tracing not enabled, ignoring update");
160 Ok(())
161 }
162 }
163
164 pub fn clear_file_output(&self) {
165 self.file_output.clear_path();
166 }
167
168 pub fn reset_trace(&self) {
169 if let Some(trace) = &self.trace {
170 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
171 if let Err(e) = trace.update(trace_filter_env) {
172 error!("failed to reset trace filter: {}", e);
173 }
174 }
175 }
176}
177
178fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
179 if let Some(logfile_prefix) = log_file {
180 let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
181 tracing_appender::non_blocking(file_appender)
182 } else {
183 tracing_appender::non_blocking(stderr())
184 }
185}
186
187fn set_panic_hook(crash_on_panic: bool) {
189 let default_panic_handler = std::panic::take_hook();
190
191 std::panic::set_hook(Box::new(move |panic| {
198 if let Some(location) = panic.location() {
200 tracing::error!(
205 message = %panic,
206 panic.file = location.file(),
207 panic.line = location.line(),
208 panic.column = location.column(),
209 );
210 } else {
211 tracing::error!(message = %panic);
212 }
213
214 default_panic_handler(panic);
215
216 let _ = std::io::stderr().flush();
218 let _ = std::io::stdout().flush();
219
220 if crash_on_panic {
221 std::process::exit(12);
223 }
224 }));
225}
226
227static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
228 Lazy::new(|| Arc::new(Mutex::new(None)));
229
230fn set_global_telemetry_config(config: TelemetryConfig) {
231 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
232 assert!(global_config.is_none());
233 *global_config = Some(config);
234}
235
236fn clear_global_telemetry_config() {
237 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
238 *global_config = None;
239}
240
241pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
242 let global_config = GLOBAL_CONFIG.lock().unwrap();
243 global_config.clone()
244}
245
246impl TelemetryConfig {
247 pub fn new() -> Self {
248 Self {
249 enable_otlp_tracing: false,
250 tokio_console: false,
251 json_log_output: false,
252 log_file: None,
253 log_string: None,
254 span_level: None,
255 panic_hook: true,
256 crash_on_panic: false,
257 prom_registry: None,
258 sample_rate: 1.0,
259 trace_target: None,
260 user_info_target: Vec::new(),
261 }
262 }
263
264 pub fn with_json(mut self) -> Self {
265 self.json_log_output = true;
266 self
267 }
268
269 pub fn with_log_level(mut self, log_string: &str) -> Self {
270 self.log_string = Some(log_string.to_owned());
271 self
272 }
273
274 pub fn with_span_level(mut self, span_level: Level) -> Self {
275 self.span_level = Some(span_level);
276 self
277 }
278
279 pub fn with_log_file(mut self, filename: &str) -> Self {
280 self.log_file = Some(filename.to_owned());
281 self
282 }
283
284 pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
285 self.prom_registry = Some(registry.clone());
286 self
287 }
288
289 pub fn with_sample_rate(mut self, rate: f64) -> Self {
290 self.sample_rate = rate;
291 self
292 }
293
294 pub fn with_trace_target(mut self, target: &str) -> Self {
295 match self.trace_target {
296 Some(ref mut v) => v.push(target.to_owned()),
297 None => self.trace_target = Some(vec![target.to_owned()]),
298 };
299
300 self
301 }
302
303 pub fn with_user_info_target(mut self, target: &str) -> Self {
306 self.user_info_target.push(target.to_owned());
307 self
308 }
309
310 pub fn with_env(mut self) -> Self {
311 if env::var("CRASH_ON_PANIC").is_ok() {
312 self.crash_on_panic = true
313 }
314
315 if env::var("TRACE_FILTER").is_ok() {
316 self.enable_otlp_tracing = true
317 }
318
319 if env::var("RUST_LOG_JSON").is_ok() {
320 self.json_log_output = true;
321 }
322
323 if env::var("TOKIO_CONSOLE").is_ok() {
324 self.tokio_console = true;
325 }
326
327 if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
328 self.span_level =
329 Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
330 }
331
332 if let Ok(filepath) = env::var("RUST_LOG_FILE") {
333 self.log_file = Some(filepath);
334 }
335
336 if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
337 self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
338 }
339
340 self
341 }
342
343 pub fn init(self) -> (TelemetryGuards, TracingHandle) {
344 let config = self;
345 let config_clone = config.clone();
346
347 let mut directives = config.log_string.unwrap_or_else(|| "info".into());
352 if let Some(targets) = config.trace_target {
353 for target in targets {
354 directives.push_str(&format!(",{}=trace", target));
355 }
356 }
357 let env_filter =
358 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
359 let (log_filter, reload_handle) = reload::Layer::new(env_filter);
360 let log_filter_handle = FilterHandle(reload_handle);
361
362 let span_level = config.span_level.unwrap_or(Level::INFO);
366 let span_filter = filter::filter_fn(move |metadata| {
367 metadata.is_span() && *metadata.level() <= span_level
368 });
369
370 let mut layers = Vec::new();
371
372 if config.tokio_console {
376 layers.push(console_subscriber::spawn().boxed());
377 }
378
379 if let Some(registry) = config.prom_registry {
380 let span_lat_layer = PrometheusSpanLatencyLayer::try_new(®istry, 15)
381 .expect("Could not initialize span latency layer");
382 layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
383 }
384
385 let mut trace_filter_handle = None;
386 let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
387 let mut provider = None;
388 let sampler = SamplingFilter::new(config.sample_rate);
389 let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("sui-node".to_owned());
390
391 if config.enable_otlp_tracing {
392 let trace_file = env::var("TRACE_FILE").ok();
393 let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
394 "service.name",
395 service_name.clone(),
396 )];
397 if let Ok(namespace) = env::var("NAMESPACE") {
398 otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
399 }
400 if let Ok(hostname) = env::var("HOSTNAME") {
401 otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
402 }
403 if let Ok(network) = env::var("NETWORK") {
404 otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
405 }
406
407 let resource = Resource::new(otel_kv_vec);
408 let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
409
410 let telemetry = if let Some(trace_file) = trace_file {
413 let exporter =
414 FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
415 file_output = exporter.cached_open_file.clone();
416 let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
417
418 let p = TracerProvider::builder()
419 .with_resource(resource)
420 .with_sampler(sampler)
421 .with_span_processor(processor)
422 .build();
423
424 let tracer = p.tracer(service_name);
425 provider = Some(p);
426
427 tracing_opentelemetry::layer().with_tracer(tracer)
428 } else {
429 let endpoint = env::var("OTLP_ENDPOINT")
430 .unwrap_or_else(|_| "http://localhost:4317".to_string());
431 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
432 .with_tonic()
433 .with_endpoint(endpoint)
434 .build()
435 .unwrap();
436 let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
437 .with_resource(resource)
438 .with_sampler(sampler)
439 .with_batch_exporter(otlp_exporter, runtime::Tokio)
440 .build();
441 let tracer = tracer_provider.tracer(service_name);
442 tracing_opentelemetry::layer().with_tracer(tracer)
443 };
444
445 opentelemetry::global::set_text_map_propagator(
447 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
448 );
449
450 let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
451 let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
452 trace_filter_handle = Some(FilterHandle(reload_handle));
453
454 layers.push(telemetry.with_filter(trace_env_filter).boxed());
455 }
456
457 let (nb_output, worker_guard) = get_output(config.log_file.clone());
458 if config.json_log_output {
459 let json_layer = fmt::layer()
461 .with_file(true)
462 .with_line_number(true)
463 .json()
464 .with_writer(nb_output)
465 .with_filter(log_filter)
466 .boxed();
467 layers.push(json_layer);
468 } else {
469 let fmt_layer = fmt::layer()
471 .with_ansi(config.log_file.is_none() && stderr().is_tty())
472 .with_writer(nb_output.clone())
473 .with_filter(log_filter)
474 .boxed();
475
476 layers.push(fmt_layer);
477
478 if config.log_file.is_none() && stderr().is_tty() && !config.user_info_target.is_empty()
479 {
480 let mut directives = String::from("none");
482 for target in config.user_info_target {
483 directives.push_str(&format!(",{target}=info"));
484 }
485
486 let fmt_layer = fmt::layer()
487 .with_ansi(config.log_file.is_none() && stderr().is_tty())
488 .event_format(
489 fmt::format()
490 .without_time()
491 .with_target(false)
492 .with_level(false),
493 )
494 .with_writer(nb_output)
495 .with_filter(EnvFilter::new(directives))
496 .boxed();
497
498 layers.push(fmt_layer);
499 }
500 }
501
502 let subscriber = tracing_subscriber::registry().with(layers);
503 ::tracing::subscriber::set_global_default(subscriber)
504 .expect("unable to initialize tracing subscriber");
505
506 if config.panic_hook {
507 set_panic_hook(config.crash_on_panic);
508 }
509
510 let guards = TelemetryGuards::new(config_clone, worker_guard, provider);
513
514 (
515 guards,
516 TracingHandle {
517 log: log_filter_handle,
518 trace: trace_filter_handle,
519 file_output,
520 sampler,
521 },
522 )
523 }
524}
525
526#[derive(Debug, Clone)]
528struct SamplingFilter {
529 sample_rate: Arc<AtomicF64>,
531}
532
533impl SamplingFilter {
534 fn new(sample_rate: f64) -> Self {
535 SamplingFilter {
536 sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
537 }
538 }
539
540 fn clamp(sample_rate: f64) -> f64 {
541 sample_rate.clamp(0.0001, 1.0)
543 }
544
545 fn update_sampling_rate(&self, sample_rate: f64) {
546 let sample_rate = Self::clamp(sample_rate);
548 self.sample_rate.store(sample_rate, Ordering::Relaxed);
549 }
550}
551
552impl ShouldSample for SamplingFilter {
553 fn should_sample(
554 &self,
555 parent_context: Option<&Context>,
556 trace_id: TraceId,
557 name: &str,
558 span_kind: &SpanKind,
559 attributes: &[KeyValue],
560 links: &[Link],
561 ) -> SamplingResult {
562 let sample_rate = self.sample_rate.load(Ordering::Relaxed);
563 let sampler = Sampler::TraceIdRatioBased(sample_rate);
564
565 sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
566 }
567}
568
569pub fn init_for_testing() {
571 static LOGGER: Lazy<()> = Lazy::new(|| {
572 let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
573 .with_env_filter(
574 EnvFilter::builder()
575 .with_default_directive(LevelFilter::INFO.into())
576 .from_env_lossy(),
577 )
578 .with_file(true)
579 .with_line_number(true)
580 .with_test_writer()
581 .finish();
582 ::tracing::subscriber::set_global_default(subscriber)
583 .expect("unable to initialize logging for tests");
584 });
585
586 Lazy::force(&LOGGER);
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    // Ends with a deliberate panic (hence `should_panic`) to exercise the panic hook that
    // `TelemetryConfig::new` enables.
    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Span level DEBUG: the DEBUG span below is measured, the TRACE span is filtered out.
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        // Exactly one histogram family with a single series, labelled with the DEBUG span's name.
        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].get_name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "span_name");
        assert_eq!(labels[0].get_value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // The two tests below both call `init_for_testing`; whichever runs second must not panic.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}