1use std::sync::{Arc, atomic::AtomicU64};
5
6use prometheus::{
7 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
8 register_histogram_vec_with_registry, register_histogram_with_registry,
9 register_int_counter_vec_with_registry, register_int_counter_with_registry,
10 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14use crate::{ingestion::error::Error, pipeline::Processor};
15
/// Histogram buckets, in seconds, for the time taken to fetch a checkpoint (used by the
/// `ingested_checkpoint_latency` histogram). Runs from 1ms to 100s in 1-2-5 steps.
// One decade per row is easier to audit than a width-filled layout.
#[rustfmt::skip]
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];
20
/// Histogram buckets, in seconds, shared by all the `*_checkpoint_timestamp_lag` histograms.
/// Sub-second lag is tracked in 50ms steps; beyond one second the buckets become progressively
/// coarser, up to 1000s.
// Rows are grouped by resolution, which a width-filled layout would obscure.
#[rustfmt::skip]
const LAG_SEC_BUCKETS: &[f64] = &[
    // 50ms steps below one second.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5,
    0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95,
    // One second steps.
    1.0, 2.0, 3.0, 4.0, 5.0,
    // Coarse tail.
    10.0, 20.0, 50.0, 100.0, 1000.0,
];
27
/// Histogram buckets, in seconds, for in-process work: handler processing, collector gathering
/// and watermark calculation. Runs from 1ms to 100s in 1-2-5 steps.
// One decade per row is easier to audit than a width-filled layout.
#[rustfmt::skip]
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];
33
/// Histogram buckets, in seconds, for database round-trips (commits, watermark reads/writes and
/// pruner deletes). Runs from 10ms to 10000s in 1-2-5 steps.
// One decade per row is easier to audit than a width-filled layout.
#[rustfmt::skip]
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0,
    10000.0,
];
39
/// Histogram buckets for row counts (batch sizes and rows per database transaction). Runs from
/// 1 to 20000 rows in 1-2-5 steps.
// One decade per row is easier to audit than a width-filled layout.
#[rustfmt::skip]
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0,
    10000.0, 20000.0,
];
44
/// Handles to every Prometheus metric exported by the indexer.
///
/// All metrics are created and registered by [`IndexerMetrics::new`], which prepends a
/// configurable prefix to each exported name. Plain `IntCounter`/`IntGauge`/`Histogram` fields
/// describe the ingestion side; the `*Vec` fields carry a `pipeline` label unless noted
/// otherwise.
#[derive(Clone)]
pub struct IndexerMetrics {
    // --- Ingestion: counters ---
    /// Total number of checkpoints fetched from the remote store.
    pub total_ingested_checkpoints: IntCounter,
    /// Total number of transactions fetched from the remote store.
    pub total_ingested_transactions: IntCounter,
    /// Total number of events fetched from the remote store.
    pub total_ingested_events: IntCounter,
    /// Total number of input objects fetched from the remote store.
    pub total_ingested_inputs: IntCounter,
    /// Total number of output objects fetched from the remote store.
    pub total_ingested_outputs: IntCounter,
    /// Total number of bytes fetched from the remote store. Not updated when data are fetched
    /// over gRPC.
    pub total_ingested_bytes: IntCounter,
    /// Total number of retries caused by transient errors while fetching, labelled by `reason`
    /// (not `pipeline`). Exported as `<prefix>_total_ingested_retries`.
    pub total_ingested_transient_retries: IntCounterVec,
    /// Total number of retries caused by "not found" errors while fetching.
    pub total_ingested_not_found_retries: IntCounter,

    // --- Ingestion: latest checkpoint and lag ---
    /// Latest checkpoint sequence number fetched from the remote store.
    pub latest_ingested_checkpoint: IntGauge,
    /// Lag between fetching the latest checkpoint and that checkpoint's timestamp, in
    /// milliseconds.
    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
    /// Distribution of the lag between fetching a checkpoint and its timestamp, in seconds.
    pub ingested_checkpoint_timestamp_lag: Histogram,

