use std::sync::{Arc, atomic::AtomicU64};

use prometheus::{
    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
    register_histogram_vec_with_registry, register_histogram_with_registry,
    register_int_counter_vec_with_registry, register_int_counter_with_registry,
    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
};
use tracing::warn;

use crate::{ingestion::error::Error, pipeline::Processor};

/// Histogram bucket upper bounds, in seconds, for the time taken to fetch a checkpoint
/// (1ms up to 100s in a 1-2-5 progression).
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];

/// Histogram bucket upper bounds, in seconds, for checkpoint timestamp lag.
///
/// Resolution is 50ms between 0.1s and 1s, then progressively coarser up to 1000s.
const LAG_SEC_BUCKETS: &[f64] = &[
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 1000.0,
];

/// Histogram bucket upper bounds, in seconds, for in-process work such as handling a
/// checkpoint or gathering a batch (1ms up to 100s in a 1-2-5 progression).
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];

/// Histogram bucket upper bounds, in seconds, for database reads and writes
/// (10ms up to 10,000s in a 1-2-5 progression).
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0,
    2000.0, 5000.0, 10000.0,
];

/// Histogram bucket upper bounds, in rows, for batch and transaction sizes
/// (1 up to 20,000 in a 1-2-5 progression).
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0,
];

45#[derive(Clone)]
47pub struct IngestionMetrics {
48 pub total_ingested_checkpoints: IntCounter,
50 pub total_ingested_transactions: IntCounter,
51 pub total_ingested_events: IntCounter,
52 pub total_ingested_objects: IntCounter,
53 pub total_ingested_bytes: IntCounter,
54 pub total_ingested_transient_retries: IntCounterVec,
55 pub total_ingested_not_found_retries: IntCounter,
56 pub total_ingested_permanent_errors: IntCounterVec,
57 pub total_streamed_checkpoints: IntCounter,
58 pub total_stream_disconnections: IntCounter,
59 pub total_streaming_connection_failures: IntCounter,
60
61 pub latest_ingested_checkpoint: IntGauge,
63 pub latest_streamed_checkpoint: IntGauge,
64 pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
65 pub ingested_checkpoint_timestamp_lag: Histogram,
66
67 pub ingested_checkpoint_latency: Histogram,
68}
69
70#[derive(Clone)]
71pub struct IndexerMetrics {
72 pub total_handler_checkpoints_received: IntCounterVec,
74 pub total_handler_checkpoints_processed: IntCounterVec,
75 pub total_handler_rows_created: IntCounterVec,
76
77 pub latest_processed_checkpoint: IntGaugeVec,
78 pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
79 pub processed_checkpoint_timestamp_lag: HistogramVec,
80
81 pub handler_checkpoint_latency: HistogramVec,
82
83 pub total_collector_checkpoints_received: IntCounterVec,
85 pub total_collector_rows_received: IntCounterVec,
86 pub total_collector_batches_created: IntCounterVec,
87 pub total_committer_batches_attempted: IntCounterVec,
88 pub total_committer_batches_succeeded: IntCounterVec,
89 pub total_committer_batches_failed: IntCounterVec,
90 pub total_committer_rows_committed: IntCounterVec,
91 pub total_committer_rows_affected: IntCounterVec,
92 pub total_watermarks_out_of_order: IntCounterVec,
93 pub total_pruner_chunks_attempted: IntCounterVec,
94 pub total_pruner_chunks_deleted: IntCounterVec,
95 pub total_pruner_rows_deleted: IntCounterVec,
96
97 pub latest_collected_checkpoint: IntGaugeVec,
99 pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
100 pub collected_checkpoint_timestamp_lag: HistogramVec,
101
102 pub latest_partially_committed_checkpoint: IntGaugeVec,
107 pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
108 pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
109
110 pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
115 pub watermarked_checkpoint_timestamp_lag: HistogramVec,
116
117 pub collector_gather_latency: HistogramVec,
118 pub collector_batch_size: HistogramVec,
119 pub committer_commit_latency: HistogramVec,
120 pub committer_tx_rows: HistogramVec,
121 pub watermark_gather_latency: HistogramVec,
122 pub watermark_commit_latency: HistogramVec,
123 pub watermark_pruner_read_latency: HistogramVec,
124 pub watermark_pruner_write_latency: HistogramVec,
125 pub pruner_delete_latency: HistogramVec,
126
127 pub watermark_epoch: IntGaugeVec,
128 pub watermark_checkpoint: IntGaugeVec,
129 pub watermark_transaction: IntGaugeVec,
130 pub watermark_timestamp_ms: IntGaugeVec,
131 pub watermark_reader_lo: IntGaugeVec,
132 pub watermark_pruner_hi: IntGaugeVec,
133
134 pub watermark_epoch_in_db: IntGaugeVec,
135 pub watermark_checkpoint_in_db: IntGaugeVec,
136 pub watermark_transaction_in_db: IntGaugeVec,
137 pub watermark_timestamp_in_db_ms: IntGaugeVec,
138 pub watermark_reader_lo_in_db: IntGaugeVec,
139 pub watermark_pruner_hi_in_db: IntGaugeVec,
140}
141
142pub(crate) struct CheckpointLagMetricReporter {
144 checkpoint_time_lag_histogram: Histogram,
146 latest_checkpoint_time_lag_gauge: IntGauge,
149 latest_checkpoint_sequence_number_gauge: IntGauge,
151 latest_reported_checkpoint: AtomicU64,
153}
154
155impl IngestionMetrics {
156 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
157 let prefix = prefix.unwrap_or("indexer");
158 let name = |n| format!("{prefix}_{n}");
159 Arc::new(Self {
160 total_ingested_checkpoints: register_int_counter_with_registry!(
161 name("total_ingested_checkpoints"),
162 "Total number of checkpoints fetched from the remote store",
163 registry,
164 )
165 .unwrap(),
166 total_ingested_transactions: register_int_counter_with_registry!(
167 name("total_ingested_transactions"),
168 "Total number of transactions fetched from the remote store",
169 registry,
170 )
171 .unwrap(),
172 total_ingested_events: register_int_counter_with_registry!(
173 name("total_ingested_events"),
174 "Total number of events fetched from the remote store",
175 registry,
176 )
177 .unwrap(),
178 total_ingested_objects: register_int_counter_with_registry!(
179 name("total_ingested_objects"),
180 "Total number of objects in checkpoints fetched from the remote store",
181 registry,
182 )
183 .unwrap(),
184 total_ingested_bytes: register_int_counter_with_registry!(
185 name("total_ingested_bytes"),
186 "Total number of bytes fetched from the remote store, this metric will not \
187 be updated when data are fetched over gRPC.",
188 registry,
189 )
190 .unwrap(),
191 total_ingested_transient_retries: register_int_counter_vec_with_registry!(
192 name("total_ingested_retries"),
193 "Total number of retries due to transient errors while fetching data from the \
194 remote store",
195 &["reason"],
196 registry,
197 )
198 .unwrap(),
199 total_ingested_not_found_retries: register_int_counter_with_registry!(
200 name("total_ingested_not_found_retries"),
201 "Total number of retries due to the not found errors while fetching data from the \
202 remote store",
203 registry,
204 )
205 .unwrap(),
206 total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
207 name("total_ingested_permanent_errors"),
208 "Total number of permanent errors encountered while fetching data from the \
209 remote store, which cause the ingestion service to shutdown",
210 &["reason"],
211 registry,
212 )
213 .unwrap(),
214 total_streamed_checkpoints: register_int_counter_with_registry!(
215 name("total_streamed_checkpoints"),
216 "Total number of checkpoints received from gRPC streaming",
217 registry,
218 )
219 .unwrap(),
220 total_stream_disconnections: register_int_counter_with_registry!(
221 name("total_stream_disconnections"),
222 "Total number of times the gRPC stream was disconnected",
223 registry,
224 )
225 .unwrap(),
226 total_streaming_connection_failures: register_int_counter_with_registry!(
227 name("total_streaming_connection_failures"),
228 "Total number of failures due to streaming service connection or peek failures",
229 registry,
230 )
231 .unwrap(),
232 latest_ingested_checkpoint: register_int_gauge_with_registry!(
233 name("latest_ingested_checkpoint"),
234 "Latest checkpoint sequence number fetched from the remote store",
235 registry,
236 )
237 .unwrap(),
238 latest_streamed_checkpoint: register_int_gauge_with_registry!(
239 name("latest_streamed_checkpoint"),
240 "Latest checkpoint sequence number received from gRPC streaming",
241 registry,
242 )
243 .unwrap(),
244 latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
245 name("latest_ingested_checkpoint_timestamp_lag_ms"),
246 "Difference between the system timestamp when the latest checkpoint was fetched and the \
247 timestamp in the checkpoint, in milliseconds",
248 registry,
249 )
250 .unwrap(),
251 ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
252 name("ingested_checkpoint_timestamp_lag"),
253 "Difference between the system timestamp when a checkpoint was fetched and the \
254 timestamp in each checkpoint, in seconds",
255 LAG_SEC_BUCKETS.to_vec(),
256 registry,
257 )
258 .unwrap(),
259 ingested_checkpoint_latency: register_histogram_with_registry!(
260 name("ingested_checkpoint_latency"),
261 "Time taken to fetch a checkpoint from the remote store, including retries",
262 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
263 registry,
264 )
265 .unwrap(),
266 })
267 }
268
269 pub(crate) fn inc_retry(
272 &self,
273 checkpoint: u64,
274 reason: &str,
275 error: Error,
276 ) -> backoff::Error<Error> {
277 warn!(checkpoint, reason, "Retrying due to error: {error}");
278
279 self.total_ingested_transient_retries
280 .with_label_values(&[reason])
281 .inc();
282
283 backoff::Error::transient(error)
284 }
285}
286
287impl IndexerMetrics {
288 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
289 let prefix = prefix.unwrap_or("indexer");
290 let name = |n| format!("{prefix}_{n}");
291 Arc::new(Self {
292 total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
293 name("total_handler_checkpoints_received"),
294 "Total number of checkpoints received by this handler",
295 &["pipeline"],
296 registry,
297 )
298 .unwrap(),
299 total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
300 name("total_handler_checkpoints_processed"),
301 "Total number of checkpoints processed (converted into rows) by this handler",
302 &["pipeline"],
303 registry,
304 )
305 .unwrap(),
306 total_handler_rows_created: register_int_counter_vec_with_registry!(
307 name("total_handler_rows_created"),
308 "Total number of rows created by this handler",
309 &["pipeline"],
310 registry,
311 )
312 .unwrap(),
313 latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
314 name("latest_processed_checkpoint"),
315 "Latest checkpoint sequence number processed by this handler",
316 &["pipeline"],
317 registry,
318 )
319 .unwrap(),
320 latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
321 name("latest_processed_checkpoint_timestamp_lag_ms"),
322 "Difference between the system timestamp when the latest checkpoint was processed and the \
323 timestamp in the checkpoint, in milliseconds",
324 &["pipeline"],
325 registry,
326 )
327 .unwrap(),
328 processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
329 name("processed_checkpoint_timestamp_lag"),
330 "Difference between the system timestamp when a checkpoint was processed and the \
331 timestamp in each checkpoint, in seconds",
332 &["pipeline"],
333 LAG_SEC_BUCKETS.to_vec(),
334 registry,
335 )
336 .unwrap(),
337 handler_checkpoint_latency: register_histogram_vec_with_registry!(
338 name("handler_checkpoint_latency"),
339 "Time taken to process a checkpoint by this handler",
340 &["pipeline"],
341 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
342 registry,
343 )
344 .unwrap(),
345 total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
346 name("total_collector_checkpoints_received"),
347 "Total number of checkpoints received by this collector",
348 &["pipeline"],
349 registry,
350 )
351 .unwrap(),
352 total_collector_rows_received: register_int_counter_vec_with_registry!(
353 name("total_collector_rows_received"),
354 "Total number of rows received by this collector",
355 &["pipeline"],
356 registry,
357 )
358 .unwrap(),
359 total_collector_batches_created: register_int_counter_vec_with_registry!(
360 name("total_collector_batches_created"),
361 "Total number of batches created by this collector",
362 &["pipeline"],
363 registry,
364 )
365 .unwrap(),
366 total_committer_batches_attempted: register_int_counter_vec_with_registry!(
367 name("total_committer_batches_attempted"),
368 "Total number of batches writes attempted by this committer",
369 &["pipeline"],
370 registry,
371 )
372 .unwrap(),
373 total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
374 name("total_committer_batches_succeeded"),
375 "Total number of successful batches writes by this committer",
376 &["pipeline"],
377 registry,
378 )
379 .unwrap(),
380 total_committer_batches_failed: register_int_counter_vec_with_registry!(
381 name("total_committer_batches_failed"),
382 "Total number of failed batches writes by this committer",
383 &["pipeline"],
384 registry,
385 )
386 .unwrap(),
387 total_committer_rows_committed: register_int_counter_vec_with_registry!(
388 name("total_committer_rows_committed"),
389 "Total number of rows sent to the database by this committer",
390 &["pipeline"],
391 registry,
392 )
393 .unwrap(),
394 total_committer_rows_affected: register_int_counter_vec_with_registry!(
395 name("total_committer_rows_affected"),
396 "Total number of rows actually written to the database by this committer",
397 &["pipeline"],
398 registry,
399 )
400 .unwrap(),
401 total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
402 name("watermark_out_of_order"),
403 "Number of times this committer encountered a batch for a checkpoint before its watermark",
404 &["pipeline"],
405 registry,
406 )
407 .unwrap(),
408 total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
409 name("pruner_chunks_attempted"),
410 "Number of chunks this pruner attempted to delete",
411 &["pipeline"],
412 registry,
413 )
414 .unwrap(),
415 total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
416 name("pruner_chunks_deleted"),
417 "Number of chunks this pruner successfully deleted",
418 &["pipeline"],
419 registry,
420 )
421 .unwrap(),
422 total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
423 name("pruner_rows_deleted"),
424 "Number of rows this pruner successfully deleted",
425 &["pipeline"],
426 registry,
427 )
428 .unwrap(),
429 latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
430 name("latest_collected_checkpoint"),
431 "Latest checkpoint sequence number collected by this collector",
432 &["pipeline"],
433 registry,
434 )
435 .unwrap(),
436 latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
437 name("latest_collected_checkpoint_timestamp_lag_ms"),
438 "Difference between the system timestamp when the latest checkpoint was collected and the \
439 timestamp in the checkpoint, in milliseconds",
440 &["pipeline"],
441 registry,
442 )
443 .unwrap(),
444 collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
445 name("collected_checkpoint_timestamp_lag"),
446 "Difference between the system timestamp when a checkpoint was collected and the \
447 timestamp in each checkpoint, in seconds",
448 &["pipeline"],
449 LAG_SEC_BUCKETS.to_vec(),
450 registry,
451 )
452 .unwrap(),
453 latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
454 name("latest_partially_committed_checkpoint"),
455 "Latest checkpoint sequence number partially committed by this collector",
456 &["pipeline"],
457 registry,
458 )
459 .unwrap(),
460 latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
461 name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
462 "Difference between the system timestamp when the latest checkpoint was partially committed and the \
463 timestamp in the checkpoint, in milliseconds",
464 &["pipeline"],
465 registry,
466 )
467 .unwrap(),
468 partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
469 name("partially_committed_checkpoint_timestamp_lag"),
470 "Difference between the system timestamp when a checkpoint was partially committed and the \
471 timestamp in each checkpoint, in seconds",
472 &["pipeline"],
473 LAG_SEC_BUCKETS.to_vec(),
474 registry,
475 )
476 .unwrap(),
477 latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
478 name("latest_watermarked_checkpoint_timestamp_lag_ms"),
479 "Difference between the system timestamp when the latest checkpoint was watermarked and the \
480 timestamp in the checkpoint, in milliseconds",
481 &["pipeline"],
482 registry,
483 )
484 .unwrap(),
485 watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
486 name("watermarked_checkpoint_timestamp_lag"),
487 "Difference between the system timestamp when a checkpoint was watermarked and the \
488 timestamp in each checkpoint, in seconds",
489 &["pipeline"],
490 LAG_SEC_BUCKETS.to_vec(),
491 registry,
492 )
493 .unwrap(),
494 collector_gather_latency: register_histogram_vec_with_registry!(
495 name("collector_gather_latency"),
496 "Time taken to gather rows into a batch by this collector",
497 &["pipeline"],
498 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
499 registry,
500 )
501 .unwrap(),
502 collector_batch_size: register_histogram_vec_with_registry!(
503 name("collector_batch_size"),
504 "Number of rows in a batch written to the database by this collector",
505 &["pipeline"],
506 BATCH_SIZE_BUCKETS.to_vec(),
507 registry,
508 )
509 .unwrap(),
510 committer_commit_latency: register_histogram_vec_with_registry!(
511 name("committer_commit_latency"),
512 "Time taken to write a batch of rows to the database by this committer",
513 &["pipeline"],
514 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
515 registry,
516 )
517 .unwrap(),
518 committer_tx_rows: register_histogram_vec_with_registry!(
519 name("committer_tx_rows"),
520 "Number of rows written to the database in a single database transaction by this committer",
521 &["pipeline"],
522 BATCH_SIZE_BUCKETS.to_vec(),
523 registry,
524 )
525 .unwrap(),
526 watermark_gather_latency: register_histogram_vec_with_registry!(
527 name("watermark_gather_latency"),
528 "Time taken to calculate the new high watermark after a write by this committer",
529 &["pipeline"],
530 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
531 registry,
532 )
533 .unwrap(),
534 watermark_commit_latency: register_histogram_vec_with_registry!(
535 name("watermark_commit_latency"),
536 "Time taken to write the new high watermark to the database by this committer",
537 &["pipeline"],
538 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
539 registry,
540 )
541 .unwrap(),
542 watermark_pruner_read_latency: register_histogram_vec_with_registry!(
543 name("watermark_pruner_read_latency"),
544 "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
545 &["pipeline"],
546 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
547 registry,
548 )
549 .unwrap(),
550 watermark_pruner_write_latency: register_histogram_vec_with_registry!(
551 name("watermark_pruner_write_latency"),
552 "Time taken to write the pruner's new upperbound to the database by this pruner",
553 &["pipeline"],
554 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
555 registry,
556 )
557 .unwrap(),
558 pruner_delete_latency: register_histogram_vec_with_registry!(
559 name("pruner_delete_latency"),
560 "Time taken to delete a chunk of data from the database by this pruner",
561 &["pipeline"],
562 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
563 registry,
564 )
565 .unwrap(),
566 watermark_epoch: register_int_gauge_vec_with_registry!(
567 name("watermark_epoch"),
568 "Current epoch high watermark for this committer",
569 &["pipeline"],
570 registry,
571 )
572 .unwrap(),
573 watermark_checkpoint: register_int_gauge_vec_with_registry!(
574 name("watermark_checkpoint"),
575 "Current checkpoint high watermark for this committer",
576 &["pipeline"],
577 registry,
578 )
579 .unwrap(),
580 watermark_transaction: register_int_gauge_vec_with_registry!(
581 name("watermark_transaction"),
582 "Current transaction high watermark for this committer",
583 &["pipeline"],
584 registry,
585 )
586 .unwrap(),
587 watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
588 name("watermark_timestamp_ms"),
589 "Current timestamp high watermark for this committer, in milliseconds",
590 &["pipeline"],
591 registry,
592 )
593 .unwrap(),
594 watermark_reader_lo: register_int_gauge_vec_with_registry!(
595 name("watermark_reader_lo"),
596 "Current reader low watermark for this pruner",
597 &["pipeline"],
598 registry,
599 )
600 .unwrap(),
601 watermark_pruner_hi: register_int_gauge_vec_with_registry!(
602 name("watermark_pruner_hi"),
603 "Current pruner high watermark for this pruner",
604 &["pipeline"],
605 registry,
606 )
607 .unwrap(),
608 watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
609 name("watermark_epoch_in_db"),
610 "Last epoch high watermark this committer wrote to the DB",
611 &["pipeline"],
612 registry,
613 )
614 .unwrap(),
615 watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
616 name("watermark_checkpoint_in_db"),
617 "Last checkpoint high watermark this committer wrote to the DB",
618 &["pipeline"],
619 registry,
620 )
621 .unwrap(),
622 watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
623 name("watermark_transaction_in_db"),
624 "Last transaction high watermark this committer wrote to the DB",
625 &["pipeline"],
626 registry,
627 )
628 .unwrap(),
629 watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
630 name("watermark_timestamp_ms_in_db"),
631 "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
632 &["pipeline"],
633 registry,
634 )
635 .unwrap(),
636 watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
637 name("watermark_reader_lo_in_db"),
638 "Last reader low watermark this pruner wrote to the DB",
639 &["pipeline"],
640 registry,
641 )
642 .unwrap(),
643 watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
644 name("watermark_pruner_hi_in_db"),
645 "Last pruner high watermark this pruner wrote to the DB",
646 &["pipeline"],
647 registry,
648 )
649 .unwrap(),
650 })
651 }
652}
653
654impl CheckpointLagMetricReporter {
655 pub fn new(
656 checkpoint_time_lag_histogram: Histogram,
657 latest_checkpoint_time_lag_gauge: IntGauge,
658 latest_checkpoint_sequence_number_gauge: IntGauge,
659 ) -> Arc<Self> {
660 Arc::new(Self {
661 checkpoint_time_lag_histogram,
662 latest_checkpoint_time_lag_gauge,
663 latest_checkpoint_sequence_number_gauge,
664 latest_reported_checkpoint: AtomicU64::new(0),
665 })
666 }
667
668 pub fn new_for_pipeline<P: Processor>(
669 checkpoint_time_lag_histogram: &HistogramVec,
670 latest_checkpoint_time_lag_gauge: &IntGaugeVec,
671 latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
672 ) -> Arc<Self> {
673 Self::new(
674 checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
675 latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
676 latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
677 )
678 }
679
680 pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
681 let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
682 self.checkpoint_time_lag_histogram
683 .observe((lag as f64) / 1000.0);
684
685 let prev = self
686 .latest_reported_checkpoint
687 .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
688 if cp_sequence_number > prev {
689 self.latest_checkpoint_sequence_number_gauge
690 .set(cp_sequence_number as i64);
691 self.latest_checkpoint_time_lag_gauge.set(lag);
692 }
693 }
694}
695
/// Test-only helpers for building metric sets that are not shared with any real registry.
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::{IndexerMetrics, IngestionMetrics};

    /// Returns `IndexerMetrics` with the default prefix, registered on a fresh registry.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        IndexerMetrics::new(None, &Registry::new())
    }

    /// Returns `IngestionMetrics` with the default prefix, registered on a fresh registry.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        IngestionMetrics::new(None, &Registry::new())
    }
}