telemetry_subscribers/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use atomic_float::AtomicF64;
5use crossterm::tty::IsTty;
6use once_cell::sync::Lazy;
7use opentelemetry::{
8    Context, KeyValue,
9    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
10};
11use opentelemetry_otlp::WithExportConfig;
12use opentelemetry_sdk::trace::Sampler;
13use opentelemetry_sdk::{
14    self, Resource, runtime,
15    trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
16};
17use span_latency_prom::PrometheusSpanLatencyLayer;
18use std::path::PathBuf;
19use std::time::Duration;
20use std::{
21    env,
22    io::{Write, stderr},
23    str::FromStr,
24    sync::{Arc, Mutex, atomic::Ordering},
25};
26use tracing::metadata::LevelFilter;
27use tracing::{Level, error, info};
28use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
29use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};
30
31use crate::file_exporter::{CachedOpenFile, FileExporter};
32
33mod file_exporter;
34pub mod span_latency_prom;
35
/// Alias for a type-erased error type.
///
/// The boxed error is `Send + Sync + 'static`, so it can be moved across threads and tasks.
/// It is the error type returned by the runtime-control methods of [`FilterHandle`] and
/// [`TracingHandle`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
38
/// Configuration for different logging/tracing options
/// ===
/// - json_log_output: Output newline-delimited JSON logs (to the log file if one is set,
///   otherwise to stderr).
/// - log_file: If defined, write output to a file starting with this name, ex app.log
/// - log_string: log filter directives (error/warn/info/debug/trace, ...), defaults to info
///
/// Prefer [`TelemetryConfig::new`] over the derived `Default`; they disagree on two fields.
/// `new()` sets `panic_hook: true` and `sample_rate: 1.0`. `Default` yields
/// `panic_hook: false` and `sample_rate: 0.0`, which the sampler clamps up to 0.0001.
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Export spans through OpenTelemetry (`with_env` sets this when `TRACE_FILTER` is set).
    /// Spans are written to the file named by `TRACE_FILE` if that is set. Otherwise they are
    /// sent over OTLP to `OTLP_ENDPOINT` (default `http://localhost:4317`).
    pub enable_otlp_tracing: bool,
    /// Enables Tokio Console debugging on port 6669
    pub tokio_console: bool,
    /// Output JSON logs.
    pub json_log_output: bool,
    /// If defined, write output to a daily-rolling file starting with this name, ex app.log
    pub log_file: Option<String>,
    /// Log filter directives (`EnvFilter` syntax), defaults to info.
    /// A valid `RUST_LOG` environment variable takes precedence over this.
    pub log_string: Option<String>,
    /// Span level - what level of spans should be created.  Note this is not same as logging level
    /// If set to None, then defaults to INFO
    /// NOTE(review): currently only applied to the Prometheus span-latency layer.
    pub span_level: Option<Level>,
    /// Set a panic hook that reports panics through `tracing`
    pub panic_hook: bool,
    /// Crash on panic: exit the process with status 12 after a panic.
    /// Only takes effect when `panic_hook` is set.
    pub crash_on_panic: bool,
    /// Optional Prometheus registry - if present, all enabled span latencies are measured
    pub prom_registry: Option<prometheus::Registry>,
    /// Fraction of traces to sample. Clamped to [0.0001, 1.0] and adjustable at runtime via
    /// `TracingHandle::update_sampling_rate`.
    pub sample_rate: f64,
    /// Add directive to include trace logs with provided target (appended to `log_string`)
    pub trace_target: Option<Vec<String>>,
}
68
/// Guards returned by [`TelemetryConfig::init`]. Keep them alive for as long as telemetry
/// output is wanted, typically for the whole of `main`.
///
/// Dropping them flushes and closes the non-blocking log writer and clears the global
/// telemetry config.
#[must_use]
// The fields are never read. They are held only so that their `Drop` impls run when the
// guards go out of scope.
#[allow(dead_code)]
pub struct TelemetryGuards {
    /// Flushes and closes the buffered log output when dropped.
    worker_guard: WorkerGuard,
    /// Tracer provider handed over by `init`, kept alive as long as the guards (may be `None`).
    provider: Option<TracerProvider>,
}
75
76impl TelemetryGuards {
77    fn new(
78        config: TelemetryConfig,
79        worker_guard: WorkerGuard,
80        provider: Option<TracerProvider>,
81    ) -> Self {
82        set_global_telemetry_config(config);
83        Self {
84            worker_guard,
85            provider,
86        }
87    }
88}
89
impl Drop for TelemetryGuards {
    /// Unregisters the global telemetry config, so `get_global_telemetry_config` returns
    /// `None` afterwards. The fields' own `Drop` impls run after this.
    fn drop(&mut self) {
        clear_global_telemetry_config();
    }
}
95
/// Cloneable handle for reading and replacing, at runtime, an [`EnvFilter`] installed through
/// a `reload` layer on the [`Registry`].
#[derive(Clone, Debug)]
pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);
98
99impl FilterHandle {
100    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
101        let filter = EnvFilter::try_new(directives)?;
102        self.0.reload(filter)?;
103        Ok(())
104    }
105
106    pub fn get(&self) -> Result<String, BoxError> {
107        self.0
108            .with_current(|filter| filter.to_string())
109            .map_err(Into::into)
110    }
111}
112
/// Runtime controls returned by [`TelemetryConfig::init`]. It adjusts the log filter, the
/// trace filter, the trace output file and the trace sampling rate.
pub struct TracingHandle {
    /// Reload handle for the log-output filter.
    log: FilterHandle,
    /// Reload handle for the OpenTelemetry layer's filter; `None` when OTLP tracing is disabled.
    trace: Option<FilterHandle>,
    /// Output target of the file span exporter (`TRACE_FILE`). When no file exporter is in
    /// use, this is a placeholder created with no path.
    file_output: CachedOpenFile,
    /// Sampler whose rate is shared (via a clone) with the tracer provider's sampler.
    sampler: SamplingFilter,
}
119
120impl TracingHandle {
121    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
122        self.log.update(directives)
123    }
124
125    pub fn get_log(&self) -> Result<String, BoxError> {
126        self.log.get()
127    }
128
129    pub fn update_sampling_rate(&self, sample_rate: f64) {
130        self.sampler.update_sampling_rate(sample_rate);
131    }
132
133    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
134        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
135        self.file_output.update_path(trace_path)?;
136        Ok(())
137    }
138
139    pub fn update_trace_filter<S: AsRef<str>>(
140        &self,
141        directives: S,
142        duration: Duration,
143    ) -> Result<(), BoxError> {
144        if let Some(trace) = &self.trace {
145            let res = trace.update(directives);
146            // after duration is elapsed, reset to the env setting
147            let trace = trace.clone();
148            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
149            tokio::spawn(async move {
150                tokio::time::sleep(duration).await;
151                if let Err(e) = trace.update(trace_filter_env) {
152                    error!("failed to reset trace filter: {}", e);
153                }
154            });
155            res
156        } else {
157            info!("tracing not enabled, ignoring update");
158            Ok(())
159        }
160    }
161
162    pub fn clear_file_output(&self) {
163        self.file_output.clear_path();
164    }
165
166    pub fn reset_trace(&self) {
167        if let Some(trace) = &self.trace {
168            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
169            if let Err(e) = trace.update(trace_filter_env) {
170                error!("failed to reset trace filter: {}", e);
171            }
172        }
173    }
174}
175
176fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
177    if let Some(logfile_prefix) = log_file {
178        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
179        tracing_appender::non_blocking(file_appender)
180    } else {
181        tracing_appender::non_blocking(stderr())
182    }
183}
184
185// NOTE: this function is copied from tracing's panic_hook example
186fn set_panic_hook(crash_on_panic: bool) {
187    let default_panic_handler = std::panic::take_hook();
188
189    // Set a panic hook that records the panic as a `tracing` event at the
190    // `ERROR` verbosity level.
191    //
192    // If we are currently in a span when the panic occurred, the logged event
193    // will include the current span, allowing the context in which the panic
194    // occurred to be recorded.
195    std::panic::set_hook(Box::new(move |panic| {
196        // If the panic has a source location, record it as structured fields.
197        if let Some(location) = panic.location() {
198            // On nightly Rust, where the `PanicInfo` type also exposes a
199            // `message()` method returning just the message, we could record
200            // just the message instead of the entire `fmt::Display`
201            // implementation, avoiding the duplicated location
202            tracing::error!(
203                message = %panic,
204                panic.file = location.file(),
205                panic.line = location.line(),
206                panic.column = location.column(),
207            );
208        } else {
209            tracing::error!(message = %panic);
210        }
211
212        default_panic_handler(panic);
213
214        // We're panicking so we can't do anything about the flush failing
215        let _ = std::io::stderr().flush();
216        let _ = std::io::stdout().flush();
217
218        if crash_on_panic {
219            // Kill the process
220            std::process::exit(12);
221        }
222    }));
223}
224
225static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
226    Lazy::new(|| Arc::new(Mutex::new(None)));
227
228fn set_global_telemetry_config(config: TelemetryConfig) {
229    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
230    assert!(global_config.is_none());
231    *global_config = Some(config);
232}
233
234fn clear_global_telemetry_config() {
235    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
236    *global_config = None;
237}
238
239pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
240    let global_config = GLOBAL_CONFIG.lock().unwrap();
241    global_config.clone()
242}
243
244impl TelemetryConfig {
245    pub fn new() -> Self {
246        Self {
247            enable_otlp_tracing: false,
248            tokio_console: false,
249            json_log_output: false,
250            log_file: None,
251            log_string: None,
252            span_level: None,
253            panic_hook: true,
254            crash_on_panic: false,
255            prom_registry: None,
256            sample_rate: 1.0,
257            trace_target: None,
258        }
259    }
260
261    pub fn with_json(mut self) -> Self {
262        self.json_log_output = true;
263        self
264    }
265
266    pub fn with_log_level(mut self, log_string: &str) -> Self {
267        self.log_string = Some(log_string.to_owned());
268        self
269    }
270
271    pub fn with_span_level(mut self, span_level: Level) -> Self {
272        self.span_level = Some(span_level);
273        self
274    }
275
276    pub fn with_log_file(mut self, filename: &str) -> Self {
277        self.log_file = Some(filename.to_owned());
278        self
279    }
280
281    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
282        self.prom_registry = Some(registry.clone());
283        self
284    }
285
286    pub fn with_sample_rate(mut self, rate: f64) -> Self {
287        self.sample_rate = rate;
288        self
289    }
290
291    pub fn with_trace_target(mut self, target: &str) -> Self {
292        match self.trace_target {
293            Some(ref mut v) => v.push(target.to_owned()),
294            None => self.trace_target = Some(vec![target.to_owned()]),
295        };
296
297        self
298    }
299
300    pub fn with_env(mut self) -> Self {
301        if env::var("CRASH_ON_PANIC").is_ok() {
302            self.crash_on_panic = true
303        }
304
305        if env::var("TRACE_FILTER").is_ok() {
306            self.enable_otlp_tracing = true
307        }
308
309        if env::var("RUST_LOG_JSON").is_ok() {
310            self.json_log_output = true;
311        }
312
313        if env::var("TOKIO_CONSOLE").is_ok() {
314            self.tokio_console = true;
315        }
316
317        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
318            self.span_level =
319                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
320        }
321
322        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
323            self.log_file = Some(filepath);
324        }
325
326        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
327            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
328        }
329
330        self
331    }
332
333    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
334        let config = self;
335        let config_clone = config.clone();
336
337        // Setup an EnvFilter for filtering logging output layers.
338        // NOTE: we don't want to use this to filter all layers.  That causes problems for layers with
339        // different filtering needs, including tokio-console/console-subscriber, and it also doesn't
340        // fit with the span creation needs for distributed tracing and other span-based tools.
341        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
342        if let Some(targets) = config.trace_target {
343            for target in targets {
344                directives.push_str(&format!(",{}=trace", target));
345            }
346        }
347        let env_filter =
348            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
349        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
350        let log_filter_handle = FilterHandle(reload_handle);
351
352        // Separate span level filter.
353        // This is a dumb filter for now - allows all spans that are below a given level.
354        // TODO: implement a sampling filter
355        let span_level = config.span_level.unwrap_or(Level::INFO);
356        let span_filter = filter::filter_fn(move |metadata| {
357            metadata.is_span() && *metadata.level() <= span_level
358        });
359
360        let mut layers = Vec::new();
361
362        // tokio-console layer
363        // Please see https://docs.rs/console-subscriber/latest/console_subscriber/struct.Builder.html#configuration
364        // for environment vars/config options
365        if config.tokio_console {
366            layers.push(console_subscriber::spawn().boxed());
367        }
368
369        if let Some(registry) = config.prom_registry {
370            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
371                .expect("Could not initialize span latency layer");
372            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
373        }
374
375        let mut trace_filter_handle = None;
376        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
377        let mut provider = None;
378        let sampler = SamplingFilter::new(config.sample_rate);
379        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("sui-node".to_owned());
380
381        if config.enable_otlp_tracing {
382            let trace_file = env::var("TRACE_FILE").ok();
383            let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
384                "service.name",
385                service_name.clone(),
386            )];
387            if let Ok(namespace) = env::var("NAMESPACE") {
388                otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
389            }
390            if let Ok(hostname) = env::var("HOSTNAME") {
391                otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
392            }
393            if let Ok(network) = env::var("NETWORK") {
394                otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
395            }
396
397            let resource = Resource::new(otel_kv_vec);
398            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
399
400            // We can either do file output or OTLP, but not both. tracing-opentelemetry
401            // only supports a single tracer at a time.
402            let telemetry = if let Some(trace_file) = trace_file {
403                let exporter =
404                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
405                file_output = exporter.cached_open_file.clone();
406                let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
407
408                let p = TracerProvider::builder()
409                    .with_resource(resource)
410                    .with_sampler(sampler)
411                    .with_span_processor(processor)
412                    .build();
413
414                let tracer = p.tracer(service_name);
415                provider = Some(p);
416
417                tracing_opentelemetry::layer().with_tracer(tracer)
418            } else {
419                let endpoint = env::var("OTLP_ENDPOINT")
420                    .unwrap_or_else(|_| "http://localhost:4317".to_string());
421                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
422                    .with_tonic()
423                    .with_endpoint(endpoint)
424                    .build()
425                    .unwrap();
426                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
427                    .with_resource(resource)
428                    .with_sampler(sampler)
429                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
430                    .build();
431                let tracer = tracer_provider.tracer(service_name);
432                tracing_opentelemetry::layer().with_tracer(tracer)
433            };
434
435            // Enable Trace Contexts for tying spans together
436            opentelemetry::global::set_text_map_propagator(
437                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
438            );
439
440            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
441            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
442            trace_filter_handle = Some(FilterHandle(reload_handle));
443
444            layers.push(telemetry.with_filter(trace_env_filter).boxed());
445        }
446
447        let (nb_output, worker_guard) = get_output(config.log_file.clone());
448        if config.json_log_output {
449            // Output to file or to stderr in a newline-delimited JSON format
450            let json_layer = fmt::layer()
451                .with_file(true)
452                .with_line_number(true)
453                .json()
454                .with_writer(nb_output)
455                .with_filter(log_filter)
456                .boxed();
457            layers.push(json_layer);
458        } else {
459            // Output to file or to stderr with ANSI colors
460            let fmt_layer = fmt::layer()
461                .with_ansi(config.log_file.is_none() && stderr().is_tty())
462                .with_writer(nb_output)
463                .with_filter(log_filter)
464                .boxed();
465            layers.push(fmt_layer);
466        }
467
468        let subscriber = tracing_subscriber::registry().with(layers);
469        ::tracing::subscriber::set_global_default(subscriber)
470            .expect("unable to initialize tracing subscriber");
471
472        if config.panic_hook {
473            set_panic_hook(config.crash_on_panic);
474        }
475
476        // The guard must be returned and kept in the main fn of the app, as when it's dropped then the output
477        // gets flushed and closed. If this is dropped too early then no output will appear!
478        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);
479
480        (
481            guards,
482            TracingHandle {
483                log: log_filter_handle,
484                trace: trace_filter_handle,
485                file_output,
486                sampler,
487            },
488        )
489    }
490}
491
/// Like `Sampler::TraceIdRatioBased`, but the ratio can be updated at runtime.
///
/// The rate lives behind an `Arc`, so all clones share it and updating one affects every
/// clone.
#[derive(Debug, Clone)]
struct SamplingFilter {
    // Sampling filter needs to be fast, so we avoid a mutex.
    sample_rate: Arc<AtomicF64>,
}
498
499impl SamplingFilter {
500    fn new(sample_rate: f64) -> Self {
501        SamplingFilter {
502            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
503        }
504    }
505
506    fn clamp(sample_rate: f64) -> f64 {
507        // clamp sample rate to between 0.0001 and 1.0
508        sample_rate.clamp(0.0001, 1.0)
509    }
510
511    fn update_sampling_rate(&self, sample_rate: f64) {
512        // clamp sample rate to between 0.0001 and 1.0
513        let sample_rate = Self::clamp(sample_rate);
514        self.sample_rate.store(sample_rate, Ordering::Relaxed);
515    }
516}
517
518impl ShouldSample for SamplingFilter {
519    fn should_sample(
520        &self,
521        parent_context: Option<&Context>,
522        trace_id: TraceId,
523        name: &str,
524        span_kind: &SpanKind,
525        attributes: &[KeyValue],
526        links: &[Link],
527    ) -> SamplingResult {
528        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
529        let sampler = Sampler::TraceIdRatioBased(sample_rate);
530
531        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
532    }
533}
534
535/// Globally set a tracing subscriber suitable for testing environments
536pub fn init_for_testing() {
537    static LOGGER: Lazy<()> = Lazy::new(|| {
538        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
539            .with_env_filter(
540                EnvFilter::builder()
541                    .with_default_directive(LevelFilter::INFO.into())
542                    .from_env_lossy(),
543            )
544            .with_file(true)
545            .with_line_number(true)
546            .with_test_writer()
547            .finish();
548        ::tracing::subscriber::set_global_default(subscriber)
549            .expect("unable to initialize logging for tests");
550    });
551
552    Lazy::force(&LOGGER);
553}
554
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    // NOTE(review): `test_telemetry_init` and the `testing_logger_*` tests each install a
    // process-global `tracing` subscriber, which can only be set once per process. They
    // presumably rely on a one-process-per-test runner (e.g. cargo-nextest); under plain
    // `cargo test` they can interfere with each other.

    #[test]
    // Pin the expected message. A bare `#[should_panic]` would also be satisfied by a failing
    // assertion or a failed `init`, making every check below vacuous.
    #[should_panic(expected = "This should cause error logs to be printed out!")]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        // Default logging level is INFO, but here we set the span level to DEBUG.  TRACE spans should be ignored.
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        // Spans are debug level or below, so they won't be printed out either.  However latencies
        // should be recorded for at least one span
        debug_span!("yo span yo").in_scope(|| {
            // This debug log will not print out, log level set to INFO by default
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // This span won't be enabled
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        // There should be 1 metricFamily and 1 metric
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].get_name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "span_name");
        assert_eq!(labels[0].get_value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    // Both the following tests should be able to "race" to initialize logging without causing a
    // panic
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}