sui_indexer_alt_framework/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::{Arc, atomic::AtomicU64};
5
6use prometheus::{
7    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
8    register_histogram_vec_with_registry, register_histogram_with_registry,
9    register_int_counter_vec_with_registry, register_int_counter_with_registry,
10    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14use crate::{ingestion::error::Error, pipeline::Processor};
15
/// Bucket upper bounds for the histogram of checkpoint fetching latencies.
///
/// The bounds follow a 1-2-5 progression from 0.001 up to 100.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, //
    0.01, 0.02, 0.05, //
    0.1, 0.2, 0.5, //
    1.0, 2.0, 5.0, //
    10.0, 20.0, 50.0, //
    100.0,
];
20
/// Bucket upper bounds for the histogram of checkpoint lag, i.e. the difference between the
/// system time and the timestamp in the checkpoint.
///
/// Resolution is finest below 1.0 (steps of 0.05), then coarsens towards 1000.
const LAG_SEC_BUCKETS: &[f64] = &[
    // Steps of 0.05 below 1.0.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, //
    0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, //
    // Steps of 1.0 up to 5.0.
    1.0, 2.0, 3.0, 4.0, 5.0, //
    // Coarse tail.
    10.0, 20.0, 50.0, 100.0, 1000.0,
];
27
/// Bucket upper bounds for the histogram of latencies for processing a checkpoint inside the
/// indexer itself (work that does not call out to other services).
///
/// The bounds follow a 1-2-5 progression from 0.001 up to 100.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, //
    0.01, 0.02, 0.05, //
    0.1, 0.2, 0.5, //
    1.0, 2.0, 5.0, //
    10.0, 20.0, 50.0, //
    100.0,
];
33
/// Bucket upper bounds for the histogram of latencies for writing to the database.
///
/// The bounds follow a 1-2-5 progression from 0.01 up to 10000.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, //
    0.1, 0.2, 0.5, //
    1.0, 2.0, 5.0, //
    10.0, 20.0, 50.0, //
    100.0, 200.0, 500.0, //
    1000.0, 2000.0, 5000.0, //
    10000.0,
];
39
/// Bucket upper bounds for the histogram of batch sizes (number of rows) written to the
/// database.
///
/// The bounds follow a 1-2-5 progression from 1 up to 20000.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, //
    10.0, 20.0, 50.0, //
    100.0, 200.0, 500.0, //
    1000.0, 2000.0, 5000.0, //
    10000.0, 20000.0,
];
44
/// The Prometheus metrics reported by the indexer: ingestion from the remote store, and the
/// handler, collector, committer, watermark and pruner stages of each pipeline.
///
/// Scalar metrics (`IntCounter`, `IntGauge`, `Histogram`) describe ingestion as a whole. The
/// `*Vec` metrics are registered with a single label in [`IndexerMetrics::new`]: `"pipeline"` for
/// all of them except `total_ingested_transient_retries`, which is labelled by `"reason"`.
#[derive(Clone)]
pub struct IndexerMetrics {
    // Statistics related to fetching data from the remote store.
    pub total_ingested_checkpoints: IntCounter,
    pub total_ingested_transactions: IntCounter,
    pub total_ingested_events: IntCounter,
    pub total_ingested_inputs: IntCounter,
    pub total_ingested_outputs: IntCounter,
    pub total_ingested_bytes: IntCounter,
    // Retries caused by transient errors, labelled by "reason".
    pub total_ingested_transient_retries: IntCounterVec,
    pub total_ingested_not_found_retries: IntCounter,

    // Checkpoint lag metrics for the ingestion pipeline.
    pub latest_ingested_checkpoint: IntGauge,
    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
    pub ingested_checkpoint_timestamp_lag: Histogram,

    // Time taken to fetch a checkpoint from the remote store, including retries.
    pub ingested_checkpoint_latency: Histogram,

    // Statistics related to individual ingestion pipelines' handlers.
    pub total_handler_checkpoints_received: IntCounterVec,
    pub total_handler_checkpoints_processed: IntCounterVec,
    pub total_handler_rows_created: IntCounterVec,

