1use atomic_float::AtomicF64;
5use crossterm::tty::IsTty;
6use once_cell::sync::Lazy;
7use opentelemetry::{
8 Context, KeyValue,
9 trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
10};
11use opentelemetry_otlp::WithExportConfig;
12use opentelemetry_sdk::trace::Sampler;
13use opentelemetry_sdk::{
14 self, Resource, runtime,
15 trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
16};
17use span_latency_prom::PrometheusSpanLatencyLayer;
18use std::path::PathBuf;
19use std::time::Duration;
20use std::{
21 env,
22 io::{Write, stderr},
23 str::FromStr,
24 sync::{Arc, Mutex, atomic::Ordering},
25};
26use tracing::dispatcher::DefaultGuard;
27use tracing::metadata::LevelFilter;
28use tracing::{Level, error, info};
29use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
30use tracing_error::ErrorLayer;
31use tracing_subscriber::{
32 EnvFilter, Layer, Registry, filter,
33 fmt::{self, format::Pretty},
34 layer::SubscriberExt,
35 reload,
36};
37
38use crate::file_exporter::{CachedOpenFile, FileExporter};
39use crate::test_layer::TestLayer;
40
41mod file_exporter;
42pub mod span_latency_prom;
43mod test_layer;
44
/// Boxed, thread-safe error type returned by the runtime-control APIs in this crate
/// (`FilterHandle`, `TracingHandle`).
pub type BoxError = Box<dyn std::error::Error + Sync + Send>;
47
/// Configuration for the process's logging, tracing and span-latency telemetry.
///
/// Build one with `TelemetryConfig::new()` plus the `with_*` builder methods (or `with_env`)
/// and install it with `init`.
///
/// NOTE: the derived `Default` yields all-`false` / zero / `None` / empty values (for example
/// `sample_rate: 0.0`, `panic_hook: false`, `set_global_default: false`), which differ from the
/// values chosen by `TelemetryConfig::new()`.
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Install an OpenTelemetry layer: a file exporter when `TRACE_FILE` is set, otherwise an
    /// OTLP exporter pointed at `OTLP_ENDPOINT`.
    pub enable_otlp_tracing: bool,
    /// Add a `console_subscriber` layer (tokio-console).
    pub tokio_console: bool,
    /// Emit logs as JSON (with file and line numbers) instead of the human-readable format.
    pub json_log_output: bool,
    /// Prefix of a daily-rolling log file; logs go to stderr when `None`.
    pub log_file: Option<String>,
    /// Log filter directives used when `RUST_LOG` is unset or invalid (`init` defaults to `"info"`).
    pub log_string: Option<String>,
    /// Most verbose span level recorded by the Prometheus span-latency layer (`init` defaults
    /// to `INFO`).
    pub span_level: Option<Level>,
    /// Install a panic hook that logs panics through `tracing` before running the previous hook.
    pub panic_hook: bool,
    /// With the panic hook installed, exit the process with status 12 after a panic.
    pub crash_on_panic: bool,
    /// Registry for the span-latency histogram; the layer is only installed when this is set.
    pub prom_registry: Option<prometheus::Registry>,
    /// Trace sampling ratio; clamped by `SamplingFilter` before use.
    pub sample_rate: f64,
    /// Targets appended to the log directives at `trace` level.
    pub trace_target: Option<Vec<String>>,
    /// Targets whose `info` events are additionally printed without time/target/level; only
    /// used for non-JSON output to an interactive stderr.
    pub user_info_target: Vec<String>,
    /// Install the subscriber globally; otherwise it becomes the thread-local default held by
    /// the returned `TelemetryGuards`.
    pub set_global_default: bool,
    /// Add a `tracing_error::ErrorLayer` using pretty field formatting.
    pub enable_error_layer: bool,
    /// Add a `TestLayer` whose events can be read back through `TracingHandle`.
    pub enable_test_layer: bool,
}
85
/// Guards returned by `TelemetryConfig::init`; keep them alive for as long as telemetry should
/// keep working.
///
/// The fields are never read — they are held only so that their `Drop` impls run when this
/// value is dropped, hence `#[allow(dead_code)]`. Rust drops fields in declaration order, so the
/// order below is also the teardown order.
#[must_use]
#[allow(dead_code)]
pub struct TelemetryGuards {
    /// Guard for the `tracing_appender` non-blocking log writer created by `get_output`.
    worker_guard: WorkerGuard,
    /// Tracer provider created by `init`, when it chose to keep one.
    provider: Option<TracerProvider>,
    /// Thread-local subscriber guard; `None` when the subscriber was installed globally.
    subscriber: Option<DefaultGuard>,
}
93
94impl TelemetryGuards {
95 fn new(
96 config: TelemetryConfig,
97 worker_guard: WorkerGuard,
98 provider: Option<TracerProvider>,
99 subscriber: Option<DefaultGuard>,
100 ) -> Self {
101 if subscriber.is_none() {
103 set_global_telemetry_config(config);
104 }
105 Self {
106 worker_guard,
107 provider,
108 subscriber,
109 }
110 }
111}
112
113impl Drop for TelemetryGuards {
114 fn drop(&mut self) {
115 clear_global_telemetry_config();
116 }
117}
118
/// Cloneable handle for swapping the `EnvFilter` of a reloadable layer at runtime.
#[derive(Clone, Debug)]
pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);
121
122impl FilterHandle {
123 pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
124 let filter = EnvFilter::try_new(directives)?;
125 self.0.reload(filter)?;
126 Ok(())
127 }
128
129 pub fn get(&self) -> Result<String, BoxError> {
130 self.0
131 .with_current(|filter| filter.to_string())
132 .map_err(Into::into)
133 }
134}
135
/// Runtime control surface returned by `TelemetryConfig::init`.
pub struct TracingHandle {
    /// Reload handle for the log output filter.
    log: FilterHandle,
    /// Reload handle for the OpenTelemetry layer's filter; `None` when OTLP tracing is disabled.
    trace: Option<FilterHandle>,
    /// File handle shared with the trace file exporter (if any), so its path can be changed.
    file_output: CachedOpenFile,
    /// Event-capturing layer; present only when the test layer was enabled.
    test_layer: Option<TestLayer>,
    /// Sampler shared with the tracer provider, whose rate can be updated at runtime.
    sampler: SamplingFilter,
}
143
144impl TracingHandle {
145 pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
146 self.log.update(directives)
147 }
148
149 pub fn get_log(&self) -> Result<String, BoxError> {
150 self.log.get()
151 }
152
153 pub fn update_sampling_rate(&self, sample_rate: f64) {
154 self.sampler.update_sampling_rate(sample_rate);
155 }
156
157 pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
158 let trace_path = PathBuf::from_str(trace_file.as_ref())?;
159 self.file_output.update_path(trace_path)?;
160 Ok(())
161 }
162
163 pub fn update_trace_filter<S: AsRef<str>>(
164 &self,
165 directives: S,
166 duration: Duration,
167 ) -> Result<(), BoxError> {
168 if let Some(trace) = &self.trace {
169 let res = trace.update(directives);
170 let trace = trace.clone();
172 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
173 tokio::spawn(async move {
174 tokio::time::sleep(duration).await;
175 if let Err(e) = trace.update(trace_filter_env) {
176 error!("failed to reset trace filter: {}", e);
177 }
178 });
179 res
180 } else {
181 info!("tracing not enabled, ignoring update");
182 Ok(())
183 }
184 }
185
186 pub fn clear_file_output(&self) {
187 self.file_output.clear_path();
188 }
189
190 pub fn reset_trace(&self) {
191 if let Some(trace) = &self.trace {
192 let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
193 if let Err(e) = trace.update(trace_filter_env) {
194 error!("failed to reset trace filter: {}", e);
195 }
196 }
197 }
198
199 pub fn get_test_layer_events(&self) -> Vec<String> {
200 self.test_layer
201 .as_ref()
202 .expect("test layer not enabled")
203 .get_events()
204 }
205}
206
207fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
208 if let Some(logfile_prefix) = log_file {
209 let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
210 tracing_appender::non_blocking(file_appender)
211 } else {
212 tracing_appender::non_blocking(stderr())
213 }
214}
215
216fn set_panic_hook(crash_on_panic: bool) {
218 let default_panic_handler = std::panic::take_hook();
219
220 std::panic::set_hook(Box::new(move |panic| {
227 if let Some(location) = panic.location() {
229 tracing::error!(
234 message = %panic,
235 panic.file = location.file(),
236 panic.line = location.line(),
237 panic.column = location.column(),
238 );
239 } else {
240 tracing::error!(message = %panic);
241 }
242
243 default_panic_handler(panic);
244
245 let _ = std::io::stderr().flush();
247 let _ = std::io::stdout().flush();
248
249 if crash_on_panic {
250 std::process::exit(12);
252 }
253 }));
254}
255
256static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
257 Lazy::new(|| Arc::new(Mutex::new(None)));
258
259fn set_global_telemetry_config(config: TelemetryConfig) {
260 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
261 assert!(global_config.is_none());
262 *global_config = Some(config);
263}
264
265fn clear_global_telemetry_config() {
266 let mut global_config = GLOBAL_CONFIG.lock().unwrap();
267 *global_config = None;
268}
269
270pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
271 let global_config = GLOBAL_CONFIG.lock().unwrap();
272 global_config.clone()
273}
274
275impl TelemetryConfig {
276 pub fn new() -> Self {
277 Self {
278 enable_otlp_tracing: false,
279 tokio_console: false,
280 json_log_output: false,
281 log_file: None,
282 log_string: None,
283 span_level: None,
284 panic_hook: true,
285 crash_on_panic: false,
286 prom_registry: None,
287 sample_rate: 1.0,
288 trace_target: None,
289 user_info_target: Vec::new(),
290 set_global_default: true,
291 enable_error_layer: false,
292 enable_test_layer: false,
293 }
294 }
295
296 pub fn with_json(mut self) -> Self {
297 self.json_log_output = true;
298 self
299 }
300
301 pub fn with_log_level(mut self, log_string: &str) -> Self {
302 self.log_string = Some(log_string.to_owned());
303 self
304 }
305
306 pub fn with_span_level(mut self, span_level: Level) -> Self {
307 self.span_level = Some(span_level);
308 self
309 }
310
311 pub fn with_log_file(mut self, filename: &str) -> Self {
312 self.log_file = Some(filename.to_owned());
313 self
314 }
315
316 pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
317 self.prom_registry = Some(registry.clone());
318 self
319 }
320
321 pub fn with_sample_rate(mut self, rate: f64) -> Self {
322 self.sample_rate = rate;
323 self
324 }
325
326 pub fn with_trace_target(mut self, target: &str) -> Self {
327 match self.trace_target {
328 Some(ref mut v) => v.push(target.to_owned()),
329 None => self.trace_target = Some(vec![target.to_owned()]),
330 };
331
332 self
333 }
334
335 pub fn with_user_info_target(mut self, target: &str) -> Self {
338 self.user_info_target.push(target.to_owned());
339 self
340 }
341
342 pub fn with_set_global_default(mut self, set_global_default: bool) -> Self {
343 self.set_global_default = set_global_default;
344 self
345 }
346
347 pub fn with_enable_error_layer(mut self, enable_error_layer: bool) -> Self {
348 self.enable_error_layer = enable_error_layer;
349 self
350 }
351
352 pub fn with_enable_test_layer(mut self, enable_test_layer: bool) -> Self {
353 self.enable_test_layer = enable_test_layer;
354 self
355 }
356
357 pub fn with_env(mut self) -> Self {
358 if env::var("CRASH_ON_PANIC").is_ok() {
359 self.crash_on_panic = true
360 }
361
362 if env::var("TRACE_FILTER").is_ok() {
363 self.enable_otlp_tracing = true
364 }
365
366 if env::var("RUST_LOG_JSON").is_ok() {
367 self.json_log_output = true;
368 }
369
370 if env::var("TOKIO_CONSOLE").is_ok() {
371 self.tokio_console = true;
372 }
373
374 if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
375 self.span_level =
376 Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
377 }
378
379 if let Ok(filepath) = env::var("RUST_LOG_FILE") {
380 self.log_file = Some(filepath);
381 }
382
383 if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
384 self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
385 }
386
387 if let Ok(enable_error_layer) = env::var("ENABLE_ERROR_LAYER") {
388 self.enable_error_layer = enable_error_layer
389 .parse()
390 .expect("Cannot parse ENABLE_ERROR_LAYER");
391 }
392
393 self
394 }
395
396 pub fn init(self) -> (TelemetryGuards, TracingHandle) {
397 let config = self;
398 let config_clone = config.clone();
399
400 let mut directives = config.log_string.unwrap_or_else(|| "info".into());
405 if let Some(targets) = config.trace_target {
406 for target in targets {
407 directives.push_str(&format!(",{}=trace", target));
408 }
409 }
410 let env_filter =
411 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
412 let (log_filter, reload_handle) = reload::Layer::new(env_filter);
413 let log_filter_handle = FilterHandle(reload_handle);
414
415 let span_level = config.span_level.unwrap_or(Level::INFO);
419 let span_filter = filter::filter_fn(move |metadata| {
420 metadata.is_span() && *metadata.level() <= span_level
421 });
422
423 let mut layers = Vec::new();
424
425 if config.tokio_console {
429 layers.push(console_subscriber::spawn().boxed());
430 }
431
432 if let Some(registry) = config.prom_registry {
433 let span_lat_layer = PrometheusSpanLatencyLayer::try_new(®istry, 15)
434 .expect("Could not initialize span latency layer");
435 layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
436 }
437
438 let mut trace_filter_handle = None;
439 let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
440 let mut provider = None;
441 let sampler = SamplingFilter::new(config.sample_rate);
442 let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("sui-node".to_owned());
443
444 if config.enable_otlp_tracing {
445 let trace_file = env::var("TRACE_FILE").ok();
446 let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
447 "service.name",
448 service_name.clone(),
449 )];
450 if let Ok(namespace) = env::var("NAMESPACE") {
451 otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
452 }
453 if let Ok(hostname) = env::var("HOSTNAME") {
454 otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
455 }
456 if let Ok(network) = env::var("NETWORK") {
457 otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
458 }
459
460 let resource = Resource::new(otel_kv_vec);
461 let sampler = Sampler::ParentBased(Box::new(sampler.clone()));
462
463 let telemetry = if let Some(trace_file) = trace_file {
466 let exporter =
467 FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
468 file_output = exporter.cached_open_file.clone();
469 let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();
470
471 let p = TracerProvider::builder()
472 .with_resource(resource)
473 .with_sampler(sampler)
474 .with_span_processor(processor)
475 .build();
476
477 let tracer = p.tracer(service_name);
478 provider = Some(p);
479
480 tracing_opentelemetry::layer().with_tracer(tracer)
481 } else {
482 let endpoint = env::var("OTLP_ENDPOINT")
483 .unwrap_or_else(|_| "http://localhost:4317".to_string());
484 let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
485 .with_tonic()
486 .with_endpoint(endpoint)
487 .build()
488 .unwrap();
489 let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
490 .with_resource(resource)
491 .with_sampler(sampler)
492 .with_batch_exporter(otlp_exporter, runtime::Tokio)
493 .build();
494 let tracer = tracer_provider.tracer(service_name);
495 tracing_opentelemetry::layer().with_tracer(tracer)
496 };
497
498 opentelemetry::global::set_text_map_propagator(
500 opentelemetry_sdk::propagation::TraceContextPropagator::new(),
501 );
502
503 let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
504 let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
505 trace_filter_handle = Some(FilterHandle(reload_handle));
506
507 layers.push(telemetry.with_filter(trace_env_filter).boxed());
508 }
509
510 let (nb_output, worker_guard) = get_output(config.log_file.clone());
511 if config.json_log_output {
512 let json_layer = fmt::layer()
514 .with_file(true)
515 .with_line_number(true)
516 .json()
517 .with_writer(nb_output)
518 .with_filter(log_filter)
519 .boxed();
520 layers.push(json_layer);
521 } else {
522 let fmt_layer = fmt::layer()
524 .with_ansi(config.log_file.is_none() && stderr().is_tty())
525 .with_writer(nb_output.clone())
526 .with_filter(log_filter)
527 .boxed();
528
529 layers.push(fmt_layer);
530
531 if config.log_file.is_none() && stderr().is_tty() && !config.user_info_target.is_empty()
532 {
533 let mut directives = String::from("none");
535 for target in config.user_info_target {
536 directives.push_str(&format!(",{target}=info"));
537 }
538
539 let fmt_layer = fmt::layer()
540 .with_ansi(config.log_file.is_none() && stderr().is_tty())
541 .event_format(
542 fmt::format()
543 .without_time()
544 .with_target(false)
545 .with_level(false),
546 )
547 .with_writer(nb_output)
548 .with_filter(EnvFilter::new(directives))
549 .boxed();
550
551 layers.push(fmt_layer);
552 }
553 }
554
555 if config.enable_error_layer {
556 layers.push(ErrorLayer::new(Pretty::default()).boxed())
557 }
558
559 let test_layer = if config.enable_test_layer {
560 let test_layer = TestLayer::new();
561 layers.push(test_layer.clone().boxed());
562 Some(test_layer)
563 } else {
564 None
565 };
566
567 let subscriber = tracing_subscriber::registry().with(layers);
568 let subscriber_guard = if config.set_global_default {
569 tracing::subscriber::set_global_default(subscriber)
570 .expect("unable to initialize tracing subscriber");
571 None
572 } else {
573 Some(tracing::subscriber::set_default(subscriber))
574 };
575
576 if config.panic_hook {
577 set_panic_hook(config.crash_on_panic);
578 }
579
580 let guards = TelemetryGuards::new(config_clone, worker_guard, provider, subscriber_guard);
583
584 (
585 guards,
586 TracingHandle {
587 log: log_filter_handle,
588 trace: trace_filter_handle,
589 file_output,
590 test_layer,
591 sampler,
592 },
593 )
594 }
595}
596
/// Trace-ID-ratio sampler whose ratio can be changed at runtime.
///
/// Clones share the same rate through the `Arc`, so an update through one clone is seen by all.
#[derive(Debug, Clone)]
struct SamplingFilter {
    /// Current sampling ratio; every write goes through `SamplingFilter::clamp`.
    sample_rate: Arc<AtomicF64>,
}
603
604impl SamplingFilter {
605 fn new(sample_rate: f64) -> Self {
606 SamplingFilter {
607 sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
608 }
609 }
610
611 fn clamp(sample_rate: f64) -> f64 {
612 sample_rate.clamp(0.0001, 1.0)
614 }
615
616 fn update_sampling_rate(&self, sample_rate: f64) {
617 let sample_rate = Self::clamp(sample_rate);
619 self.sample_rate.store(sample_rate, Ordering::Relaxed);
620 }
621}
622
623impl ShouldSample for SamplingFilter {
624 fn should_sample(
625 &self,
626 parent_context: Option<&Context>,
627 trace_id: TraceId,
628 name: &str,
629 span_kind: &SpanKind,
630 attributes: &[KeyValue],
631 links: &[Link],
632 ) -> SamplingResult {
633 let sample_rate = self.sample_rate.load(Ordering::Relaxed);
634 let sampler = Sampler::TraceIdRatioBased(sample_rate);
635
636 sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
637 }
638}
639
640pub fn init_for_testing() {
642 static LOGGER: Lazy<()> = Lazy::new(|| {
643 let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
644 .with_env_filter(
645 EnvFilter::builder()
646 .with_default_directive(LevelFilter::INFO.into())
647 .from_env_lossy(),
648 )
649 .with_file(true)
650 .with_line_number(true)
651 .with_test_writer()
652 .finish();
653 ::tracing::subscriber::set_global_default(subscriber)
654 .expect("unable to initialize logging for tests");
655 });
656
657 Lazy::force(&LOGGER);
658}
659
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    /// Checks that only spans at or below the configured verbosity (`DEBUG`) are recorded in the
    /// span-latency histogram. The test then panics on purpose so the installed panic hook's
    /// error logging is exercised.
    ///
    /// Two details keep the test honest and isolated:
    /// - The subscriber is installed thread-locally. A global one would collide with
    ///   `init_for_testing` in the other tests, and whichever ran second would panic.
    /// - `should_panic` pins the final panic message, so a failing assertion or an init panic
    ///   is reported as a failure instead of satisfying the attribute.
    #[test]
    #[should_panic(expected = "This should cause error logs to be printed out!")]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry)
            .with_set_global_default(false);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        // TRACE is more verbose than the configured DEBUG span level, so no latency sample.
        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].name(), "span_name");
        assert_eq!(labels[0].value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    /// `init_for_testing` must be callable from several tests in one process.
    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    /// Second caller: proves the once-only initialization does not panic on reuse.
    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}