    /// Time taken to fetch a checkpoint from the remote store, including retries.
    pub ingested_checkpoint_latency: Histogram,

    // --- Handler (processor) ---
    /// Total number of checkpoints received by a handler.
    pub total_handler_checkpoints_received: IntCounterVec,
    /// Total number of checkpoints processed (converted into rows) by a handler.
    pub total_handler_checkpoints_processed: IntCounterVec,
    /// Total number of rows created by a handler.
    pub total_handler_rows_created: IntCounterVec,

    /// Latest checkpoint sequence number processed by a handler.
    pub latest_processed_checkpoint: IntGaugeVec,
    /// Lag between processing the latest checkpoint and its timestamp, in milliseconds.
    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
    /// Distribution of the lag between processing a checkpoint and its timestamp, in seconds.
    pub processed_checkpoint_timestamp_lag: HistogramVec,

    /// Time taken by a handler to process a checkpoint.
    pub handler_checkpoint_latency: HistogramVec,

    // --- Collector, committer and pruner: counters ---
    /// Total number of checkpoints received by a collector.
    pub total_collector_checkpoints_received: IntCounterVec,
    /// Total number of rows received by a collector.
    pub total_collector_rows_received: IntCounterVec,
    /// Total number of batches created by a collector.
    pub total_collector_batches_created: IntCounterVec,
    /// Total number of batch writes attempted by a committer.
    pub total_committer_batches_attempted: IntCounterVec,
    /// Total number of successful batch writes by a committer.
    pub total_committer_batches_succeeded: IntCounterVec,
    /// Total number of failed batch writes by a committer.
    pub total_committer_batches_failed: IntCounterVec,
    /// Total number of rows sent to the database by a committer.
    pub total_committer_rows_committed: IntCounterVec,
    /// Total number of rows actually written to the database by a committer.
    pub total_committer_rows_affected: IntCounterVec,
    /// Number of times a committer encountered a batch for a checkpoint before its watermark.
    /// Exported as `<prefix>_watermark_out_of_order`.
    pub total_watermarks_out_of_order: IntCounterVec,
    /// Number of chunks a pruner attempted to delete. Exported as
    /// `<prefix>_pruner_chunks_attempted`.
    pub total_pruner_chunks_attempted: IntCounterVec,
    /// Number of chunks a pruner successfully deleted. Exported as
    /// `<prefix>_pruner_chunks_deleted`.
    pub total_pruner_chunks_deleted: IntCounterVec,
    /// Number of rows a pruner successfully deleted. Exported as `<prefix>_pruner_rows_deleted`.
    pub total_pruner_rows_deleted: IntCounterVec,

    // --- Collector: latest checkpoint and lag ---
    /// Latest checkpoint sequence number collected by a collector.
    pub latest_collected_checkpoint: IntGaugeVec,
    /// Lag between collecting the latest checkpoint and its timestamp, in milliseconds.
    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
    /// Distribution of the lag between collecting a checkpoint and its timestamp, in seconds.
    pub collected_checkpoint_timestamp_lag: HistogramVec,

    // --- Partially committed checkpoints ---
    /// Latest checkpoint sequence number that has been partially committed.
    pub latest_partially_committed_checkpoint: IntGaugeVec,
    /// Lag between partially committing the latest checkpoint and its timestamp, in
    /// milliseconds.
    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
    /// Distribution of the lag between partially committing a checkpoint and its timestamp, in
    /// seconds.
    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,

    // --- Watermarked checkpoints ---
    /// Lag between watermarking the latest checkpoint and its timestamp, in milliseconds.
    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
    /// Distribution of the lag between watermarking a checkpoint and its timestamp, in seconds.
    pub watermarked_checkpoint_timestamp_lag: HistogramVec,