    // Checkpoint lag metrics for the handlers.
    pub latest_processed_checkpoint: IntGaugeVec,
    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub processed_checkpoint_timestamp_lag: HistogramVec,

    // Time taken by a handler to process a checkpoint.
    pub handler_checkpoint_latency: HistogramVec,

    // Statistics related to individual ingestion pipelines: counters for the collector,
    // committer, watermark and pruner stages.
    pub total_collector_checkpoints_received: IntCounterVec,
    pub total_collector_rows_received: IntCounterVec,
    pub total_collector_batches_created: IntCounterVec,
    pub total_committer_batches_attempted: IntCounterVec,
    pub total_committer_batches_succeeded: IntCounterVec,
    pub total_committer_batches_failed: IntCounterVec,
    pub total_committer_rows_committed: IntCounterVec,
    pub total_committer_rows_affected: IntCounterVec,
    pub total_watermarks_out_of_order: IntCounterVec,
    pub total_pruner_chunks_attempted: IntCounterVec,
    pub total_pruner_chunks_deleted: IntCounterVec,
    pub total_pruner_rows_deleted: IntCounterVec,

    // Checkpoint lag metrics for the collector.
    pub latest_collected_checkpoint: IntGaugeVec,
    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub collected_checkpoint_timestamp_lag: HistogramVec,

    // Checkpoint lag metrics for the committer.
    // We can only report partially committed checkpoints, since the concurrent committer isn't
    // aware of when a checkpoint is fully committed. So we report whenever we see a checkpoint.
    // Since data from the same checkpoint is batched continuously, this is a good proxy for the
    // last committed checkpoint.
    pub latest_partially_committed_checkpoint: IntGaugeVec,
    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,

    // Checkpoint lag metrics for the watermarker.
    // The latest watermarked checkpoint is already covered by `watermark_checkpoint_in_db`.
    // Although `watermark_timestamp_in_db_ms` already exists, the lag is also reported explicitly
    // for consistency with the other stages.
    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub watermarked_checkpoint_timestamp_lag: HistogramVec,

    // Latency and size distributions for the collector, committer, watermark and pruner stages.
    pub collector_gather_latency: HistogramVec,
    pub collector_batch_size: HistogramVec,
    pub committer_commit_latency: HistogramVec,
    pub committer_tx_rows: HistogramVec,
    pub watermark_gather_latency: HistogramVec,
    pub watermark_commit_latency: HistogramVec,
    pub watermark_pruner_read_latency: HistogramVec,
    pub watermark_pruner_write_latency: HistogramVec,
    pub pruner_delete_latency: HistogramVec,

    // Current watermark values held by each pipeline's committer and pruner.
    pub watermark_epoch: IntGaugeVec,
    pub watermark_checkpoint: IntGaugeVec,
    pub watermark_transaction: IntGaugeVec,
    pub watermark_timestamp_ms: IntGaugeVec,
    pub watermark_reader_lo: IntGaugeVec,
    pub watermark_pruner_hi: IntGaugeVec,

