telemetry_subscribers/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use atomic_float::AtomicF64;
5use crossterm::tty::IsTty;
6use once_cell::sync::Lazy;
7use opentelemetry::{
8    Context, KeyValue,
9    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
10};
11use opentelemetry_otlp::WithExportConfig;
12use opentelemetry_sdk::trace::Sampler;
13use opentelemetry_sdk::{
14    self, Resource, runtime,
15    trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
16};
17use span_latency_prom::PrometheusSpanLatencyLayer;
18use std::path::PathBuf;
19use std::time::Duration;
20use std::{
21    env,
22    io::{Write, stderr},
23    str::FromStr,
24    sync::{Arc, Mutex, atomic::Ordering},
25};
26use tracing::metadata::LevelFilter;
27use tracing::{Level, error, info};
28use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
29use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};
30
31use crate::file_exporter::{CachedOpenFile, FileExporter};
32
33mod file_exporter;
34pub mod span_latency_prom;
35
/// Type-erased, thread-safe error used by the runtime reconfiguration APIs in this crate.
///
/// The trait object's lifetime bound defaults to `'static` for `Box<dyn _>`.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
38
39/// Configuration for different logging/tracing options
40/// ===
41/// - json_log_output: Output JSON logs to stdout only.
42/// - log_file: If defined, write output to a file starting with this name, ex app.log
43/// - log_level: error/warn/info/debug/trace, defaults to info
44#[derive(Default, Clone, Debug)]
45pub struct TelemetryConfig {
46    pub enable_otlp_tracing: bool,
47    /// Enables Tokio Console debugging on port 6669
48    pub tokio_console: bool,
49    /// Output JSON logs.
50    pub json_log_output: bool,
51    /// If defined, write output to a file starting with this name, ex app.log
52    pub log_file: Option<String>,
53    /// Log level to set, defaults to info
54    pub log_string: Option<String>,
55    /// Span level - what level of spans should be created.  Note this is not same as logging level
56    /// If set to None, then defaults to INFO
57    pub span_level: Option<Level>,
58    /// Set a panic hook
59    pub panic_hook: bool,
60    /// Crash on panic
61    pub crash_on_panic: bool,
62    /// Optional Prometheus registry - if present, all enabled span latencies are measured
63    pub prom_registry: Option<prometheus::Registry>,
64    pub sample_rate: f64,
65    /// Add directive to include trace logs with provided target
66    pub trace_target: Option<Vec<String>>,
67    /// Print unadorned logs from the target on standard error if this is a tty
68    pub user_info_target: Vec<String>,
69}
70
71#[must_use]
72#[allow(dead_code)]
73pub struct TelemetryGuards {
74    worker_guard: WorkerGuard,
75    provider: Option<TracerProvider>,
76}
77
78impl TelemetryGuards {
79    fn new(
80        config: TelemetryConfig,
81        worker_guard: WorkerGuard,
82        provider: Option<TracerProvider>,
83    ) -> Self {
84        set_global_telemetry_config(config);
85        Self {
86            worker_guard,
87            provider,
88        }
89    }
90}
91
92impl Drop for TelemetryGuards {
93    fn drop(&mut self) {
94        clear_global_telemetry_config();
95    }
96}
97
98#[derive(Clone, Debug)]
99pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);
100
101impl FilterHandle {
102    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
103        let filter = EnvFilter::try_new(directives)?;
104        self.0.reload(filter)?;
105        Ok(())
106    }
107
108    pub fn get(&self) -> Result<String, BoxError> {
109        self.0
110            .with_current(|filter| filter.to_string())
111            .map_err(Into::into)
112    }
113}
114
115pub struct TracingHandle {
116    log: FilterHandle,
117    trace: Option<FilterHandle>,
118    file_output: CachedOpenFile,
119    sampler: SamplingFilter,
120}
121
122impl TracingHandle {
123    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
124        self.log.update(directives)
125    }
126
127    pub fn get_log(&self) -> Result<String, BoxError> {
128        self.log.get()
129    }
130
131    pub fn update_sampling_rate(&self, sample_rate: f64) {
132        self.sampler.update_sampling_rate(sample_rate);
133    }
134
135    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
136        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
137        self.file_output.update_path(trace_path)?;
138        Ok(())
139    }
140
141    pub fn update_trace_filter<S: AsRef<str>>(
142        &self,
143        directives: S,
144        duration: Duration,
145    ) -> Result<(), BoxError> {
146        if let Some(trace) = &self.trace {
147            let res = trace.update(directives);
148            // after duration is elapsed, reset to the env setting
149            let trace = trace.clone();
150            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
151            tokio::spawn(async move {
152                tokio::time::sleep(duration).await;
153                if let Err(e) = trace.update(trace_filter_env) {
154                    error!("failed to reset trace filter: {}", e);
155                }
156            });
157            res
158        } else {
159            info!("tracing not enabled, ignoring update");
160            Ok(())
161        }
162    }
163
164    pub fn clear_file_output(&self) {
165        self.file_output.clear_path();
166    }
167
168    pub fn reset_trace(&self) {
169        if let Some(trace) = &self.trace {
170            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
171            if let Err(e) = trace.update(trace_filter_env) {
172                error!("failed to reset trace filter: {}", e);
173            }
174        }
175    }
176}
177
178fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
179    if let Some(logfile_prefix) = log_file {
180        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
181        tracing_appender::non_blocking(file_appender)
182    } else {
183        tracing_appender::non_blocking(stderr())
184    }
185}
186
187// NOTE: this function is copied from tracing's panic_hook example
188fn set_panic_hook(crash_on_panic: bool) {
189    let default_panic_handler = std::panic::take_hook();
190
191    // Set a panic hook that records the panic as a `tracing` event at the
192    // `ERROR` verbosity level.
193    //
194    // If we are currently in a span when the panic occurred, the logged event
195    // will include the current span, allowing the context in which the panic
196    // occurred to be recorded.
197    std::panic::set_hook(Box::new(move |panic| {
198        // If the panic has a source location, record it as structured fields.
199        if let Some(location) = panic.location() {
200            // On nightly Rust, where the `PanicInfo` type also exposes a
201            // `message()` method returning just the message, we could record
202            // just the message instead of the entire `fmt::Display`
203            // implementation, avoiding the duplicated location
204            tracing::error!(
205                message = %panic,
206                panic.file = location.file(),
207                panic.line = location.line(),
208                panic.column = location.column(),
209            );
210        } else {
211            tracing::error!(message = %panic);
212        }
213
214        default_panic_handler(panic);
215
216        // We're panicking so we can't do anything about the flush failing
217        let _ = std::io::stderr().flush();
218        let _ = std::io::stdout().flush();
219
220        if crash_on_panic {
221            // Kill the process
222            std::process::exit(12);
223        }
224    }));
225}
226
227static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
228    Lazy::new(|| Arc::new(Mutex::new(None)));
229
230fn set_global_telemetry_config(config: TelemetryConfig) {
231    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
232    assert!(global_config.is_none());
233    *global_config = Some(config);
234}
235
236fn clear_global_telemetry_config() {
237    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
238    *global_config = None;
239}
240
241pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
242    let global_config = GLOBAL_CONFIG.lock().unwrap();
243    global_config.clone()
244}
245
246impl TelemetryConfig {
247    pub fn new() -> Self {
248        Self {
249            enable_otlp_tracing: false,
250            tokio_console: false,
251            json_log_output: false,
252            log_file: None,
253            log_string: None,
254            span_level: None,
255            panic_hook: true,
256            crash_on_panic: false,
257            prom_registry: None,
258            sample_rate: 1.0,
259            trace_target: None,
260            user_info_target: Vec::new(),
261        }
262    }
263
264    pub fn with_json(mut self) -> Self {
265        self.json_log_output = true;
266        self
267    }
268
269    pub fn with_log_level(mut self, log_string: &str) -> Self {
270        self.log_string = Some(log_string.to_owned());
271        self
272    }
273
274    pub fn with_span_level(mut self, span_level: Level) -> Self {
275        self.span_level = Some(span_level);
276        self
277    }
278
279    pub fn with_log_file(mut self, filename: &str) -> Self {
280        self.log_file = Some(filename.to_owned());
281        self
282    }
283
284    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
285        self.prom_registry = Some(registry.clone());
286        self
287    }
288
289    pub fn with_sample_rate(mut self, rate: f64) -> Self {
290        self.sample_rate = rate;
291        self
292    }
293
294    pub fn with_trace_target(mut self, target: &str) -> Self {
295        match self.trace_target {
296            Some(ref mut v) => v.push(target.to_owned()),
297            None => self.trace_target = Some(vec![target.to_owned()]),
298        };
299
300        self
301    }
302
303    /// Adds `target` to the list of modules that will have their info & above logs printed
304    /// unadorned to standard error (for non-json output)
305    pub fn with_user_info_target(mut self, target: &str) -> Self {
306        self.user_info_target.push(target.to_owned());
307        self
308    }
309
310    pub fn with_env(mut self) -> Self {
311        if env::var("CRASH_ON_PANIC").is_ok() {
312            self.crash_on_panic = true
313        }
314
315        if env::var("TRACE_FILTER").is_ok() {
316            self.enable_otlp_tracing = true
317        }
318
319        if env::var("RUST_LOG_JSON").is_ok() {
320            self.json_log_output = true;
321        }
322
323        if env::var("TOKIO_CONSOLE").is_ok() {
324            self.tokio_console = true;
325        }
326
327        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
328            self.span_level =
329                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
330        }
331
332        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
333            self.log_file = Some(filepath);
334        }
335
336        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
337            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
338        }
339
340        self
341    }
342
343    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
344        let config = self;
345        let config_clone = config.clone();
346
347        // Setup an EnvFilter for filtering logging output layers.
348        // NOTE: we don't want to use this to filter all layers.  That causes problems for layers with
349        // different filtering needs, including tokio-console/console-subscriber, and it also doesn't
350        // fit with the span creation needs for distributed tracing and other span-based tools.
351        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
352        if let Some(targets) = config.trace_target {
353            for target in targets {
354                directives.push_str(&format!(",{}=trace", target));
355            }
356        }
357        let env_filter =
358            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
359        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
360        let log_filter_handle = FilterHandle(reload_handle);
361
362        // Separate span level filter.
363        // This is a dumb filter for now - allows all spans that are below a given level.
364        // TODO: implement a sampling filter
365        let span_level = config.span_level.unwrap_or(Level::INFO);
366        let span_filter = filter::filter_fn(move |metadata| {
367            metadata.is_span() && *metadata.level() <= span_level
368        });
369
370        let mut layers = Vec::new();
371
372        // tokio-console layer
373        // Please see https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration
374        // for environment vars/config options
375        if config.tokio_console {
376            layers.push(console_subscriber::spawn().boxed());
377        }
378
379        if let Some(registry) = config.prom_registry {
380            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
381                .expect("Could not initialize span latency layer");
382            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
383        }
384
385        let mut trace_filter_handle = None;
386        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
387        let mut provider = None;
388        let sampler = SamplingFilter::new(config.sample_rate);
389        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("sui-node".to_owned());
390
391        if config.enable_otlp_tracing {
392            let trace_file = env::var("TRACE_FILE").ok();
393            let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
394                "service.name",
395                service_name.clone(),
396            )];
397            if let Ok(namespace) = env::var("NAMESPACE") {
398                otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
399            }
400            if let Ok(hostname) = env::var("HOSTNAME") {
401                otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
402            }
403            if let Ok(network) = env::var("NETWORK") {
404                otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
405            }
406
407            let resource = Resource::new(otel_kv_vec);
408            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
409
410            // We can either do file output or OTLP, but not both. tracing-opentelemetry
411            // only supports a single tracer at a time.
412            let telemetry = if let Some(trace_file) = trace_file {
413                let exporter =
414                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
415                file_output = exporter.cached_open_file.clone();
416                let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
417
418                let p = TracerProvider::builder()
419                    .with_resource(resource)
420                    .with_sampler(sampler)
421                    .with_span_processor(processor)
422                    .build();
423
424                let tracer = p.tracer(service_name);
425                provider = Some(p);
426
427                tracing_opentelemetry::layer().with_tracer(tracer)
428            } else {
429                let endpoint = env::var("OTLP_ENDPOINT")
430                    .unwrap_or_else(|_| "http://localhost:4317".to_string());
431                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
432                    .with_tonic()
433                    .with_endpoint(endpoint)
434                    .build()
435                    .unwrap();
436                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
437                    .with_resource(resource)
438                    .with_sampler(sampler)
439                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
440                    .build();
441                let tracer = tracer_provider.tracer(service_name);
442                tracing_opentelemetry::layer().with_tracer(tracer)
443            };
444
445            // Enable Trace Contexts for tying spans together
446            opentelemetry::global::set_text_map_propagator(
447                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
448            );
449
450            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
451            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
452            trace_filter_handle = Some(FilterHandle(reload_handle));
453
454            layers.push(telemetry.with_filter(trace_env_filter).boxed());
455        }
456
457        let (nb_output, worker_guard) = get_output(config.log_file.clone());
458        if config.json_log_output {
459            // Output to file or to stderr in a newline-delimited JSON format
460            let json_layer = fmt::layer()
461                .with_file(true)
462                .with_line_number(true)
463                .json()
464                .with_writer(nb_output)
465                .with_filter(log_filter)
466                .boxed();
467            layers.push(json_layer);
468        } else {
469            // Output to file or to stderr with ANSI colors
470            let fmt_layer = fmt::layer()
471                .with_ansi(config.log_file.is_none() && stderr().is_tty())
472                .with_writer(nb_output.clone())
473                .with_filter(log_filter)
474                .boxed();
475
476            layers.push(fmt_layer);
477
478            if config.log_file.is_none() && stderr().is_tty() && !config.user_info_target.is_empty()
479            {
480                // Add another printer that prints unadorned info messages to stderr
481                let mut directives = String::from("none");
482                for target in config.user_info_target {
483                    directives.push_str(&format!(",{target}=info"));
484                }
485
486                let fmt_layer = fmt::layer()
487                    .with_ansi(config.log_file.is_none() && stderr().is_tty())
488                    .event_format(
489                        fmt::format()
490                            .without_time()
491                            .with_target(false)
492                            .with_level(false),
493                    )
494                    .with_writer(nb_output)
495                    .with_filter(EnvFilter::new(directives))
496                    .boxed();
497
498                layers.push(fmt_layer);
499            }
500        }
501
502        let subscriber = tracing_subscriber::registry().with(layers);
503        ::tracing::subscriber::set_global_default(subscriber)
504            .expect("unable to initialize tracing subscriber");
505
506        if config.panic_hook {
507            set_panic_hook(config.crash_on_panic);
508        }
509
510        // The guard must be returned and kept in the main fn of the app, as when it's dropped then the output
511        // gets flushed and closed. If this is dropped too early then no output will appear!
512        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);
513
514        (
515            guards,
516            TracingHandle {
517                log: log_filter_handle,
518                trace: trace_filter_handle,
519                file_output,
520                sampler,
521            },
522        )
523    }
524}
525
526// Like Sampler::TraceIdRatioBased, but can be updated at runtime
527#[derive(Debug, Clone)]
528struct SamplingFilter {
529    // Sampling filter needs to be fast, so we avoid a mutex.
530    sample_rate: Arc<AtomicF64>,
531}
532
533impl SamplingFilter {
534    fn new(sample_rate: f64) -> Self {
535        SamplingFilter {
536            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
537        }
538    }
539
540    fn clamp(sample_rate: f64) -> f64 {
541        // clamp sample rate to between 0.0001 and 1.0
542        sample_rate.clamp(0.0001, 1.0)
543    }
544
545    fn update_sampling_rate(&self, sample_rate: f64) {
546        // clamp sample rate to between 0.0001 and 1.0
547        let sample_rate = Self::clamp(sample_rate);
548        self.sample_rate.store(sample_rate, Ordering::Relaxed);
549    }
550}
551
552impl ShouldSample for SamplingFilter {
553    fn should_sample(
554        &self,
555        parent_context: Option<&Context>,
556        trace_id: TraceId,
557        name: &str,
558        span_kind: &SpanKind,
559        attributes: &[KeyValue],
560        links: &[Link],
561    ) -> SamplingResult {
562        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
563        let sampler = Sampler::TraceIdRatioBased(sample_rate);
564
565        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
566    }
567}
568
569/// Globally set a tracing subscriber suitable for testing environments
570pub fn init_for_testing() {
571    static LOGGER: Lazy<()> = Lazy::new(|| {
572        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
573            .with_env_filter(
574                EnvFilter::builder()
575                    .with_default_directive(LevelFilter::INFO.into())
576                    .from_env_lossy(),
577            )
578            .with_file(true)
579            .with_line_number(true)
580            .with_test_writer()
581            .finish();
582        ::tracing::subscriber::set_global_default(subscriber)
583            .expect("unable to initialize logging for tests");
584    });
585
586    Lazy::force(&LOGGER);
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    /// Smoke test of `TelemetryConfig::init` with a Prometheus registry and DEBUG span level.
    /// The DEBUG span's latency must be recorded, while the TRACE span must not be created.
    /// It ends by panicking on purpose so the installed panic hook logs the panic.
    ///
    /// NOTE(review): a bare `#[should_panic]` is satisfied by *any* panic. That includes a
    /// failing `assert_eq!` below, or `init()` panicking because another test in this process
    /// already installed a global subscriber. So this test cannot catch broken assertions.
    /// Adding `expected = "..."` would first require isolating it from `init_for_testing`.
    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Logs stay at the default INFO level, but spans are enabled down to DEBUG, so the
        // TRACE span below is filtered out.
        let _guard = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry)
            .init();

        info!(a = 1, "This will be INFO.");

        // Enabled span: its latency should be recorded even though the DEBUG event inside it
        // is below the INFO log level and is not printed.
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // Disabled span: the INFO event is still logged, but no span (and no latency) exists.
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        // Expect exactly one histogram family holding a single series labelled with the span name.
        let families = registry.gather();
        assert_eq!(families.len(), 1);
        let family = &families[0];
        assert_eq!(family.get_name(), "tracing_span_latencies");
        assert_eq!(family.get_field_type(), MetricType::HISTOGRAM);

        let series = family.get_metric();
        assert_eq!(series.len(), 1);
        let labels = series[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "span_name");
        assert_eq!(labels[0].get_value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // `init_for_testing` installs its subscriber at most once. These two tests may therefore
    // race to initialize logging without either one panicking.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}