use atomic_float::AtomicF64;
use crossterm::tty::IsTty;
use once_cell::sync::Lazy;
use opentelemetry::{
    Context, KeyValue,
    trace::{Link, SamplingResult, SpanKind, TraceId, TracerProvider as _},
};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::Sampler;
use opentelemetry_sdk::{
    self, Resource, runtime,
    trace::{BatchSpanProcessor, ShouldSample, TracerProvider},
};
use span_latency_prom::PrometheusSpanLatencyLayer;
use std::path::PathBuf;
use std::time::Duration;
use std::{
    env,
    io::{Write, stderr},
    str::FromStr,
    sync::{Arc, Mutex, atomic::Ordering},
};
use tracing::metadata::LevelFilter;
use tracing::{Level, error, info};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::{EnvFilter, Layer, Registry, filter, fmt, layer::SubscriberExt, reload};

use crate::file_exporter::{CachedOpenFile, FileExporter};

mod file_exporter;
pub mod span_latency_prom;

pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

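/// Configuration for the logging and tracing stack installed by [`TelemetryConfig::init`].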
#[derive(Default, Clone, Debug)]
pub struct TelemetryConfig {
    /// Enables export of tracing spans, either to a file or to an OTLP endpoint.
    pub enable_otlp_tracing: bool,
    /// Enables the tokio-console debugging layer.
    pub tokio_console: bool,
    /// Output logs as JSON instead of plain text.
    pub json_log_output: bool,
    /// If set, write logs to a daily-rotated file with this prefix instead of stderr.
    pub log_file: Option<String>,
    /// Log filter directives; defaults to "info" and is overridden by `RUST_LOG` when set.
    pub log_string: Option<String>,
    /// Minimum level at which spans are created; defaults to INFO. Separate from the log level.
    pub span_level: Option<Level>,
    /// Install a panic hook that logs panics through tracing.
    pub panic_hook: bool,
    /// Exit the process after logging a panic.
    pub crash_on_panic: bool,
    /// If set, span latencies are recorded as Prometheus histograms in this registry.
    pub prom_registry: Option<prometheus::Registry>,
    /// Trace sampling rate; clamped to the range [0.0001, 1.0] at runtime.
    pub sample_rate: f64,
    /// Additional targets whose trace-level output is added to the log filter.
    pub trace_target: Option<Vec<String>>,
}

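/// Guards returned by [`TelemetryConfig::init`]. Keep them alive for the lifetime of the program:
/// dropping them flushes the non-blocking log writer and clears the global telemetry config.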
#[must_use]
#[allow(dead_code)]
pub struct TelemetryGuards {
    worker_guard: WorkerGuard,
    provider: Option<TracerProvider>,
}

impl TelemetryGuards {
    fn new(
        config: TelemetryConfig,
        worker_guard: WorkerGuard,
        provider: Option<TracerProvider>,
    ) -> Self {
        set_global_telemetry_config(config);
        Self {
            worker_guard,
            provider,
        }
    }
}

impl Drop for TelemetryGuards {
    fn drop(&mut self) {
        clear_global_telemetry_config();
    }
}

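/// Handle for swapping an `EnvFilter`'s directives at runtime through a `reload` layer.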
#[derive(Clone, Debug)]
pub struct FilterHandle(reload::Handle<EnvFilter, Registry>);

impl FilterHandle {
    pub fn update<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        let filter = EnvFilter::try_new(directives)?;
        self.0.reload(filter)?;
        Ok(())
    }

    pub fn get(&self) -> Result<String, BoxError> {
        self.0
            .with_current(|filter| filter.to_string())
            .map_err(Into::into)
    }
}

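/// Runtime control handle returned by [`TelemetryConfig::init`]: update the log and trace filters,
/// redirect trace output to a different file, and adjust the sampling rate while the process runs.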
pub struct TracingHandle {
    log: FilterHandle,
    trace: Option<FilterHandle>,
    file_output: CachedOpenFile,
    sampler: SamplingFilter,
}

impl TracingHandle {
    pub fn update_log<S: AsRef<str>>(&self, directives: S) -> Result<(), BoxError> {
        self.log.update(directives)
    }

    pub fn get_log(&self) -> Result<String, BoxError> {
        self.log.get()
    }

    pub fn update_sampling_rate(&self, sample_rate: f64) {
        self.sampler.update_sampling_rate(sample_rate);
    }

    pub fn update_trace_file<S: AsRef<str>>(&self, trace_file: S) -> Result<(), BoxError> {
        let trace_path = PathBuf::from_str(trace_file.as_ref())?;
        self.file_output.update_path(trace_path)?;
        Ok(())
    }

    pub fn update_trace_filter<S: AsRef<str>>(
        &self,
        directives: S,
        duration: Duration,
    ) -> Result<(), BoxError> {
        if let Some(trace) = &self.trace {
            let res = trace.update(directives);
            // Spawn a task that restores the filter to the TRACE_FILTER default after `duration`.
            let trace = trace.clone();
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            tokio::spawn(async move {
                tokio::time::sleep(duration).await;
                if let Err(e) = trace.update(trace_filter_env) {
                    error!("failed to reset trace filter: {}", e);
                }
            });
            res
        } else {
            info!("tracing not enabled, ignoring update");
            Ok(())
        }
    }

    pub fn clear_file_output(&self) {
        self.file_output.clear_path();
    }

    pub fn reset_trace(&self) {
        if let Some(trace) = &self.trace {
            let trace_filter_env = env::var("TRACE_FILTER").unwrap_or_else(|_| "off".to_string());
            if let Err(e) = trace.update(trace_filter_env) {
                error!("failed to reset trace filter: {}", e);
            }
        }
    }
}

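/// Returns a non-blocking writer plus its flush guard: a daily-rolling file when a log file prefix
/// is configured, otherwise stderr.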
fn get_output(log_file: Option<String>) -> (NonBlocking, WorkerGuard) {
    if let Some(logfile_prefix) = log_file {
        let file_appender = tracing_appender::rolling::daily("", logfile_prefix);
        tracing_appender::non_blocking(file_appender)
    } else {
        tracing_appender::non_blocking(stderr())
    }
}

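/// Installs a panic hook that logs the panic message and location through tracing before calling
/// the previously installed hook, and optionally exits the process with a nonzero code.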
fn set_panic_hook(crash_on_panic: bool) {
    let default_panic_handler = std::panic::take_hook();

    std::panic::set_hook(Box::new(move |panic| {
        if let Some(location) = panic.location() {
            tracing::error!(
                message = %panic,
                panic.file = location.file(),
                panic.line = location.line(),
                panic.column = location.column(),
            );
        } else {
            tracing::error!(message = %panic);
        }

        default_panic_handler(panic);

        let _ = std::io::stderr().flush();
        let _ = std::io::stdout().flush();

        if crash_on_panic {
            std::process::exit(12);
        }
    }));
}

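/// Globally readable copy of the active telemetry config; set when `TelemetryGuards` is created
/// and cleared when it is dropped.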
static GLOBAL_CONFIG: Lazy<Arc<Mutex<Option<TelemetryConfig>>>> =
    Lazy::new(|| Arc::new(Mutex::new(None)));

fn set_global_telemetry_config(config: TelemetryConfig) {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    assert!(global_config.is_none());
    *global_config = Some(config);
}

fn clear_global_telemetry_config() {
    let mut global_config = GLOBAL_CONFIG.lock().unwrap();
    *global_config = None;
}

pub fn get_global_telemetry_config() -> Option<TelemetryConfig> {
    let global_config = GLOBAL_CONFIG.lock().unwrap();
    global_config.clone()
}

impl TelemetryConfig {
    pub fn new() -> Self {
        Self {
            enable_otlp_tracing: false,
            tokio_console: false,
            json_log_output: false,
            log_file: None,
            log_string: None,
            span_level: None,
            panic_hook: true,
            crash_on_panic: false,
            prom_registry: None,
            sample_rate: 1.0,
            trace_target: None,
        }
    }

    pub fn with_json(mut self) -> Self {
        self.json_log_output = true;
        self
    }

    pub fn with_log_level(mut self, log_string: &str) -> Self {
        self.log_string = Some(log_string.to_owned());
        self
    }

    pub fn with_span_level(mut self, span_level: Level) -> Self {
        self.span_level = Some(span_level);
        self
    }

    pub fn with_log_file(mut self, filename: &str) -> Self {
        self.log_file = Some(filename.to_owned());
        self
    }

    pub fn with_prom_registry(mut self, registry: &prometheus::Registry) -> Self {
        self.prom_registry = Some(registry.clone());
        self
    }

    pub fn with_sample_rate(mut self, rate: f64) -> Self {
        self.sample_rate = rate;
        self
    }

    pub fn with_trace_target(mut self, target: &str) -> Self {
        match self.trace_target {
            Some(ref mut v) => v.push(target.to_owned()),
            None => self.trace_target = Some(vec![target.to_owned()]),
        };

        self
    }

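    /// Overrides config fields from environment variables: CRASH_ON_PANIC, TRACE_FILTER,
    /// RUST_LOG_JSON, TOKIO_CONSOLE, TOKIO_SPAN_LEVEL, RUST_LOG_FILE, and SAMPLE_RATE.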
    pub fn with_env(mut self) -> Self {
        if env::var("CRASH_ON_PANIC").is_ok() {
            self.crash_on_panic = true
        }

        if env::var("TRACE_FILTER").is_ok() {
            self.enable_otlp_tracing = true
        }

        if env::var("RUST_LOG_JSON").is_ok() {
            self.json_log_output = true;
        }

        if env::var("TOKIO_CONSOLE").is_ok() {
            self.tokio_console = true;
        }

        if let Ok(span_level) = env::var("TOKIO_SPAN_LEVEL") {
            self.span_level =
                Some(Level::from_str(&span_level).expect("Cannot parse TOKIO_SPAN_LEVEL"));
        }

        if let Ok(filepath) = env::var("RUST_LOG_FILE") {
            self.log_file = Some(filepath);
        }

        if let Ok(sample_rate) = env::var("SAMPLE_RATE") {
            self.sample_rate = sample_rate.parse().expect("Cannot parse SAMPLE_RATE");
        }

        self
    }

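    /// Builds the layer stack (log output, optional tokio-console, span latency metrics, and
    /// OTLP tracing), installs it as the global tracing subscriber, and returns the guards plus
    /// a handle for runtime adjustments.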
    pub fn init(self) -> (TelemetryGuards, TracingHandle) {
        let config = self;
        let config_clone = config.clone();

        let mut directives = config.log_string.unwrap_or_else(|| "info".into());
        if let Some(targets) = config.trace_target {
            for target in targets {
                directives.push_str(&format!(",{}=trace", target));
            }
        }
        let env_filter =
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(directives));
        let (log_filter, reload_handle) = reload::Layer::new(env_filter);
        let log_filter_handle = FilterHandle(reload_handle);

        let span_level = config.span_level.unwrap_or(Level::INFO);
        let span_filter = filter::filter_fn(move |metadata| {
            metadata.is_span() && *metadata.level() <= span_level
        });

        let mut layers = Vec::new();

        if config.tokio_console {
            layers.push(console_subscriber::spawn().boxed());
        }

        if let Some(registry) = config.prom_registry {
            let span_lat_layer = PrometheusSpanLatencyLayer::try_new(&registry, 15)
                .expect("Could not initialize span latency layer");
            layers.push(span_lat_layer.with_filter(span_filter.clone()).boxed());
        }

        let mut trace_filter_handle = None;
        let mut file_output = CachedOpenFile::new::<&str>(None).unwrap();
        let mut provider = None;
        let sampler = SamplingFilter::new(config.sample_rate);
        let service_name = env::var("OTEL_SERVICE_NAME").unwrap_or("sui-node".to_owned());

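        // Spans are exported either to a file (when TRACE_FILE is set) or to an OTLP collector
        // endpoint (OTLP_ENDPOINT, defaulting to http://localhost:4317), filtered by TRACE_FILTER.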
        if config.enable_otlp_tracing {
            let trace_file = env::var("TRACE_FILE").ok();
            let mut otel_kv_vec = vec![opentelemetry::KeyValue::new(
                "service.name",
                service_name.clone(),
            )];
            if let Ok(namespace) = env::var("NAMESPACE") {
                otel_kv_vec.push(opentelemetry::KeyValue::new("service.namespace", namespace));
            }
            if let Ok(hostname) = env::var("HOSTNAME") {
                otel_kv_vec.push(opentelemetry::KeyValue::new("host", hostname));
            }
            if let Ok(network) = env::var("NETWORK") {
                otel_kv_vec.push(opentelemetry::KeyValue::new("network", network));
            }

            let resource = Resource::new(otel_kv_vec);
            let sampler = Sampler::ParentBased(Box::new(sampler.clone()));

            let telemetry = if let Some(trace_file) = trace_file {
                let exporter =
                    FileExporter::new(Some(trace_file.into())).expect("Failed to create exporter");
                file_output = exporter.cached_open_file.clone();
                let processor = BatchSpanProcessor::builder(exporter, runtime::Tokio).build();

                let p = TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_span_processor(processor)
                    .build();

                let tracer = p.tracer(service_name);
                provider = Some(p);

                tracing_opentelemetry::layer().with_tracer(tracer)
            } else {
                let endpoint = env::var("OTLP_ENDPOINT")
                    .unwrap_or_else(|_| "http://localhost:4317".to_string());
                let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
                    .with_tonic()
                    .with_endpoint(endpoint)
                    .build()
                    .unwrap();
                let tracer_provider = opentelemetry_sdk::trace::TracerProvider::builder()
                    .with_resource(resource)
                    .with_sampler(sampler)
                    .with_batch_exporter(otlp_exporter, runtime::Tokio)
                    .build();
                let tracer = tracer_provider.tracer(service_name);
                tracing_opentelemetry::layer().with_tracer(tracer)
            };

            opentelemetry::global::set_text_map_propagator(
                opentelemetry_sdk::propagation::TraceContextPropagator::new(),
            );

            let trace_env_filter = EnvFilter::try_from_env("TRACE_FILTER").unwrap();
            let (trace_env_filter, reload_handle) = reload::Layer::new(trace_env_filter);
            trace_filter_handle = Some(FilterHandle(reload_handle));

            layers.push(telemetry.with_filter(trace_env_filter).boxed());
        }

        let (nb_output, worker_guard) = get_output(config.log_file.clone());
        if config.json_log_output {
            let json_layer = fmt::layer()
                .with_file(true)
                .with_line_number(true)
                .json()
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(json_layer);
        } else {
            let fmt_layer = fmt::layer()
                .with_ansi(config.log_file.is_none() && stderr().is_tty())
                .with_writer(nb_output)
                .with_filter(log_filter)
                .boxed();
            layers.push(fmt_layer);
        }

        let subscriber = tracing_subscriber::registry().with(layers);
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize tracing subscriber");

        if config.panic_hook {
            set_panic_hook(config.crash_on_panic);
        }

        let guards = TelemetryGuards::new(config_clone, worker_guard, provider);

        (
            guards,
            TracingHandle {
                log: log_filter_handle,
                trace: trace_filter_handle,
                file_output,
                sampler,
            },
        )
    }
}

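/// A sampler whose rate can be changed at runtime: each sampling decision reads the current rate
/// from an atomic f64 and delegates to `Sampler::TraceIdRatioBased`.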
#[derive(Debug, Clone)]
struct SamplingFilter {
    sample_rate: Arc<AtomicF64>,
}

impl SamplingFilter {
    fn new(sample_rate: f64) -> Self {
        SamplingFilter {
            sample_rate: Arc::new(AtomicF64::new(Self::clamp(sample_rate))),
        }
    }

    fn clamp(sample_rate: f64) -> f64 {
        sample_rate.clamp(0.0001, 1.0)
    }

    fn update_sampling_rate(&self, sample_rate: f64) {
        let sample_rate = Self::clamp(sample_rate);
        self.sample_rate.store(sample_rate, Ordering::Relaxed);
    }
}

impl ShouldSample for SamplingFilter {
    fn should_sample(
        &self,
        parent_context: Option<&Context>,
        trace_id: TraceId,
        name: &str,
        span_kind: &SpanKind,
        attributes: &[KeyValue],
        links: &[Link],
    ) -> SamplingResult {
        let sample_rate = self.sample_rate.load(Ordering::Relaxed);
        let sampler = Sampler::TraceIdRatioBased(sample_rate);

        sampler.should_sample(parent_context, trace_id, name, span_kind, attributes, links)
    }
}

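/// Convenience initializer for tests: installs a global fmt subscriber once per process, reading
/// `RUST_LOG` with an INFO default and writing through the test writer.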
pub fn init_for_testing() {
    static LOGGER: Lazy<()> = Lazy::new(|| {
        let subscriber = ::tracing_subscriber::FmtSubscriber::builder()
            .with_env_filter(
                EnvFilter::builder()
                    .with_default_directive(LevelFilter::INFO.into())
                    .from_env_lossy(),
            )
            .with_file(true)
            .with_line_number(true)
            .with_test_writer()
            .finish();
        ::tracing::subscriber::set_global_default(subscriber)
            .expect("unable to initialize logging for tests");
    });

    Lazy::force(&LOGGER);
}

#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::proto::MetricType;
    use std::time::Duration;
    use tracing::{debug, debug_span, info, trace_span, warn};

    #[test]
    #[should_panic]
    fn test_telemetry_init() {
        let registry = prometheus::Registry::new();
        let config = TelemetryConfig::new()
            .with_span_level(Level::DEBUG)
            .with_prom_registry(&registry);
        let _guard = config.init();

        info!(a = 1, "This will be INFO.");
        debug_span!("yo span yo").in_scope(|| {
            debug!(a = 2, "This will be DEBUG.");
            std::thread::sleep(Duration::from_millis(100));
            warn!(a = 3, "This will be WARNING.");
        });

        trace_span!("this span should not be created").in_scope(|| {
            info!("This log appears, but surrounding span is not created");
            std::thread::sleep(Duration::from_millis(100));
        });

        let metrics = registry.gather();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].get_name(), "tracing_span_latencies");
        assert_eq!(metrics[0].get_field_type(), MetricType::HISTOGRAM);
        let inner = metrics[0].get_metric();
        assert_eq!(inner.len(), 1);
        let labels = inner[0].get_label();
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0].get_name(), "span_name");
        assert_eq!(labels[0].get_value(), "yo span yo");

        panic!("This should cause error logs to be printed out!");
    }

    #[test]
    fn testing_logger_1() {
        init_for_testing();
    }

    #[test]
    fn testing_logger_2() {
        init_for_testing();
    }
}