    // Watermark values most recently written to the database.
    pub watermark_epoch_in_db: IntGaugeVec,
    pub watermark_checkpoint_in_db: IntGaugeVec,
    pub watermark_transaction_in_db: IntGaugeVec,
    pub watermark_timestamp_in_db_ms: IntGaugeVec,
    pub watermark_reader_lo_in_db: IntGaugeVec,
    pub watermark_pruner_hi_in_db: IntGaugeVec,
}
133
/// A helper struct to report metrics regarding the checkpoint lag at various points in the
/// indexer.
///
/// It bundles one histogram and two gauges with the highest checkpoint sequence number reported
/// so far, so that the gauges only ever move forward to a newer checkpoint.
pub(crate) struct CheckpointLagMetricReporter {
    /// Metric to report the lag distribution of each checkpoint.
    checkpoint_time_lag_histogram: Histogram,
    /// Metric to report the lag of the checkpoint with the highest sequence number observed so far.
    /// This is needed since concurrent pipelines observe checkpoints out of order.
    latest_checkpoint_time_lag_gauge: IntGauge,
    /// Metric to report the sequence number of the checkpoint with the highest sequence number
    /// observed so far.
    latest_checkpoint_sequence_number_gauge: IntGauge,
    /// Internal state to keep track of the highest checkpoint sequence number reported so far.
    latest_reported_checkpoint: AtomicU64,
}
146
147impl IndexerMetrics {
148    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
149        let prefix = prefix.unwrap_or("indexer");
150        let name = |n| format!("{prefix}_{n}");
151        Arc::new(Self {
152            total_ingested_checkpoints: register_int_counter_with_registry!(
153                name("total_ingested_checkpoints"),
154                "Total number of checkpoints fetched from the remote store",
155                registry,
156            )
157            .unwrap(),
158            total_ingested_transactions: register_int_counter_with_registry!(
159                name("total_ingested_transactions"),
160                "Total number of transactions fetched from the remote store",
161                registry,
162            )
163            .unwrap(),
164            total_ingested_events: register_int_counter_with_registry!(
165                name("total_ingested_events"),
166                "Total number of events fetched from the remote store",
167                registry,
168            )
169            .unwrap(),
170            total_ingested_inputs: register_int_counter_with_registry!(
171                name("total_ingested_inputs"),
172                "Total number of input objects fetched from the remote store",
173                registry,
174            )
175            .unwrap(),
176            total_ingested_outputs: register_int_counter_with_registry!(
177                name("total_ingested_outputs"),
178                "Total number of output objects fetched from the remote store",
179                registry,
180            )
181            .unwrap(),
182            total_ingested_bytes: register_int_counter_with_registry!(
183                name("total_ingested_bytes"),
184                "Total number of bytes fetched from the remote store, this metric will not \
185                be updated when data are fetched over gRPC.",
186                registry,
187            )
188            .unwrap(),
189            total_ingested_transient_retries: register_int_counter_vec_with_registry!(
190                name("total_ingested_retries"),
191                "Total number of retries due to transient errors while fetching data from the \
192                 remote store",
193                &["reason"],
194                registry,
195            )
196            .unwrap(),
197            total_ingested_not_found_retries: register_int_counter_with_registry!(
198                name("total_ingested_not_found_retries"),
199                "Total number of retries due to the not found errors while fetching data from the \
200                 remote store",
201                registry,
202            )
203            .unwrap(),
204            latest_ingested_checkpoint: register_int_gauge_with_registry!(
205                name("latest_ingested_checkpoint"),
206                "Latest checkpoint sequence number fetched from the remote store",
207                registry,
208            )
209            .unwrap(),
210            latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
211                name("latest_ingested_checkpoint_timestamp_lag_ms"),
212                "Difference between the system timestamp when the latest checkpoint was fetched and the \
213                 timestamp in the checkpoint, in milliseconds",
214                registry,
215            )
216            .unwrap(),
217            ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
218                name("ingested_checkpoint_timestamp_lag"),
219                "Difference between the system timestamp when a checkpoint was fetched and the \
220                 timestamp in each checkpoint, in seconds",
221                LAG_SEC_BUCKETS.to_vec(),
222                registry,
223            )
224            .unwrap(),
225            ingested_checkpoint_latency: register_histogram_with_registry!(
226                name("ingested_checkpoint_latency"),
227                "Time taken to fetch a checkpoint from the remote store, including retries",
228                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
229                registry,
230            )
231            .unwrap(),
232            total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
233                name("total_handler_checkpoints_received"),
234                "Total number of checkpoints received by this handler",
235                &["pipeline"],
236                registry,
237            )
238            .unwrap(),
239            total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
240                name("total_handler_checkpoints_processed"),
241                "Total number of checkpoints processed (converted into rows) by this handler",
242                &["pipeline"],
243                registry,
244            )
245            .unwrap(),
246            total_handler_rows_created: register_int_counter_vec_with_registry!(
247                name("total_handler_rows_created"),
248                "Total number of rows created by this handler",
249                &["pipeline"],
250                registry,
251            )
252            .unwrap(),
253            latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
254                name("latest_processed_checkpoint"),
255                "Latest checkpoint sequence number processed by this handler",
256                &["pipeline"],
257                registry,
258            )
259            .unwrap(),
260            latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
261                name("latest_processed_checkpoint_timestamp_lag_ms"),
262                "Difference between the system timestamp when the latest checkpoint was processed and the \
263                 timestamp in the checkpoint, in milliseconds",
264                &["pipeline"],
265                registry,
266            )
267            .unwrap(),
268            processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
269                name("processed_checkpoint_timestamp_lag"),
270                "Difference between the system timestamp when a checkpoint was processed and the \
271                 timestamp in each checkpoint, in seconds",
272                &["pipeline"],
273                LAG_SEC_BUCKETS.to_vec(),
274                registry,
275            )
276            .unwrap(),
277            handler_checkpoint_latency: register_histogram_vec_with_registry!(
278                name("handler_checkpoint_latency"),
279                "Time taken to process a checkpoint by this handler",
280                &["pipeline"],
281                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
282                registry,
283            )
284            .unwrap(),
285            total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
286                name("total_collector_checkpoints_received"),
287                "Total number of checkpoints received by this collector",
288                &["pipeline"],
289                registry,
290            )
291            .unwrap(),
292            total_collector_rows_received: register_int_counter_vec_with_registry!(
293                name("total_collector_rows_received"),
294                "Total number of rows received by this collector",
295                &["pipeline"],
296                registry,
297            )
298            .unwrap(),
299            total_collector_batches_created: register_int_counter_vec_with_registry!(
300                name("total_collector_batches_created"),
301                "Total number of batches created by this collector",
302                &["pipeline"],
303                registry,
304            )
305            .unwrap(),
306            total_committer_batches_attempted: register_int_counter_vec_with_registry!(
307                name("total_committer_batches_attempted"),
308                "Total number of batches writes attempted by this committer",
309                &["pipeline"],
310                registry,
311            )
312            .unwrap(),
313            total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
314                name("total_committer_batches_succeeded"),
315                "Total number of successful batches writes by this committer",
316                &["pipeline"],
317                registry,
318            )
319            .unwrap(),
320            total_committer_batches_failed: register_int_counter_vec_with_registry!(
321                name("total_committer_batches_failed"),
322                "Total number of failed batches writes by this committer",
323                &["pipeline"],
324                registry,
325            )
326            .unwrap(),
327            total_committer_rows_committed: register_int_counter_vec_with_registry!(
328                name("total_committer_rows_committed"),
329                "Total number of rows sent to the database by this committer",
330                &["pipeline"],
331                registry,
332            )
333            .unwrap(),
334            total_committer_rows_affected: register_int_counter_vec_with_registry!(
335                name("total_committer_rows_affected"),
336                "Total number of rows actually written to the database by this committer",
337                &["pipeline"],
338                registry,
339            )
340            .unwrap(),
341            total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
342                name("watermark_out_of_order"),
343                "Number of times this committer encountered a batch for a checkpoint before its watermark",
344                &["pipeline"],
345                registry,
346            )
347            .unwrap(),
348            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
349                name("pruner_chunks_attempted"),
350                "Number of chunks this pruner attempted to delete",
351                &["pipeline"],
352                registry,
353            )
354            .unwrap(),
355            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
356                name("pruner_chunks_deleted"),
357                "Number of chunks this pruner successfully deleted",
358                &["pipeline"],
359                registry,
360            )
361            .unwrap(),
362            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
363                name("pruner_rows_deleted"),
364                "Number of rows this pruner successfully deleted",
365                &["pipeline"],
366                registry,
367            )
368            .unwrap(),
369            latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
370                name("latest_collected_checkpoint"),
371                "Latest checkpoint sequence number collected by this collector",
372                &["pipeline"],
373                registry,
374            )
375            .unwrap(),
376            latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
377                name("latest_collected_checkpoint_timestamp_lag_ms"),
378                "Difference between the system timestamp when the latest checkpoint was collected and the \
379                 timestamp in the checkpoint, in milliseconds",
380                &["pipeline"],
381                registry,
382            )
383            .unwrap(),
384            collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
385                name("collected_checkpoint_timestamp_lag"),
386                "Difference between the system timestamp when a checkpoint was collected and the \
387                 timestamp in each checkpoint, in seconds",
388                &["pipeline"],
389                LAG_SEC_BUCKETS.to_vec(),
390                registry,
391            )
392            .unwrap(),
393            latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
394                name("latest_partially_committed_checkpoint"),
395                "Latest checkpoint sequence number partially committed by this collector",
396                &["pipeline"],
397                registry,
398            )
399            .unwrap(),
400            latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
401                name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
402                "Difference between the system timestamp when the latest checkpoint was partially committed and the \
403                 timestamp in the checkpoint, in milliseconds",
404                &["pipeline"],
405                registry,
406            )
407            .unwrap(),
408            partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
409                name("partially_committed_checkpoint_timestamp_lag"),
410                "Difference between the system timestamp when a checkpoint was partially committed and the \
411                 timestamp in each checkpoint, in seconds",
412                &["pipeline"],
413                LAG_SEC_BUCKETS.to_vec(),
414                registry,
415            )
416            .unwrap(),
417            latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
418                name("latest_watermarked_checkpoint_timestamp_lag_ms"),
419                "Difference between the system timestamp when the latest checkpoint was watermarked and the \
420                 timestamp in the checkpoint, in milliseconds",
421                &["pipeline"],
422                registry,
423            )
424            .unwrap(),
425            watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
426                name("watermarked_checkpoint_timestamp_lag"),
427                "Difference between the system timestamp when a checkpoint was watermarked and the \
428                 timestamp in each checkpoint, in seconds",
429                &["pipeline"],
430                LAG_SEC_BUCKETS.to_vec(),
431                registry,
432            )
433            .unwrap(),
434            collector_gather_latency: register_histogram_vec_with_registry!(
435                name("collector_gather_latency"),
436                "Time taken to gather rows into a batch by this collector",
437                &["pipeline"],
438                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
439                registry,
440            )
441            .unwrap(),
442            collector_batch_size: register_histogram_vec_with_registry!(
443                name("collector_batch_size"),
444                "Number of rows in a batch written to the database by this collector",
445                &["pipeline"],
446                BATCH_SIZE_BUCKETS.to_vec(),
447                registry,
448            )
449            .unwrap(),
450            committer_commit_latency: register_histogram_vec_with_registry!(
451                name("committer_commit_latency"),
452                "Time taken to write a batch of rows to the database by this committer",
453                &["pipeline"],
454                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
455                registry,
456            )
457            .unwrap(),
458            committer_tx_rows: register_histogram_vec_with_registry!(
459                name("committer_tx_rows"),
460                "Number of rows written to the database in a single database transaction by this committer",
461                &["pipeline"],
462                BATCH_SIZE_BUCKETS.to_vec(),
463                registry,
464            )
465            .unwrap(),
466            watermark_gather_latency: register_histogram_vec_with_registry!(
467                name("watermark_gather_latency"),
468                "Time taken to calculate the new high watermark after a write by this committer",
469                &["pipeline"],
470                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
471                registry,
472            )
473            .unwrap(),
474            watermark_commit_latency: register_histogram_vec_with_registry!(
475                name("watermark_commit_latency"),
476                "Time taken to write the new high watermark to the database by this committer",
477                &["pipeline"],
478                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
479                registry,
480            )
481            .unwrap(),
482            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
483                name("watermark_pruner_read_latency"),
484                "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
485                &["pipeline"],
486                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
487                registry,
488            )
489            .unwrap(),
490            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
491                name("watermark_pruner_write_latency"),
492                "Time taken to write the pruner's new upperbound to the database by this pruner",
493                &["pipeline"],
494                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
495                registry,
496            )
497            .unwrap(),
498            pruner_delete_latency: register_histogram_vec_with_registry!(
499                name("pruner_delete_latency"),
500                "Time taken to delete a chunk of data from the database by this pruner",
501                &["pipeline"],
502                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
503                registry,
504            )
505            .unwrap(),
506            watermark_epoch: register_int_gauge_vec_with_registry!(
507                name("watermark_epoch"),
508                "Current epoch high watermark for this committer",
509                &["pipeline"],
510                registry,
511            )
512            .unwrap(),
513            watermark_checkpoint: register_int_gauge_vec_with_registry!(
514                name("watermark_checkpoint"),
515                "Current checkpoint high watermark for this committer",
516                &["pipeline"],
517                registry,
518            )
519            .unwrap(),
520            watermark_transaction: register_int_gauge_vec_with_registry!(
521                name("watermark_transaction"),
522                "Current transaction high watermark for this committer",
523                &["pipeline"],
524                registry,
525            )
526            .unwrap(),
527            watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
528                name("watermark_timestamp_ms"),
529                "Current timestamp high watermark for this committer, in milliseconds",
530                &["pipeline"],
531                registry,
532            )
533            .unwrap(),
534            watermark_reader_lo: register_int_gauge_vec_with_registry!(
535                name("watermark_reader_lo"),
536                "Current reader low watermark for this pruner",
537                &["pipeline"],
538                registry,
539            )
540            .unwrap(),
541            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
542                name("watermark_pruner_hi"),
543                "Current pruner high watermark for this pruner",
544                &["pipeline"],
545                registry,
546            )
547            .unwrap(),
548            watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
549                name("watermark_epoch_in_db"),
550                "Last epoch high watermark this committer wrote to the DB",
551                &["pipeline"],
552                registry,
553            )
554            .unwrap(),
555            watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
556                name("watermark_checkpoint_in_db"),
557                "Last checkpoint high watermark this committer wrote to the DB",
558                &["pipeline"],
559                registry,
560            )
561            .unwrap(),
562            watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
563                name("watermark_transaction_in_db"),
564                "Last transaction high watermark this committer wrote to the DB",
565                &["pipeline"],
566                registry,
567            )
568            .unwrap(),
569            watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
570                name("watermark_timestamp_ms_in_db"),
571                "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
572                &["pipeline"],
573                registry,
574            )
575            .unwrap(),
576            watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
577                name("watermark_reader_lo_in_db"),
578                "Last reader low watermark this pruner wrote to the DB",
579                &["pipeline"],
580                registry,
581            )
582            .unwrap(),
583            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
584                name("watermark_pruner_hi_in_db"),
585                "Last pruner high watermark this pruner wrote to the DB",
586                &["pipeline"],
587                registry,
588            )
589            .unwrap(),
590        })
591    }
592
593    /// Register that we're retrying a checkpoint fetch due to a transient error, logging the
594    /// reason and error.
595    pub(crate) fn inc_retry(
596        &self,
597        checkpoint: u64,
598        reason: &str,
599        error: Error,
600    ) -> backoff::Error<Error> {
601        warn!(checkpoint, reason, "Retrying due to error: {error}");
602
603        self.total_ingested_transient_retries
604            .with_label_values(&[reason])
605            .inc();
606
607        backoff::Error::transient(error)
608    }
609}
610
611impl CheckpointLagMetricReporter {
612    pub fn new(
613        checkpoint_time_lag_histogram: Histogram,
614        latest_checkpoint_time_lag_gauge: IntGauge,
615        latest_checkpoint_sequence_number_gauge: IntGauge,
616    ) -> Arc<Self> {
617        Arc::new(Self {
618            checkpoint_time_lag_histogram,
619            latest_checkpoint_time_lag_gauge,
620            latest_checkpoint_sequence_number_gauge,
621            latest_reported_checkpoint: AtomicU64::new(0),
622        })
623    }
624
625    pub fn new_for_pipeline<P: Processor>(
626        checkpoint_time_lag_histogram: &HistogramVec,
627        latest_checkpoint_time_lag_gauge: &IntGaugeVec,
628        latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
629    ) -> Arc<Self> {
630        Self::new(
631            checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
632            latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
633            latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
634        )
635    }
636
637    pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
638        let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
639        self.checkpoint_time_lag_histogram
640            .observe((lag as f64) / 1000.0);
641
642        let prev = self
643            .latest_reported_checkpoint
644            .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
645        if cp_sequence_number > prev {
646            self.latest_checkpoint_sequence_number_gauge
647                .set(cp_sequence_number as i64);
648            self.latest_checkpoint_time_lag_gauge.set(lag);
649        }
650    }
651}
652
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::IndexerMetrics;

    /// Builds an `IndexerMetrics` for use in tests, using the default prefix and a freshly
    /// created registry.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }
}