    // --- Latencies and batch sizes ---
    /// Time taken by a collector to gather rows into a batch.
    pub collector_gather_latency: HistogramVec,
    /// Number of rows in a batch produced by a collector for writing to the database.
    pub collector_batch_size: HistogramVec,
    /// Time taken by a committer to write a batch of rows to the database.
    pub committer_commit_latency: HistogramVec,
    /// Number of rows written by a committer in a single database transaction.
    pub committer_tx_rows: HistogramVec,
    /// Time taken by a committer to calculate the new high watermark after a write.
    pub watermark_gather_latency: HistogramVec,
    /// Time taken by a committer to write the new high watermark to the database.
    pub watermark_commit_latency: HistogramVec,
    /// Time taken by a pruner to read its next upper and lower bounds from the database.
    pub watermark_pruner_read_latency: HistogramVec,
    /// Time taken by a pruner to write its new upper bound to the database.
    pub watermark_pruner_write_latency: HistogramVec,
    /// Time taken by a pruner to delete a chunk of data from the database.
    pub pruner_delete_latency: HistogramVec,

    // --- Current watermarks ---
    /// Current epoch high watermark for a committer.
    pub watermark_epoch: IntGaugeVec,
    /// Current checkpoint high watermark for a committer.
    pub watermark_checkpoint: IntGaugeVec,
    /// Current transaction high watermark for a committer.
    pub watermark_transaction: IntGaugeVec,
    /// Current timestamp high watermark for a committer, in milliseconds.
    pub watermark_timestamp_ms: IntGaugeVec,
    /// Current reader low watermark for a pruner.
    pub watermark_reader_lo: IntGaugeVec,
    /// Current pruner high watermark for a pruner.
    pub watermark_pruner_hi: IntGaugeVec,

