1use atomic_float::AtomicF64;
5use crossterm::tty::IsTty;
6use once_cell::sync::Lazy;
7use opentelemetry::{
8 Context, KeyValue,
9 trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
10};
11use opentelemetry_otlp::WithExportConfig;
12use opentelemetry_sdk::trace::Sampler;
13use opentelemetry_sdk::{
14 self, Resource, runtime,
15 trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
16};
17use span_latency_prom::PrometheusSpanLatencyLayer;
18use std::path::PathBuf;
19use std::time::Duration;
20use std::{
21 env,
22 io::{Write, stderr},
23 str::FromStr,
24 sync::{Arc, Mutex, atomic::Ordering},
25};
26use tracing::dispatcher::DefaultGuard;
27use tracing::metadata::LevelFilter;
28use tracing::{Level, error, info};
29use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
30use tracing_error::ErrorLayer;
31use tracing_subscriber::{
32 EnvFilter, Layer, Registry, filter,
33 fmt::{self, format::Pretty},
34 layer::SubscriberExt,
35 reload,
36};
37
38use crate::file_exporter::{CachedOpenFile, FileExporter};
39use crate::test_layer::TestLayer;
40
41mod file_exporter;
42pub mod span_latency_prom;
43mod test_layer;
44
/// Registry-wide level ceiling consulted when a callsite is registered.
///
/// Spans are compared against the fixed `max_span_level`. Events are compared against
/// `max_event_level`, an atomic shared with the log `FilterHandle` so the ceiling can be
/// changed at runtime. The atomic holds the `level_filter_to_u8` encoding of a `LevelFilter`.
struct GlobalLevelFilter {
    // Most verbose span level that is allowed (fixed at init time).
    max_span_level: LevelFilter,
    // Encoded event ceiling; written by `FilterHandle::update`, read via `load_max_event_level`.
    max_event_level: Arc<std::sync::atomic::AtomicU8>,
}
60
61impl GlobalLevelFilter {
62 fn load_max_event_level(&self) -> LevelFilter {
63 level_filter_from_u8(self.max_event_level.load(Ordering::Relaxed))
64 }
65
66 fn exceeds_max_level(&self, metadata: &tracing::Metadata<'_>) -> bool {
67 if metadata.is_span() {
68 LevelFilter::from_level(*metadata.level()) > self.max_span_level
69 } else {
70 let max = self.load_max_event_level();
71 !max.eq(&LevelFilter::OFF) && LevelFilter::from_level(*metadata.level()) > max
72 }
73 }
74}
75
76fn level_filter_to_u8(lf: LevelFilter) -> u8 {
77 match lf {
78 LevelFilter::OFF => 0,
79 LevelFilter::ERROR => 1,
80 LevelFilter::WARN => 2,
81 LevelFilter::INFO => 3,
82 LevelFilter::DEBUG => 4,
83 LevelFilter::TRACE => 5,
84 }
85}
86
87fn level_filter_from_u8(v: u8) -> LevelFilter {
88 match v {
89 0 => LevelFilter::OFF,
90 1 => LevelFilter::ERROR,
91 2 => LevelFilter::WARN,
92 3 => LevelFilter::INFO,
93 4 => LevelFilter::DEBUG,
94 _ => LevelFilter::TRACE,
95 }
96}
97
98impl<S: tracing::Subscriber> Layer<S> for GlobalLevelFilter {
99 fn register_callsite(
100 &self,
101 metadata: &'static tracing::Metadata<'static>,
102 ) -> tracing::subscriber::Interest {
103 if self.exceeds_max_level(metadata) {
104 tracing::subscriber::Interest::never()
105 } else {
106 tracing::subscriber::Interest::always()
114 }
115 }
116
117 fn max_level_hint(&self) -> Option<LevelFilter> {
118 let event = self.load_max_event_level();
119 Some(std::cmp::max(self.max_span_level, event))
120 }
121}
122
/// Boxed, thread-safe error type returned by the runtime filter/trace controls in this crate.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
125
/// Configuration for [`TelemetryConfig::init`], usually built with `TelemetryConfig::new()`
/// plus the `with_*` builder methods.
///
/// Note: the derived `Default` differs from `TelemetryConfig::new()`. `Default` leaves
/// `panic_hook` and `set_global_default` as `false` and `sample_rate` as `0.0`, whereas `new()`
/// sets them to `true`, `true` and `1.0`.
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Install an OpenTelemetry layer. Spans go to the `TRACE_FILE` file when that env var is
    /// set, otherwise to the `OTLP_ENDPOINT` collector.
    pub enable_otlp_tracing: bool,
    /// Add a `console_subscriber` layer (tokio-console).
    pub tokio_console: bool,
    /// Emit log lines as JSON (with file and line number) instead of plain text.
    pub json_log_output: bool,
    /// Prefix of a daily-rolling log file; logs go to stderr when `None`.
    pub log_file: Option<String>,
    /// Log filter directives used when `RUST_LOG` is unset or invalid (default `info`).
    pub log_string: Option<String>,
    /// Most verbose span level that is recorded (default INFO).
    pub span_level: Option<LevelFilter>,
    /// Install a panic hook that logs panics through tracing.
    pub panic_hook: bool,
    /// Make the panic hook exit the process with status 12.
    pub crash_on_panic: bool,
    /// Registry receiving span-latency histograms.
    pub prom_registry: Option<prometheus::Registry>,
    /// Skip the span-latency layer even when `prom_registry` is set.
    pub disable_span_latency: bool,
    /// Trace sampling ratio; clamped to `[0.0001, 1.0]` when applied.
    pub sample_rate: f64,
    /// Targets appended to the log directives as `<target>=trace`.
    pub trace_target: Option<Vec<String>>,
    /// Targets whose INFO events are additionally printed without time, target or level.
    /// Only applies when logging plain text to a terminal on stderr.
    pub user_info_target: Vec<String>,
    /// Install as the global default subscriber; otherwise only as a thread-local default.
    pub set_global_default: bool,
    /// Add a `tracing_error::ErrorLayer`.
    pub enable_error_layer: bool,
    /// Add the capturing test layer and pin the event ceiling at TRACE.
    pub enable_test_layer: bool,
}
165
/// Guards returned by [`TelemetryConfig::init`]; keep them alive for as long as telemetry
/// should keep working.
///
/// The fields are held for their `Drop` side effects (non-blocking writer, tracer provider,
/// thread-local subscriber) rather than read, hence `allow(dead_code)`.
#[must_use]
#[allow(dead_code)]
pub struct TelemetryGuards {
    // Guard of the non-blocking log writer created by `get_output`.
    worker_guard: WorkerGuard,
    // OpenTelemetry tracer provider, when OTLP tracing is enabled.
    provider: Option<TracerProvider>,
    // Thread-local default subscriber guard; `None` when installed as the global default.
    subscriber: Option<DefaultGuard>,
}
173
174impl TelemetryGuards {
175 fn new(
176 config: TelemetryConfig,
177 worker_guard: WorkerGuard,
178 provider: Option<TracerProvider>,
179 subscriber: Option<DefaultGuard>,
180 ) -> Self {
181 if subscriber.is_none() {
183 set_global_telemetry_config(config);
184 }
185 Self {
186 worker_guard,
187 provider,
188 subscriber,
189 }
190 }
191}
192
193impl Drop for TelemetryGuards {
194 fn drop(&mut self) {
195 clear_global_telemetry_config();
196 }
197}
198
/// Runtime handle to a reloadable `EnvFilter`.
///
/// The handle for the log filter also carries the shared event-level ceiling read by
/// `GlobalLevelFilter`, so `update` can raise or lower it together with the filter.
#[derive(Clone, Debug)]
pub struct FilterHandle {
    // Reload handle for the filter wrapped in a `reload::Layer`.
    reload: reload::Handle<EnvFilter, Registry>,
    // Shared event ceiling to keep in sync; `None` for the trace filter and with the test layer.
    max_event_level: Option<Arc<std::sync::atomic::AtomicU8>>,
}
206
207impl FilterHandle {
208 pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
209 let filter = EnvFilter::try_new(directives)?;
210 if let Some(ref max_level) = self.max_event_level {
211 let hint = filter.max_level_hint().unwrap_or(LevelFilter::TRACE);
212 max_level.store(level_filter_to_u8(hint), Ordering::Relaxed);
213 }
214 self.reload.reload(filter)?;
215 Ok(())
216 }
217
218 pub fn get(&self) -> Result<String, BoxError> {
219 self.reload
220 .with_current(|filter| filter.to_string())
221 .map_err(Into::into)
222 }
223}
224
/// Runtime controls returned by [`TelemetryConfig::init`] for adjusting logging and tracing
/// after startup.
pub struct TracingHandle {
    // Reloadable filter of the fmt/JSON log layer.
    log: FilterHandle,
    // Reloadable filter of the OpenTelemetry layer; `None` when OTLP tracing is disabled.
    trace: Option<FilterHandle>,
    // Span-export file used when `TRACE_FILE` is set (otherwise created with no path).
    file_output: CachedOpenFile,
    // Capturing layer, present only when the test layer is enabled.
    test_layer: Option<TestLayer>,
    // Clone of the sampler handed to the tracer provider; both share the same rate.
    sampler: SamplingFilter,
}
232
233impl TracingHandle {
234 pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
235 self.log.update(directives)
236 }
237
238 pub fn get_log(&self) -> Result<String, BoxError> {
239 self.log.get()
240 }
241
242 pub fn update_sampling_rate(&self, sample_rate: f64) {
243 self.sampler.update_sampling_rate(sample_rate);
244 }
245
246 pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
247 let trace_path = PathBuf::from_str(trace_file.as_ref())?;
248 self.file_output.update_path(trace_path)?;
249 Ok(())
250 }
251
252 pub fn update_trace_filter<S: AsRef<str>>(
253 &self,
254 directives: S,
255 duration: Duration,
256 ) -> Result<(), BoxError> {
257 if let Some(trace) = &self.trace {
258 let res = trace.update(directives);
259 let trace = trace.clone();
261 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
262 tokio::spawn(async move {
263 tokio::time::sleep(duration).await;
264 if let Err(e) = trace.update(trace_filter_env) {
265 error!("failed to reset trace filter: {}", e);
266 }
267 });
268 res
269 } else {
270 info!("tracing not enabled, ignoring update");
271 Ok(())
272 }
273 }
274
275 pub fn clear_file_output(&self) {
276 self.file_output.clear_path();
277 }
278
279 pub fn reset_trace(&self) {
280 if let Some(trace) = &self.trace {
281 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
282 if let Err(e) = trace.update(trace_filter_env) {
283 error!("failed to reset trace filter: {}", e);
284 }
285 }
286 }
287
288 pub fn get_test_layer_events(&self) -> Vec<String> {
289 self.test_layer
290 .as_ref()
291 .expect("test layer not enabled")
292 .get_events()
293 }
294}
295
296fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
297 if let Some(logfile_prefix) = log_file {
298 let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
299 tracing_appender::non_blocking(file_appender)
300 } else {
301 tracing_appender::non_blocking(stderr())
302 }
303}
304
305fn set_panic_hook(crash_on_panic: bool) {
307 let default_panic_handler = std::panic::take_hook();
308
309 std::panic::set_hook(Box::new(move |panic| {
316 if let Some(location) = panic.location() {
318 tracing::error!(
323 message = %panic,
324 panic.file = location.file(),
325 panic.line = location.line(),
326 panic.column = location.column(),
327 );
328 } else {
329 tracing::error!(message = %panic);
330 }
331
332 default_panic_handler(panic);
333
334 let _ = std::io::stderr().flush();
336 let _ = std::io::stdout().flush();
337
338 if crash_on_panic {
339 std::process::exit(12);
341 }
342 }));
343}
344
/// Telemetry config of the globally installed subscriber, if any.
///
/// Written by `TelemetryGuards::new` when the subscriber is installed globally. Reset to `None`
/// from the `Drop` impl of `TelemetryGuards`. Read via `get_global_telemetry_config`.
static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));
347
348fn set_global_telemetry_config(config: TelemetryConfig) {
349 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
350 assert!(global_config.is_none());
351 *global_config = Some(config);
352}
353
354fn clear_global_telemetry_config() {
355 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
356 *global_config = None;
357}
358
359pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
360 let global_config = GLOBAL_CONFIG.lock().unwrap();
361 global_config.clone()
362}
363
364impl TelemetryConfig {
365 pub fn new() -> Self {
366 Self {
367 enable_otlp_tracing: false,
368 tokio_console: false,
369 json_log_output: false,
370 log_file: None,
371 log_string: None,
372 span_level: None,
373 panic_hook: true,
374 crash_on_panic: false,
375 prom_registry: None,
376 disable_span_latency: false,
377 sample_rate: 1.0,
378 trace_target: None,
379 user_info_target: Vec::new(),
380 set_global_default: true,
381 enable_error_layer: false,
382 enable_test_layer: false,
383 }
384 }
385
386 pub fn with_json(mut self) -> Self {
387 self.json_log_output = true;
388 self
389 }
390
391 pub fn with_log_level(mut self, log_string: &str) -> Self {
392 self.log_string = Some(log_string.to_owned());
393 self
394 }
395
396 pub fn with_span_level(mut self, span_level: Level) -> Self {
397 self.span_level = Some(LevelFilter::from_level(span_level));
398 self
399 }
400
401 pub fn with_log_file(mut self, filename: &str) -> Self {
402 self.log_file = Some(filename.to_owned());
403 self
404 }
405
406 pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
407 self.prom_registry = Some(registry.clone());
408 self
409 }
410
411 pub fn with_disable_span_latency(mut self, disable: bool) -> Self {
412 self.disable_span_latency = disable;
413 self
414 }
415
416 pub fn with_sample_rate(mut self, rate: f64) -> Self {
417 self.sample_rate = rate;
418 self
419 }
420
421 pub fn with_trace_target(mut self, target: &str) -> Self {
422 match self.trace_target {
423 Some(ref mut v) => v.push(target.to_owned()),
424 None => self.trace_target = Some(vec![target.to_owned()]),
425 };
426
427 self
428 }
429
430 pub fn with_user_info_target(mut self, target: &str) -> Self {
433 self.user_info_target.push(target.to_owned());
434 self
435 }
436
437 pub fn with_set_global_default(mut self, set_global_default: bool) -> Self {
438 self.set_global_default = set_global_default;
439 self
440 }
441
442 pub fn with_enable_error_layer(mut self, enable_error_layer: bool) -> Self {
443 self.enable_error_layer = enable_error_layer;
444 self
445 }
446
447 pub fn with_enable_test_layer(mut self, enable_test_layer: bool) -> Self {
448 self.enable_test_layer = enable_test_layer;
449 self
450 }
451
452 pub fn with_env(mut self) -> Self {
453 if env::var("CRASH_ON_PANIC").is_ok() {
454 self.crash_on_panic = true
455 }
456
457 if env::var("TRACE_FILTER").is_ok() {
458 self.enable_otlp_tracing = true
459 }
460
461 if env::var("RUST_LOG_JSON").is_ok() {
462 self.json_log_output = true;
463 }
464
465 if env::var("TOKIO_CONSOLE").is_ok() {
466 self.tokio_console = true;
467 }
468
469 if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
470 self.span_level =
471 Some(LevelFilter::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
472 }
473
474 if let Ok(filepath) = env::var("RUST_LOG_FILE") {
475 self.log_file = Some(filepath);
476 }
477
478 if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
479 self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
480 }
481
482 if let Ok(enable_error_layer) = env::var("ENABLE_ERROR_LAYER") {
483 self.enable_error_layer = enable_error_layer
484 .parse()
485 .expect("Cannot parse ENABLE_ERROR_LAYER");
486 }
487
488 self
489 }
490
491 pub fn init(self) -> (TelemetryGuards, TracingHandle) {
492 let config = self;
493 let config_clone = config.clone();
494
495 let mut directives = config.log_string.unwrap_or_else(|| "info".into());
500 if let Some(targets) = config.trace_target {
501 for target in targets {
502 directives.push_str(&format!(",{}=trace", target));
503 }
504 }
505 let env_filter =
506 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
507 let max_event_level_seed = if config.enable_test_layer {
512 LevelFilter::TRACE
513 } else {
514 env_filter.max_level_hint().unwrap_or(LevelFilter::TRACE)
515 };
516 let max_event_level = Arc::new(std::sync::atomic::AtomicU8::new(level_filter_to_u8(
517 max_event_level_seed,
518 )));
519 let (log_filter, reload_handle) = reload::Layer::new(env_filter);
520 let log_filter_handle = FilterHandle {
521 reload: reload_handle,
522 max_event_level: if config.enable_test_layer {
523 None
524 } else {
525 Some(max_event_level.clone())
526 },
527 };
528
529 let span_level = config
533 .span_level
534 .unwrap_or(LevelFilter::from_level(Level::INFO));
535 let span_filter = filter::filter_fn(move |metadata| {
536 metadata.is_span() && LevelFilter::from_level(*metadata.level()) <= span_level
537 });
538
539 let mut layers = Vec::new();
540
541 if config.tokio_console {
545 layers.push(console_subscriber::spawn().boxed());
546 }
547
548 if let Some(registry) = config.prom_registry
549 && !config.disable_span_latency
550 {
551 let span_lat_layer = PrometheusSpanLatencyLayer::try_new(®istry, 15)
552 .expect("Could not initialize span latency layer");
553 layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
554 }
555
556 let mut trace_filter_handle = None;
557 let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
558 let mut provider = None;
559 let sampler = SamplingFilter::new(config.sample_rate);
560 let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("sui-node".to_owned());
561
562 if config.enable_otlp_tracing {
563 let trace_file = env::var("TRACE_FILE").ok();
564 let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
565 "service.name",
566 service_name.clone(),
567 )];
568 if let Ok(namespace) = env::var("NAMESPACE") {
569 otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
570 }
571 if let Ok(hostname) = env::var("HOSTNAME") {
572 otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
573 }
574 if let Ok(network) = env::var("NETWORK") {
575 otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
576 }
577
578 let resource = Resource::new(otel_kv_vec);
579 let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
580
581 let telemetry = if let Some(trace_file) = trace_file {
584 let exporter =
585 FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
586 file_output = exporter.cached_open_file.clone();
587 let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
588
589 let p = TracerProvider::builder()
590 .with_resource(resource)
591 .with_sampler(sampler)
592 .with_span_processor(processor)
593 .build();
594
595 let tracer = p.tracer(service_name);
596 provider = Some(p);
597
598 tracing_opentelemetry::layer().with_tracer(tracer)
599 } else {
600 let endpoint = env::var("OTLP_ENDPOINT")
601 .unwrap_or_else(|_| "http://localhost:4317".to_string());
602 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
603 .with_tonic()
604 .with_endpoint(endpoint)
605 .build()
606 .unwrap();
607 let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
608 .with_resource(resource)
609 .with_sampler(sampler)
610 .with_batch_exporter(otlp_exporter, runtime::Tokio)
611 .build();
612 let tracer = tracer_provider.tracer(service_name);
613 tracing_opentelemetry::layer().with_tracer(tracer)
614 };
615
616 opentelemetry::global::set_text_map_propagator(
618 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
619 );
620
621 let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
622 let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
623 trace_filter_handle = Some(FilterHandle {
624 reload: reload_handle,
625 max_event_level: None,
626 });
627
628 layers.push(telemetry.with_filter(trace_env_filter).boxed());
629 }
630
631 let (nb_output, worker_guard) = get_output(config.log_file.clone());
632 if config.json_log_output {
633 let json_layer = fmt::layer()
635 .with_file(true)
636 .with_line_number(true)
637 .json()
638 .with_writer(nb_output)
639 .with_filter(log_filter)
640 .boxed();
641 layers.push(json_layer);
642 } else {
643 let fmt_layer = fmt::layer()
645 .with_ansi(config.log_file.is_none() && stderr().is_tty())
646 .with_writer(nb_output.clone())
647 .with_filter(log_filter)
648 .boxed();
649
650 layers.push(fmt_layer);
651
652 if config.log_file.is_none() && stderr().is_tty() && !config.user_info_target.is_empty()
653 {
654 let mut directives = String::from("none");
656 for target in config.user_info_target {
657 directives.push_str(&format!(",{target}=info"));
658 }
659
660 let fmt_layer = fmt::layer()
661 .with_ansi(config.log_file.is_none() && stderr().is_tty())
662 .event_format(
663 fmt::format()
664 .without_time()
665 .with_target(false)
666 .with_level(false),
667 )
668 .with_writer(nb_output)
669 .with_filter(EnvFilter::new(directives))
670 .boxed();
671
672 layers.push(fmt_layer);
673 }
674 }
675
676 if config.enable_error_layer {
677 layers.push(ErrorLayer::new(Pretty::default()).boxed())
678 }
679
680 let test_layer = if config.enable_test_layer {
681 let test_layer = TestLayer::new();
682 layers.push(test_layer.clone().boxed());
683 Some(test_layer)
684 } else {
685 None
686 };
687
688 let global_filter = GlobalLevelFilter {
700 max_span_level: span_level,
701 max_event_level,
702 };
703 let subscriber = tracing_subscriber::registry()
704 .with(layers)
705 .with(global_filter);
706 let subscriber_guard = if config.set_global_default {
707 tracing::subscriber::set_global_default(subscriber)
708 .expect("unable to initialize tracing subscriber");
709 None
710 } else {
711 Some(tracing::subscriber::set_default(subscriber))
712 };
713
714 if config.panic_hook {
715 set_panic_hook(config.crash_on_panic);
716 }
717
718 let guards = TelemetryGuards::new(config_clone, worker_guard, provider, subscriber_guard);
721
722 (
723 guards,
724 TracingHandle {
725 log: log_filter_handle,
726 trace: trace_filter_handle,
727 file_output,
728 test_layer,
729 sampler,
730 },
731 )
732 }
733}
734
/// Trace-ID ratio sampler whose rate can be changed at runtime.
///
/// Clones share the same `Arc`, so updating the rate through one clone (e.g. the one held by
/// `TracingHandle`) affects the clone installed in the tracer provider.
#[derive(Debug, Clone)]
struct SamplingFilter {
    // Current sampling ratio, kept within `[0.0001, 1.0]` by `SamplingFilter::clamp`.
    sample_rate: Arc<AtomicF64>,
}
741
742impl SamplingFilter {
743 fn new(sample_rate: f64) -> Self {
744 SamplingFilter {
745 sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
746 }
747 }
748
749 fn clamp(sample_rate: f64) -> f64 {
750 sample_rate.clamp(0.0001, 1.0)
752 }
753
754 fn update_sampling_rate(&self, sample_rate: f64) {
755 let sample_rate = Self::clamp(sample_rate);
757 self.sample_rate.store(sample_rate, Ordering::Relaxed);
758 }
759}
760
761impl ShouldSample for SamplingFilter {
762 fn should_sample(
763 &self,
764 parent_context: Option<&Context>,
765 trace_id: TraceId,
766 name: &str,
767 span_kind: &SpanKind,
768 attributes: &[KeyValue],
769 links: &[Link],
770 ) -> SamplingResult {
771 let sample_rate = self.sample_rate.load(Ordering::Relaxed);
772 let sampler = Sampler::TraceIdRatioBased(sample_rate);
773
774 sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
775 }
776}
777
778pub fn init_for_testing() {
780 static LOGGER: Lazy<()> = Lazy::new(|| {
781 let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
782 .with_env_filter(
783 EnvFilter::builder()
784 .with_default_directive(LevelFilter::INFO.into())
785 .from_env_lossy(),
786 )
787 .with_file(true)
788 .with_line_number(true)
789 .with_test_writer()
790 .finish();
791 ::tracing::subscriber::set_global_default(subscriber)
792 .expect("unable to initialize logging for tests");
793 });
794
795 Lazy::force(&LOGGER);
796}
797
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    /// Initializes telemetry with DEBUG spans and a Prometheus registry, then checks that only
    /// the DEBUG span ("yo span yo") is recorded by the span-latency layer.
    ///
    /// The test deliberately ends in a panic to exercise the installed panic hook. The expected
    /// message is pinned so that an earlier failing assertion, or a panic inside `init`, fails
    /// the test instead of satisfying a bare `#[should_panic]`.
    ///
    /// NOTE(review): this test and `init_for_testing` both install a global subscriber. The
    /// suite therefore presumably relies on a process-per-test runner (e.g. nextest).
    #[test]
    #[should_panic(expected = "This should cause error logs to be printed out!")]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // TRACE exceeds the configured DEBUG span level, so this span is never created.
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    /// `init_for_testing` must be callable from several tests without panicking.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}