telemetry_subscribers/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use atomic_float::AtomicF64;
5use crossterm::tty::IsTty;
6use once_cell::sync::Lazy;
7use opentelemetry::{
8    Context, KeyValue,
9    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
10};
11use opentelemetry_otlp::WithExportConfig;
12use opentelemetry_sdk::trace::Sampler;
13use opentelemetry_sdk::{
14    self, Resource, runtime,
15    trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
16};
17use span_latency_prom::PrometheusSpanLatencyLayer;
18use std::path::PathBuf;
19use std::time::Duration;
20use std::{
21    env,
22    io::{Write, stderr},
23    str::FromStr,
24    sync::{Arc, Mutex, atomic::Ordering},
25};
26use tracing::dispatcher::DefaultGuard;
27use tracing::metadata::LevelFilter;
28use tracing::{Level, error, info};
29use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
30use tracing_error::ErrorLayer;
31use tracing_subscriber::{
32    EnvFilter, Layer, Registry, filter,
33    fmt::{self, format::Pretty},
34    layer::SubscriberExt,
35    reload,
36};
37
38use crate::file_exporter::{CachedOpenFile, FileExporter};
39use crate::test_layer::TestLayer;
40
41mod file_exporter;
42pub mod span_latency_prom;
43mod test_layer;
44
45/// Global filter layer that rejects callsites above the configured levels at registration time.
46///
47/// Without this, per-layer filtering causes the Registry to return `Interest::always()` for
48/// every callsite. That means trace-level `#[instrument]` spans are allocated in the slab,
49/// have their per-layer filters walked, and are immediately discarded — all at significant cost.
50/// The same applies to events below the env filter threshold.
51///
52/// By rejecting high-verbosity callsites globally, `Span::new` short-circuits to
53/// `Span::none()` and event macros short-circuit before formatting arguments.
54struct GlobalLevelFilter {
55    max_span_level: LevelFilter,
56    /// The maximum event level any layer will accept. Loaded from the EnvFilter's
57    /// max_level_hint and updated via a shared atomic when the filter is reloaded.
58    max_event_level: Arc<std::sync::atomic::AtomicU8>,
59}
60
61impl GlobalLevelFilter {
62    fn load_max_event_level(&self) -> LevelFilter {
63        level_filter_from_u8(self.max_event_level.load(Ordering::Relaxed))
64    }
65
66    fn exceeds_max_level(&self, metadata: &tracing::Metadata<'_>) -> bool {
67        if metadata.is_span() {
68            LevelFilter::from_level(*metadata.level()) > self.max_span_level
69        } else {
70            let max = self.load_max_event_level();
71            !max.eq(&LevelFilter::OFF) && LevelFilter::from_level(*metadata.level()) > max
72        }
73    }
74}
75
76fn level_filter_to_u8(lf: LevelFilter) -> u8 {
77    match lf {
78        LevelFilter::OFF => 0,
79        LevelFilter::ERROR => 1,
80        LevelFilter::WARN => 2,
81        LevelFilter::INFO => 3,
82        LevelFilter::DEBUG => 4,
83        LevelFilter::TRACE => 5,
84    }
85}
86
87fn level_filter_from_u8(v: u8) -> LevelFilter {
88    match v {
89        0 => LevelFilter::OFF,
90        1 => LevelFilter::ERROR,
91        2 => LevelFilter::WARN,
92        3 => LevelFilter::INFO,
93        4 => LevelFilter::DEBUG,
94        _ => LevelFilter::TRACE,
95    }
96}
97
98impl<S: tracing::Subscriber> Layer<S> for GlobalLevelFilter {
99    fn register_callsite(
100        &self,
101        metadata: &'static tracing::Metadata<'static>,
102    ) -> tracing::subscriber::Interest {
103        if self.exceeds_max_level(metadata) {
104            tracing::subscriber::Interest::never()
105        } else {
106            // Return always() for passing callsites. The final interest will be
107            // determined by the Registry (which returns sometimes() when per-layer
108            // filters are present). We intentionally do NOT override enabled() —
109            // doing so would add an extra check to every enabled() walk, which the
110            // per-layer filters already handle. For dynamic env filter reloads,
111            // rebuild_interest_cache() re-calls register_callsite with our updated
112            // atomic max_event_level.
113            tracing::subscriber::Interest::always()
114        }
115    }
116
117    fn max_level_hint(&self) -> Option<LevelFilter> {
118        let event = self.load_max_event_level();
119        Some(std::cmp::max(self.max_span_level, event))
120    }
121}
122
/// Type-erased, thread-safe error returned by the filter/tracing handle APIs.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
125
126/// Configuration for different logging/tracing options
127/// ===
128/// - json_log_output: Output JSON logs to stdout only.
129/// - log_file: If defined, write output to a file starting with this name, ex app.log
130/// - log_level: error/warn/info/debug/trace, defaults to info
131#[derive(Default, Clone, Debug)]
132pub struct TelemetryConfig {
133    pub enable_otlp_tracing: bool,
134    /// Enables Tokio Console debugging on port 6669
135    pub tokio_console: bool,
136    /// Output JSON logs.
137    pub json_log_output: bool,
138    /// If defined, write output to a file starting with this name, ex app.log
139    pub log_file: Option<String>,
140    /// Log level to set, defaults to info
141    pub log_string: Option<String>,
142    /// Span level - what level of spans should be created.  Note this is not same as logging level.
143    /// If set to None, then defaults to INFO. Use LevelFilter::OFF to disable all spans.
144    pub span_level: Option<LevelFilter>,
145    /// Set a panic hook
146    pub panic_hook: bool,
147    /// Crash on panic
148    pub crash_on_panic: bool,
149    /// Optional Prometheus registry - if present, all enabled span latencies are measured
150    pub prom_registry: Option<prometheus::Registry>,
151    /// Disable the PrometheusSpanLatencyLayer even when a prom_registry is set.
152    pub disable_span_latency: bool,
153    pub sample_rate: f64,
154    /// Add directive to include trace logs with provided target
155    pub trace_target: Option<Vec<String>>,
156    /// Print unadorned logs from the target on standard error if this is a tty
157    pub user_info_target: Vec<String>,
158    // Sets the subscriber created by this config to be the global default. Defaults to true if not set.
159    pub set_global_default: bool,
160    // Add error layer to capture trace spans. Defaults to false if not set.
161    pub enable_error_layer: bool,
162    // Capture output for tests. Defaults to false if not set.
163    pub enable_test_layer: bool,
164}
165
166#[must_use]
167#[allow(dead_code)]
168pub struct TelemetryGuards {
169    worker_guard: WorkerGuard,
170    provider: Option<TracerProvider>,
171    subscriber: Option<DefaultGuard>,
172}
173
174impl TelemetryGuards {
175    fn new(
176        config: TelemetryConfig,
177        worker_guard: WorkerGuard,
178        provider: Option<TracerProvider>,
179        subscriber: Option<DefaultGuard>,
180    ) -> Self {
181        // Do not set the global config if subscriber is present because a global subscriber was not set.
182        if subscriber.is_none() {
183            set_global_telemetry_config(config);
184        }
185        Self {
186            worker_guard,
187            provider,
188            subscriber,
189        }
190    }
191}
192
193impl Drop for TelemetryGuards {
194    fn drop(&mut self) {
195        clear_global_telemetry_config();
196    }
197}
198
199#[derive(Clone, Debug)]
200pub struct FilterHandle {
201    reload: reload::Handle<EnvFilter, Registry>,
202    /// Shared with GlobalLevelFilter so that reloading the env filter also
203    /// updates the global event-level gate.
204    max_event_level: Option<Arc<std::sync::atomic::AtomicU8>>,
205}
206
207impl FilterHandle {
208    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
209        let filter = EnvFilter::try_new(directives)?;
210        if let Some(ref max_level) = self.max_event_level {
211            let hint = filter.max_level_hint().unwrap_or(LevelFilter::TRACE);
212            max_level.store(level_filter_to_u8(hint), Ordering::Relaxed);
213        }
214        self.reload.reload(filter)?;
215        Ok(())
216    }
217
218    pub fn get(&self) -> Result<String, BoxError> {
219        self.reload
220            .with_current(|filter| filter.to_string())
221            .map_err(Into::into)
222    }
223}
224
225pub struct TracingHandle {
226    log: FilterHandle,
227    trace: Option<FilterHandle>,
228    file_output: CachedOpenFile,
229    test_layer: Option<TestLayer>,
230    sampler: SamplingFilter,
231}
232
233impl TracingHandle {
234    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
235        self.log.update(directives)
236    }
237
238    pub fn get_log(&self) -> Result<String, BoxError> {
239        self.log.get()
240    }
241
242    pub fn update_sampling_rate(&self, sample_rate: f64) {
243        self.sampler.update_sampling_rate(sample_rate);
244    }
245
246    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
247        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
248        self.file_output.update_path(trace_path)?;
249        Ok(())
250    }
251
252    pub fn update_trace_filter<S: AsRef<str>>(
253        &self,
254        directives: S,
255        duration: Duration,
256    ) -> Result<(), BoxError> {
257        if let Some(trace) = &self.trace {
258            let res = trace.update(directives);
259            // after duration is elapsed, reset to the env setting
260            let trace = trace.clone();
261            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
262            tokio::spawn(async move {
263                tokio::time::sleep(duration).await;
264                if let Err(e) = trace.update(trace_filter_env) {
265                    error!("failed to reset trace filter: {}", e);
266                }
267            });
268            res
269        } else {
270            info!("tracing not enabled, ignoring update");
271            Ok(())
272        }
273    }
274
275    pub fn clear_file_output(&self) {
276        self.file_output.clear_path();
277    }
278
279    pub fn reset_trace(&self) {
280        if let Some(trace) = &self.trace {
281            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
282            if let Err(e) = trace.update(trace_filter_env) {
283                error!("failed to reset trace filter: {}", e);
284            }
285        }
286    }
287
288    pub fn get_test_layer_events(&self) -> Vec<String> {
289        self.test_layer
290            .as_ref()
291            .expect("test layer not enabled")
292            .get_events()
293    }
294}
295
296fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
297    if let Some(logfile_prefix) = log_file {
298        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
299        tracing_appender::non_blocking(file_appender)
300    } else {
301        tracing_appender::non_blocking(stderr())
302    }
303}
304
305// NOTE: this function is copied from tracing's panic_hook example
306fn set_panic_hook(crash_on_panic: bool) {
307    let default_panic_handler = std::panic::take_hook();
308
309    // Set a panic hook that records the panic as a `tracing` event at the
310    // `ERROR` verbosity level.
311    //
312    // If we are currently in a span when the panic occurred, the logged event
313    // will include the current span, allowing the context in which the panic
314    // occurred to be recorded.
315    std::panic::set_hook(Box::new(move |panic| {
316        // If the panic has a source location, record it as structured fields.
317        if let Some(location) = panic.location() {
318            // On nightly Rust, where the `PanicInfo` type also exposes a
319            // `message()` method returning just the message, we could record
320            // just the message instead of the entire `fmt::Display`
321            // implementation, avoiding the duplicated location
322            tracing::error!(
323                message = %panic,
324                panic.file = location.file(),
325                panic.line = location.line(),
326                panic.column = location.column(),
327            );
328        } else {
329            tracing::error!(message = %panic);
330        }
331
332        default_panic_handler(panic);
333
334        // We're panicking so we can't do anything about the flush failing
335        let _ = std::io::stderr().flush();
336        let _ = std::io::stdout().flush();
337
338        if crash_on_panic {
339            // Kill the process
340            std::process::exit(12);
341        }
342    }));
343}
344
345static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
346    Lazy::new(|| Arc::new(Mutex::new(None)));
347
348fn set_global_telemetry_config(config: TelemetryConfig) {
349    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
350    assert!(global_config.is_none());
351    *global_config = Some(config);
352}
353
354fn clear_global_telemetry_config() {
355    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
356    *global_config = None;
357}
358
359pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
360    let global_config = GLOBAL_CONFIG.lock().unwrap();
361    global_config.clone()
362}
363
364impl TelemetryConfig {
365    pub fn new() -> Self {
366        Self {
367            enable_otlp_tracing: false,
368            tokio_console: false,
369            json_log_output: false,
370            log_file: None,
371            log_string: None,
372            span_level: None,
373            panic_hook: true,
374            crash_on_panic: false,
375            prom_registry: None,
376            disable_span_latency: false,
377            sample_rate: 1.0,
378            trace_target: None,
379            user_info_target: Vec::new(),
380            set_global_default: true,
381            enable_error_layer: false,
382            enable_test_layer: false,
383        }
384    }
385
386    pub fn with_json(mut self) -> Self {
387        self.json_log_output = true;
388        self
389    }
390
391    pub fn with_log_level(mut self, log_string: &str) -> Self {
392        self.log_string = Some(log_string.to_owned());
393        self
394    }
395
396    pub fn with_span_level(mut self, span_level: Level) -> Self {
397        self.span_level = Some(LevelFilter::from_level(span_level));
398        self
399    }
400
401    pub fn with_log_file(mut self, filename: &str) -> Self {
402        self.log_file = Some(filename.to_owned());
403        self
404    }
405
406    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
407        self.prom_registry = Some(registry.clone());
408        self
409    }
410
411    pub fn with_disable_span_latency(mut self, disable: bool) -> Self {
412        self.disable_span_latency = disable;
413        self
414    }
415
416    pub fn with_sample_rate(mut self, rate: f64) -> Self {
417        self.sample_rate = rate;
418        self
419    }
420
421    pub fn with_trace_target(mut self, target: &str) -> Self {
422        match self.trace_target {
423            Some(ref mut v) => v.push(target.to_owned()),
424            None => self.trace_target = Some(vec![target.to_owned()]),
425        };
426
427        self
428    }
429
430    /// Adds `target` to the list of modules that will have their info & above logs printed
431    /// unadorned to standard error (for non-json output)
432    pub fn with_user_info_target(mut self, target: &str) -> Self {
433        self.user_info_target.push(target.to_owned());
434        self
435    }
436
437    pub fn with_set_global_default(mut self, set_global_default: bool) -> Self {
438        self.set_global_default = set_global_default;
439        self
440    }
441
442    pub fn with_enable_error_layer(mut self, enable_error_layer: bool) -> Self {
443        self.enable_error_layer = enable_error_layer;
444        self
445    }
446
447    pub fn with_enable_test_layer(mut self, enable_test_layer: bool) -> Self {
448        self.enable_test_layer = enable_test_layer;
449        self
450    }
451
452    pub fn with_env(mut self) -> Self {
453        if env::var("CRASH_ON_PANIC").is_ok() {
454            self.crash_on_panic = true
455        }
456
457        if env::var("TRACE_FILTER").is_ok() {
458            self.enable_otlp_tracing = true
459        }
460
461        if env::var("RUST_LOG_JSON").is_ok() {
462            self.json_log_output = true;
463        }
464
465        if env::var("TOKIO_CONSOLE").is_ok() {
466            self.tokio_console = true;
467        }
468
469        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
470            self.span_level =
471                Some(LevelFilter::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
472        }
473
474        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
475            self.log_file = Some(filepath);
476        }
477
478        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
479            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
480        }
481
482        if let Ok(enable_error_layer) = env::var("ENABLE_ERROR_LAYER") {
483            self.enable_error_layer = enable_error_layer
484                .parse()
485                .expect("Cannot parse ENABLE_ERROR_LAYER");
486        }
487
488        self
489    }
490
491    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
492        let config = self;
493        let config_clone = config.clone();
494
495        // Setup an EnvFilter for filtering logging output layers.
496        // NOTE: we don't want to use this to filter all layers.  That causes problems for layers with
497        // different filtering needs, including tokio-console/console-subscriber, and it also doesn't
498        // fit with the span creation needs for distributed tracing and other span-based tools.
499        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
500        if let Some(targets) = config.trace_target {
501            for target in targets {
502                directives.push_str(&format!(",{}=trace", target));
503            }
504        }
505        let env_filter =
506            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
507        // When the test layer is enabled, it accepts events at any level without a
508        // per-layer filter, so global event-level filtering would hide events the
509        // test wants to capture. Disable it by pinning max_event_level to TRACE
510        // (and skip propagating env filter reloads into it).
511        let max_event_level_seed = if config.enable_test_layer {
512            LevelFilter::TRACE
513        } else {
514            env_filter.max_level_hint().unwrap_or(LevelFilter::TRACE)
515        };
516        let max_event_level = Arc::new(std::sync::atomic::AtomicU8::new(level_filter_to_u8(
517            max_event_level_seed,
518        )));
519        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
520        let log_filter_handle = FilterHandle {
521            reload: reload_handle,
522            max_event_level: if config.enable_test_layer {
523                None
524            } else {
525                Some(max_event_level.clone())
526            },
527        };
528
529        // Separate span level filter.
530        // This is a dumb filter for now - allows all spans that are below a given level.
531        // TODO: implement a sampling filter
532        let span_level = config
533            .span_level
534            .unwrap_or(LevelFilter::from_level(Level::INFO));
535        let span_filter = filter::filter_fn(move |metadata| {
536            metadata.is_span() && LevelFilter::from_level(*metadata.level()) <= span_level
537        });
538
539        let mut layers = Vec::new();
540
541        // tokio-console layer
542        // Please see https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration
543        // for environment vars/config options
544        if config.tokio_console {
545            layers.push(console_subscriber::spawn().boxed());
546        }
547
548        if let Some(registry) = config.prom_registry
549            && !config.disable_span_latency
550        {
551            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
552                .expect("Could not initialize span latency layer");
553            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
554        }
555
556        let mut trace_filter_handle = None;
557        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
558        let mut provider = None;
559        let sampler = SamplingFilter::new(config.sample_rate);
560        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("sui-node".to_owned());
561
562        if config.enable_otlp_tracing {
563            let trace_file = env::var("TRACE_FILE").ok();
564            let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
565                "service.name",
566                service_name.clone(),
567            )];
568            if let Ok(namespace) = env::var("NAMESPACE") {
569                otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
570            }
571            if let Ok(hostname) = env::var("HOSTNAME") {
572                otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
573            }
574            if let Ok(network) = env::var("NETWORK") {
575                otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
576            }
577
578            let resource = Resource::new(otel_kv_vec);
579            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
580
581            // We can either do file output or OTLP, but not both. tracing-opentelemetry
582            // only supports a single tracer at a time.
583            let telemetry = if let Some(trace_file) = trace_file {
584                let exporter =
585                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
586                file_output = exporter.cached_open_file.clone();
587                let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
588
589                let p = TracerProvider::builder()
590                    .with_resource(resource)
591                    .with_sampler(sampler)
592                    .with_span_processor(processor)
593                    .build();
594
595                let tracer = p.tracer(service_name);
596                provider = Some(p);
597
598                tracing_opentelemetry::layer().with_tracer(tracer)
599            } else {
600                let endpoint = env::var("OTLP_ENDPOINT")
601                    .unwrap_or_else(|_| "http://localhost:4317".to_string());
602                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
603                    .with_tonic()
604                    .with_endpoint(endpoint)
605                    .build()
606                    .unwrap();
607                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
608                    .with_resource(resource)
609                    .with_sampler(sampler)
610                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
611                    .build();
612                let tracer = tracer_provider.tracer(service_name);
613                tracing_opentelemetry::layer().with_tracer(tracer)
614            };
615
616            // Enable Trace Contexts for tying spans together
617            opentelemetry::global::set_text_map_propagator(
618                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
619            );
620
621            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
622            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
623            trace_filter_handle = Some(FilterHandle {
624                reload: reload_handle,
625                max_event_level: None,
626            });
627
628            layers.push(telemetry.with_filter(trace_env_filter).boxed());
629        }
630
631        let (nb_output, worker_guard) = get_output(config.log_file.clone());
632        if config.json_log_output {
633            // Output to file or to stderr in a newline-delimited JSON format
634            let json_layer = fmt::layer()
635                .with_file(true)
636                .with_line_number(true)
637                .json()
638                .with_writer(nb_output)
639                .with_filter(log_filter)
640                .boxed();
641            layers.push(json_layer);
642        } else {
643            // Output to file or to stderr with ANSI colors
644            let fmt_layer = fmt::layer()
645                .with_ansi(config.log_file.is_none() && stderr().is_tty())
646                .with_writer(nb_output.clone())
647                .with_filter(log_filter)
648                .boxed();
649
650            layers.push(fmt_layer);
651
652            if config.log_file.is_none() && stderr().is_tty() && !config.user_info_target.is_empty()
653            {
654                // Add another printer that prints unadorned info messages to stderr
655                let mut directives = String::from("none");
656                for target in config.user_info_target {
657                    directives.push_str(&format!(",{target}=info"));
658                }
659
660                let fmt_layer = fmt::layer()
661                    .with_ansi(config.log_file.is_none() && stderr().is_tty())
662                    .event_format(
663                        fmt::format()
664                            .without_time()
665                            .with_target(false)
666                            .with_level(false),
667                    )
668                    .with_writer(nb_output)
669                    .with_filter(EnvFilter::new(directives))
670                    .boxed();
671
672                layers.push(fmt_layer);
673            }
674        }
675
676        if config.enable_error_layer {
677            layers.push(ErrorLayer::new(Pretty::default()).boxed())
678        }
679
680        let test_layer = if config.enable_test_layer {
681            let test_layer = TestLayer::new();
682            layers.push(test_layer.clone().boxed());
683            Some(test_layer)
684        } else {
685            None
686        };
687
688        // Global level filter: rejects span callsites above span_level and event
689        // callsites above the env filter's max level at registration time, preventing
690        // the Registry from dispatching callsites that every per-layer filter would
691        // immediately discard.
692        //
693        // Must be stacked on top of `layers` via `.with()` rather than pushed into the
694        // Vec. `Vec<Layer>::register_callsite` returns the most permissive Interest
695        // across its layers, so a `never()` from this filter would be overridden by
696        // any layer (e.g. fmt+EnvFilter) returning `sometimes()`. As an outer
697        // `Layered`, `pick_interest` short-circuits to `never()` and the inner stack
698        // is never consulted.
699        let global_filter = GlobalLevelFilter {
700            max_span_level: span_level,
701            max_event_level,
702        };
703        let subscriber = tracing_subscriber::registry()
704            .with(layers)
705            .with(global_filter);
706        let subscriber_guard = if config.set_global_default {
707            tracing::subscriber::set_global_default(subscriber)
708                .expect("unable to initialize tracing subscriber");
709            None
710        } else {
711            Some(tracing::subscriber::set_default(subscriber))
712        };
713
714        if config.panic_hook {
715            set_panic_hook(config.crash_on_panic);
716        }
717
718        // The guard must be returned and kept in the main fn of the app, as when it's dropped then the output
719        // gets flushed and closed. If this is dropped too early then no output will appear!
720        let guards = TelemetryGuards::new(config_clone, worker_guard, provider, subscriber_guard);
721
722        (
723            guards,
724            TracingHandle {
725                log: log_filter_handle,
726                trace: trace_filter_handle,
727                file_output,
728                test_layer,
729                sampler,
730            },
731        )
732    }
733}
734
735// Like Sampler::TraceIdRatioBased, but can be updated at runtime
736#[derive(Debug, Clone)]
737struct SamplingFilter {
738    // Sampling filter needs to be fast, so we avoid a mutex.
739    sample_rate: Arc<AtomicF64>,
740}
741
742impl SamplingFilter {
743    fn new(sample_rate: f64) -> Self {
744        SamplingFilter {
745            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
746        }
747    }
748
749    fn clamp(sample_rate: f64) -> f64 {
750        // clamp sample rate to between 0.0001 and 1.0
751        sample_rate.clamp(0.0001, 1.0)
752    }
753
754    fn update_sampling_rate(&self, sample_rate: f64) {
755        // clamp sample rate to between 0.0001 and 1.0
756        let sample_rate = Self::clamp(sample_rate);
757        self.sample_rate.store(sample_rate, Ordering::Relaxed);
758    }
759}
760
761impl ShouldSample for SamplingFilter {
762    fn should_sample(
763        &self,
764        parent_context: Option<&Context>,
765        trace_id: TraceId,
766        name: &str,
767        span_kind: &SpanKind,
768        attributes: &[KeyValue],
769        links: &[Link],
770    ) -> SamplingResult {
771        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
772        let sampler = Sampler::TraceIdRatioBased(sample_rate);
773
774        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
775    }
776}
777
778/// Globally set a tracing subscriber suitable for testing environments
779pub fn init_for_testing() {
780    static LOGGER: Lazy<()> = Lazy::new(|| {
781        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
782            .with_env_filter(
783                EnvFilter::builder()
784                    .with_default_directive(LevelFilter::INFO.into())
785                    .from_env_lossy(),
786            )
787            .with_file(true)
788            .with_line_number(true)
789            .with_test_writer()
790            .finish();
791        ::tracing::subscriber::set_global_default(subscriber)
792            .expect("unable to initialize logging for tests");
793    });
794
795    Lazy::force(&LOGGER);
796}
797
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    // The test ends with a deliberate panic, to exercise the panic hook. Pinning the
    // expected message means an earlier failing assertion (which also panics) is
    // reported as a failure instead of silently satisfying `should_panic`.
    //
    // NOTE(review): this test and `testing_logger_*` all install a process-global
    // subscriber. They assume each test runs in its own process (e.g. cargo-nextest).
    #[test]
    #[should_panic(expected = "This should cause error logs to be printed out!")]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Default logging level is INFO, but here we set the span level to DEBUG.  TRACE spans should be ignored.
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        // Spans are debug level or below, so they won't be printed out either.  However latencies
        // should be recorded for at least one span
        debug_span!("yo span yo").in_scope(|| {
            // This debug log will not print out, log level set to INFO by default
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // This span won't be enabled
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        // There should be 1 metricFamily and 1 metric
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // Both the following tests should be able to "race" to initialize logging without causing a
    // panic
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}