    // --- Watermarks last written to the database ---
    /// Last epoch high watermark a committer wrote to the DB.
    pub watermark_epoch_in_db: IntGaugeVec,
    /// Last checkpoint high watermark a committer wrote to the DB.
    pub watermark_checkpoint_in_db: IntGaugeVec,
    /// Last transaction high watermark a committer wrote to the DB.
    pub watermark_transaction_in_db: IntGaugeVec,
    /// Last timestamp high watermark a committer wrote to the DB, in milliseconds. Exported as
    /// `<prefix>_watermark_timestamp_ms_in_db`.
    pub watermark_timestamp_in_db_ms: IntGaugeVec,
    /// Last reader low watermark a pruner wrote to the DB.
    pub watermark_reader_lo_in_db: IntGaugeVec,
    /// Last pruner high watermark a pruner wrote to the DB.
    pub watermark_pruner_hi_in_db: IntGaugeVec,
}
133
/// Reports how far checkpoints lag behind the local clock, feeding one histogram and two
/// "latest" gauges. The gauges are only moved forward: they are updated when a report carries a
/// checkpoint sequence number higher than any reported before.
pub(crate) struct CheckpointLagMetricReporter {
    /// Receives the lag of every reported checkpoint, in seconds.
    checkpoint_time_lag_histogram: Histogram,
    /// Lag of the highest checkpoint reported so far, in milliseconds.
    latest_checkpoint_time_lag_gauge: IntGauge,
    /// Sequence number of the highest checkpoint reported so far.
    latest_checkpoint_sequence_number_gauge: IntGauge,
    /// Running maximum of the reported sequence numbers, used to decide whether a report should
    /// update the two gauges.
    latest_reported_checkpoint: AtomicU64,
}
146
147impl IndexerMetrics {
148 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
149 let prefix = prefix.unwrap_or("indexer");
150 let name = |n| format!("{prefix}_{n}");
151 Arc::new(Self {
152 total_ingested_checkpoints: register_int_counter_with_registry!(
153 name("total_ingested_checkpoints"),
154 "Total number of checkpoints fetched from the remote store",
155 registry,
156 )
157 .unwrap(),
158 total_ingested_transactions: register_int_counter_with_registry!(
159 name("total_ingested_transactions"),
160 "Total number of transactions fetched from the remote store",
161 registry,
162 )
163 .unwrap(),
164 total_ingested_events: register_int_counter_with_registry!(
165 name("total_ingested_events"),
166 "Total number of events fetched from the remote store",
167 registry,
168 )
169 .unwrap(),
170 total_ingested_inputs: register_int_counter_with_registry!(
171 name("total_ingested_inputs"),
172 "Total number of input objects fetched from the remote store",
173 registry,
174 )
175 .unwrap(),
176 total_ingested_outputs: register_int_counter_with_registry!(
177 name("total_ingested_outputs"),
178 "Total number of output objects fetched from the remote store",
179 registry,
180 )
181 .unwrap(),
182 total_ingested_bytes: register_int_counter_with_registry!(
183 name("total_ingested_bytes"),
184 "Total number of bytes fetched from the remote store, this metric will not \
185 be updated when data are fetched over gRPC.",
186 registry,
187 )
188 .unwrap(),
189 total_ingested_transient_retries: register_int_counter_vec_with_registry!(
190 name("total_ingested_retries"),
191 "Total number of retries due to transient errors while fetching data from the \
192 remote store",
193 &["reason"],
194 registry,
195 )
196 .unwrap(),
197 total_ingested_not_found_retries: register_int_counter_with_registry!(
198 name("total_ingested_not_found_retries"),
199 "Total number of retries due to the not found errors while fetching data from the \
200 remote store",
201 registry,
202 )
203 .unwrap(),
204 latest_ingested_checkpoint: register_int_gauge_with_registry!(
205 name("latest_ingested_checkpoint"),
206 "Latest checkpoint sequence number fetched from the remote store",
207 registry,
208 )
209 .unwrap(),
210 latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
211 name("latest_ingested_checkpoint_timestamp_lag_ms"),
212 "Difference between the system timestamp when the latest checkpoint was fetched and the \
213 timestamp in the checkpoint, in milliseconds",
214 registry,
215 )
216 .unwrap(),
217 ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
218 name("ingested_checkpoint_timestamp_lag"),
219 "Difference between the system timestamp when a checkpoint was fetched and the \
220 timestamp in each checkpoint, in seconds",
221 LAG_SEC_BUCKETS.to_vec(),
222 registry,
223 )
224 .unwrap(),
225 ingested_checkpoint_latency: register_histogram_with_registry!(
226 name("ingested_checkpoint_latency"),
227 "Time taken to fetch a checkpoint from the remote store, including retries",
228 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
229 registry,
230 )
231 .unwrap(),
232 total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
233 name("total_handler_checkpoints_received"),
234 "Total number of checkpoints received by this handler",
235 &["pipeline"],
236 registry,
237 )
238 .unwrap(),
239 total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
240 name("total_handler_checkpoints_processed"),
241 "Total number of checkpoints processed (converted into rows) by this handler",
242 &["pipeline"],
243 registry,
244 )
245 .unwrap(),
246 total_handler_rows_created: register_int_counter_vec_with_registry!(
247 name("total_handler_rows_created"),
248 "Total number of rows created by this handler",
249 &["pipeline"],
250 registry,
251 )
252 .unwrap(),
253 latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
254 name("latest_processed_checkpoint"),
255 "Latest checkpoint sequence number processed by this handler",
256 &["pipeline"],
257 registry,
258 )
259 .unwrap(),
260 latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
261 name("latest_processed_checkpoint_timestamp_lag_ms"),
262 "Difference between the system timestamp when the latest checkpoint was processed and the \
263 timestamp in the checkpoint, in milliseconds",
264 &["pipeline"],
265 registry,
266 )
267 .unwrap(),
268 processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
269 name("processed_checkpoint_timestamp_lag"),
270 "Difference between the system timestamp when a checkpoint was processed and the \
271 timestamp in each checkpoint, in seconds",
272 &["pipeline"],
273 LAG_SEC_BUCKETS.to_vec(),
274 registry,
275 )
276 .unwrap(),
277 handler_checkpoint_latency: register_histogram_vec_with_registry!(
278 name("handler_checkpoint_latency"),
279 "Time taken to process a checkpoint by this handler",
280 &["pipeline"],
281 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
282 registry,
283 )
284 .unwrap(),
285 total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
286 name("total_collector_checkpoints_received"),
287 "Total number of checkpoints received by this collector",
288 &["pipeline"],
289 registry,
290 )
291 .unwrap(),
292 total_collector_rows_received: register_int_counter_vec_with_registry!(
293 name("total_collector_rows_received"),
294 "Total number of rows received by this collector",
295 &["pipeline"],
296 registry,
297 )
298 .unwrap(),
299 total_collector_batches_created: register_int_counter_vec_with_registry!(
300 name("total_collector_batches_created"),
301 "Total number of batches created by this collector",
302 &["pipeline"],
303 registry,
304 )
305 .unwrap(),
306 total_committer_batches_attempted: register_int_counter_vec_with_registry!(
307 name("total_committer_batches_attempted"),
308 "Total number of batches writes attempted by this committer",
309 &["pipeline"],
310 registry,
311 )
312 .unwrap(),
313 total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
314 name("total_committer_batches_succeeded"),
315 "Total number of successful batches writes by this committer",
316 &["pipeline"],
317 registry,
318 )
319 .unwrap(),
320 total_committer_batches_failed: register_int_counter_vec_with_registry!(
321 name("total_committer_batches_failed"),
322 "Total number of failed batches writes by this committer",
323 &["pipeline"],
324 registry,
325 )
326 .unwrap(),
327 total_committer_rows_committed: register_int_counter_vec_with_registry!(
328 name("total_committer_rows_committed"),
329 "Total number of rows sent to the database by this committer",
330 &["pipeline"],
331 registry,
332 )
333 .unwrap(),
334 total_committer_rows_affected: register_int_counter_vec_with_registry!(
335 name("total_committer_rows_affected"),
336 "Total number of rows actually written to the database by this committer",
337 &["pipeline"],
338 registry,
339 )
340 .unwrap(),
341 total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
342 name("watermark_out_of_order"),
343 "Number of times this committer encountered a batch for a checkpoint before its watermark",
344 &["pipeline"],
345 registry,
346 )
347 .unwrap(),
348 total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
349 name("pruner_chunks_attempted"),
350 "Number of chunks this pruner attempted to delete",
351 &["pipeline"],
352 registry,
353 )
354 .unwrap(),
355 total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
356 name("pruner_chunks_deleted"),
357 "Number of chunks this pruner successfully deleted",
358 &["pipeline"],
359 registry,
360 )
361 .unwrap(),
362 total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
363 name("pruner_rows_deleted"),
364 "Number of rows this pruner successfully deleted",
365 &["pipeline"],
366 registry,
367 )
368 .unwrap(),
369 latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
370 name("latest_collected_checkpoint"),
371 "Latest checkpoint sequence number collected by this collector",
372 &["pipeline"],
373 registry,
374 )
375 .unwrap(),
376 latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
377 name("latest_collected_checkpoint_timestamp_lag_ms"),
378 "Difference between the system timestamp when the latest checkpoint was collected and the \
379 timestamp in the checkpoint, in milliseconds",
380 &["pipeline"],
381 registry,
382 )
383 .unwrap(),
384 collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
385 name("collected_checkpoint_timestamp_lag"),
386 "Difference between the system timestamp when a checkpoint was collected and the \
387 timestamp in each checkpoint, in seconds",
388 &["pipeline"],
389 LAG_SEC_BUCKETS.to_vec(),
390 registry,
391 )
392 .unwrap(),
393 latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
394 name("latest_partially_committed_checkpoint"),
395 "Latest checkpoint sequence number partially committed by this collector",
396 &["pipeline"],
397 registry,
398 )
399 .unwrap(),
400 latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
401 name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
402 "Difference between the system timestamp when the latest checkpoint was partially committed and the \
403 timestamp in the checkpoint, in milliseconds",
404 &["pipeline"],
405 registry,
406 )
407 .unwrap(),
408 partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
409 name("partially_committed_checkpoint_timestamp_lag"),
410 "Difference between the system timestamp when a checkpoint was partially committed and the \
411 timestamp in each checkpoint, in seconds",
412 &["pipeline"],
413 LAG_SEC_BUCKETS.to_vec(),
414 registry,
415 )
416 .unwrap(),
417 latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
418 name("latest_watermarked_checkpoint_timestamp_lag_ms"),
419 "Difference between the system timestamp when the latest checkpoint was watermarked and the \
420 timestamp in the checkpoint, in milliseconds",
421 &["pipeline"],
422 registry,
423 )
424 .unwrap(),
425 watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
426 name("watermarked_checkpoint_timestamp_lag"),
427 "Difference between the system timestamp when a checkpoint was watermarked and the \
428 timestamp in each checkpoint, in seconds",
429 &["pipeline"],
430 LAG_SEC_BUCKETS.to_vec(),
431 registry,
432 )
433 .unwrap(),
434 collector_gather_latency: register_histogram_vec_with_registry!(
435 name("collector_gather_latency"),
436 "Time taken to gather rows into a batch by this collector",
437 &["pipeline"],
438 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
439 registry,
440 )
441 .unwrap(),
442 collector_batch_size: register_histogram_vec_with_registry!(
443 name("collector_batch_size"),
444 "Number of rows in a batch written to the database by this collector",
445 &["pipeline"],
446 BATCH_SIZE_BUCKETS.to_vec(),
447 registry,
448 )
449 .unwrap(),
450 committer_commit_latency: register_histogram_vec_with_registry!(
451 name("committer_commit_latency"),
452 "Time taken to write a batch of rows to the database by this committer",
453 &["pipeline"],
454 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
455 registry,
456 )
457 .unwrap(),
458 committer_tx_rows: register_histogram_vec_with_registry!(
459 name("committer_tx_rows"),
460 "Number of rows written to the database in a single database transaction by this committer",
461 &["pipeline"],
462 BATCH_SIZE_BUCKETS.to_vec(),
463 registry,
464 )
465 .unwrap(),
466 watermark_gather_latency: register_histogram_vec_with_registry!(
467 name("watermark_gather_latency"),
468 "Time taken to calculate the new high watermark after a write by this committer",
469 &["pipeline"],
470 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
471 registry,
472 )
473 .unwrap(),
474 watermark_commit_latency: register_histogram_vec_with_registry!(
475 name("watermark_commit_latency"),
476 "Time taken to write the new high watermark to the database by this committer",
477 &["pipeline"],
478 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
479 registry,
480 )
481 .unwrap(),
482 watermark_pruner_read_latency: register_histogram_vec_with_registry!(
483 name("watermark_pruner_read_latency"),
484 "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
485 &["pipeline"],
486 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
487 registry,
488 )
489 .unwrap(),
490 watermark_pruner_write_latency: register_histogram_vec_with_registry!(
491 name("watermark_pruner_write_latency"),
492 "Time taken to write the pruner's new upperbound to the database by this pruner",
493 &["pipeline"],
494 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
495 registry,
496 )
497 .unwrap(),
498 pruner_delete_latency: register_histogram_vec_with_registry!(
499 name("pruner_delete_latency"),
500 "Time taken to delete a chunk of data from the database by this pruner",
501 &["pipeline"],
502 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
503 registry,
504 )
505 .unwrap(),
506 watermark_epoch: register_int_gauge_vec_with_registry!(
507 name("watermark_epoch"),
508 "Current epoch high watermark for this committer",
509 &["pipeline"],
510 registry,
511 )
512 .unwrap(),
513 watermark_checkpoint: register_int_gauge_vec_with_registry!(
514 name("watermark_checkpoint"),
515 "Current checkpoint high watermark for this committer",
516 &["pipeline"],
517 registry,
518 )
519 .unwrap(),
520 watermark_transaction: register_int_gauge_vec_with_registry!(
521 name("watermark_transaction"),
522 "Current transaction high watermark for this committer",
523 &["pipeline"],
524 registry,
525 )
526 .unwrap(),
527 watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
528 name("watermark_timestamp_ms"),
529 "Current timestamp high watermark for this committer, in milliseconds",
530 &["pipeline"],
531 registry,
532 )
533 .unwrap(),
534 watermark_reader_lo: register_int_gauge_vec_with_registry!(
535 name("watermark_reader_lo"),
536 "Current reader low watermark for this pruner",
537 &["pipeline"],
538 registry,
539 )
540 .unwrap(),
541 watermark_pruner_hi: register_int_gauge_vec_with_registry!(
542 name("watermark_pruner_hi"),
543 "Current pruner high watermark for this pruner",
544 &["pipeline"],
545 registry,
546 )
547 .unwrap(),
548 watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
549 name("watermark_epoch_in_db"),
550 "Last epoch high watermark this committer wrote to the DB",
551 &["pipeline"],
552 registry,
553 )
554 .unwrap(),
555 watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
556 name("watermark_checkpoint_in_db"),
557 "Last checkpoint high watermark this committer wrote to the DB",
558 &["pipeline"],
559 registry,
560 )
561 .unwrap(),
562 watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
563 name("watermark_transaction_in_db"),
564 "Last transaction high watermark this committer wrote to the DB",
565 &["pipeline"],
566 registry,
567 )
568 .unwrap(),
569 watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
570 name("watermark_timestamp_ms_in_db"),
571 "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
572 &["pipeline"],
573 registry,
574 )
575 .unwrap(),
576 watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
577 name("watermark_reader_lo_in_db"),
578 "Last reader low watermark this pruner wrote to the DB",
579 &["pipeline"],
580 registry,
581 )
582 .unwrap(),
583 watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
584 name("watermark_pruner_hi_in_db"),
585 "Last pruner high watermark this pruner wrote to the DB",
586 &["pipeline"],
587 registry,
588 )
589 .unwrap(),
590 })
591 }
592
593 pub(crate) fn inc_retry(
596 &self,
597 checkpoint: u64,
598 reason: &str,
599 error: Error,
600 ) -> backoff::Error<Error> {
601 warn!(checkpoint, reason, "Retrying due to error: {error}");
602
603 self.total_ingested_transient_retries
604 .with_label_values(&[reason])
605 .inc();
606
607 backoff::Error::transient(error)
608 }
609}
610
611impl CheckpointLagMetricReporter {
612 pub fn new(
613 checkpoint_time_lag_histogram: Histogram,
614 latest_checkpoint_time_lag_gauge: IntGauge,
615 latest_checkpoint_sequence_number_gauge: IntGauge,
616 ) -> Arc<Self> {
617 Arc::new(Self {
618 checkpoint_time_lag_histogram,
619 latest_checkpoint_time_lag_gauge,
620 latest_checkpoint_sequence_number_gauge,
621 latest_reported_checkpoint: AtomicU64::new(0),
622 })
623 }
624
625 pub fn new_for_pipeline<P: Processor>(
626 checkpoint_time_lag_histogram: &HistogramVec,
627 latest_checkpoint_time_lag_gauge: &IntGaugeVec,
628 latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
629 ) -> Arc<Self> {
630 Self::new(
631 checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
632 latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
633 latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
634 )
635 }
636
637 pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
638 let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
639 self.checkpoint_time_lag_histogram
640 .observe((lag as f64) / 1000.0);
641
642 let prev = self
643 .latest_reported_checkpoint
644 .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
645 if cp_sequence_number > prev {
646 self.latest_checkpoint_sequence_number_gauge
647 .set(cp_sequence_number as i64);
648 self.latest_checkpoint_time_lag_gauge.set(lag);
649 }
650 }
651}
652
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::IndexerMetrics;

    /// Builds an [`IndexerMetrics`] with the default prefix, registered in a fresh registry that
    /// is discarded afterwards. Intended for tests that need metric handles but never scrape
    /// them.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let throwaway_registry = Registry::new();
        IndexerMetrics::new(None, &throwaway_registry)
    }
}