sui_indexer_alt_framework/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6
7use prometheus::Histogram;
8use prometheus::HistogramVec;
9use prometheus::IntCounter;
10use prometheus::IntCounterVec;
11use prometheus::IntGauge;
12use prometheus::IntGaugeVec;
13use prometheus::Registry;
14use prometheus::register_histogram_vec_with_registry;
15use prometheus::register_histogram_with_registry;
16use prometheus::register_int_counter_vec_with_registry;
17use prometheus::register_int_counter_with_registry;
18use prometheus::register_int_gauge_vec_with_registry;
19use prometheus::register_int_gauge_with_registry;
20use tracing::warn;
21
22use crate::ingestion::error::Error;
23use crate::pipeline::Processor;
24
/// Histogram buckets, in seconds, for checkpoint fetch latency: a 1-2-5 series from 1ms to 100s.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // milliseconds
    0.01, 0.02, 0.05, // tens of milliseconds
    0.1, 0.2, 0.5, // hundreds of milliseconds
    1.0, 2.0, 5.0, // seconds
    10.0, 20.0, 50.0, // tens of seconds
    100.0,
];

/// Histogram buckets, in seconds, for checkpoint lag: the difference between the system time and
/// the timestamp recorded in the checkpoint.
const LAG_SEC_BUCKETS: &[f64] = &[
    // Below one second, in 50ms steps.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, //
    0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, //
    // One to five seconds, in one-second steps.
    1.0, 2.0, 3.0, 4.0, 5.0, //
    // Coarse buckets for large lag.
    10.0, 20.0, 50.0, 100.0, 1000.0,
];

/// Histogram buckets, in seconds, for work done inside the indexer without calling out to other
/// services: a 1-2-5 series from 1ms to 100s.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // milliseconds
    0.01, 0.02, 0.05, // tens of milliseconds
    0.1, 0.2, 0.5, // hundreds of milliseconds
    1.0, 2.0, 5.0, // seconds
    10.0, 20.0, 50.0, // tens of seconds
    100.0,
];

/// Histogram buckets, in seconds, for database round-trips (writes, and also the pruner's
/// watermark reads): a 1-2-5 series from 10ms to 10,000s.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, // tens of milliseconds
    0.1, 0.2, 0.5, // hundreds of milliseconds
    1.0, 2.0, 5.0, // seconds
    10.0, 20.0, 50.0, // tens of seconds
    100.0, 200.0, 500.0, // hundreds of seconds
    1000.0, 2000.0, 5000.0, // thousands of seconds
    10000.0,
];

/// Histogram buckets for batch sizes, counted in rows: a 1-2-5 series from 1 to 20,000.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, //
    10.0, 20.0, 50.0, //
    100.0, 200.0, 500.0, //
    1000.0, 2000.0, 5000.0, //
    10000.0, 20000.0,
];
53
54/// Metrics specific to the ingestion service.
55#[derive(Clone)]
56pub struct IngestionMetrics {
57    // Statistics related to fetching data from the remote store.
58    pub total_ingested_checkpoints: IntCounter,
59    pub total_ingested_transactions: IntCounter,
60    pub total_ingested_events: IntCounter,
61    pub total_ingested_objects: IntCounter,
62    pub total_ingested_bytes: IntCounter,
63    pub total_ingested_transient_retries: IntCounterVec,
64    pub total_ingested_not_found_retries: IntCounter,
65    pub total_ingested_permanent_errors: IntCounterVec,
66    pub total_streamed_checkpoints: IntCounter,
67    pub total_stream_disconnections: IntCounter,
68    pub total_streaming_connection_failures: IntCounter,
69
70    // Checkpoint lag metrics for the ingestion pipeline.
71    pub latest_ingested_checkpoint: IntGauge,
72    pub latest_streamed_checkpoint: IntGauge,
73    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
74    pub ingested_checkpoint_timestamp_lag: Histogram,
75
76    pub ingested_checkpoint_latency: Histogram,
77}
78
79#[derive(Clone)]
80pub struct IndexerMetrics {
81    // Statistics related to individual ingestion pipelines' handlers.
82    pub total_handler_checkpoints_received: IntCounterVec,
83    pub total_handler_checkpoints_processed: IntCounterVec,
84    pub total_handler_rows_created: IntCounterVec,
85
86    pub latest_processed_checkpoint: IntGaugeVec,
87    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
88    pub processed_checkpoint_timestamp_lag: HistogramVec,
89
90    pub handler_checkpoint_latency: HistogramVec,
91
92    // Statistics related to individual ingestion pipelines.
93    pub total_collector_checkpoints_received: IntCounterVec,
94    pub total_collector_rows_received: IntCounterVec,
95    pub total_collector_batches_created: IntCounterVec,
96    pub total_committer_batches_attempted: IntCounterVec,
97    pub total_committer_batches_succeeded: IntCounterVec,
98    pub total_committer_batches_failed: IntCounterVec,
99    pub total_committer_rows_committed: IntCounterVec,
100    pub total_committer_rows_affected: IntCounterVec,
101    pub total_watermarks_out_of_order: IntCounterVec,
102    pub total_pruner_chunks_attempted: IntCounterVec,
103    pub total_pruner_chunks_deleted: IntCounterVec,
104    pub total_pruner_rows_deleted: IntCounterVec,
105
106    // Checkpoint lag metrics for the collector.
107    pub latest_collected_checkpoint: IntGaugeVec,
108    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
109    pub collected_checkpoint_timestamp_lag: HistogramVec,
110
111    // Checkpoint lag metrics for the committer.
112    // We can only report partially committed checkpoints, since the concurrent committer isn't aware of
113    // when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from
114    // the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint.
115    pub latest_partially_committed_checkpoint: IntGaugeVec,
116    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
117    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
118
119    // Checkpoint lag metrics for the watermarker.
120    // The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db.
121    // While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly
122    // for consistency.
123    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
124    pub watermarked_checkpoint_timestamp_lag: HistogramVec,
125
126    pub collector_gather_latency: HistogramVec,
127    pub collector_batch_size: HistogramVec,
128    pub total_collector_skipped_checkpoints: IntCounterVec,
129    pub collector_reader_lo: IntGaugeVec,
130    pub committer_commit_latency: HistogramVec,
131    pub committer_tx_rows: HistogramVec,
132    pub watermark_gather_latency: HistogramVec,
133    pub watermark_commit_latency: HistogramVec,
134    pub watermark_pruner_read_latency: HistogramVec,
135    pub watermark_pruner_write_latency: HistogramVec,
136    pub pruner_delete_latency: HistogramVec,
137
138    pub watermark_epoch: IntGaugeVec,
139    pub watermark_checkpoint: IntGaugeVec,
140    pub watermark_transaction: IntGaugeVec,
141    pub watermark_timestamp_ms: IntGaugeVec,
142    pub watermark_reader_lo: IntGaugeVec,
143    pub watermark_pruner_hi: IntGaugeVec,
144
145    pub watermark_epoch_in_db: IntGaugeVec,
146    pub watermark_checkpoint_in_db: IntGaugeVec,
147    pub watermark_transaction_in_db: IntGaugeVec,
148    pub watermark_timestamp_in_db_ms: IntGaugeVec,
149    pub watermark_reader_lo_in_db: IntGaugeVec,
150    pub watermark_pruner_hi_in_db: IntGaugeVec,
151}
152
153/// A helper struct to report metrics regarding the checkpoint lag at various points in the indexer.
154pub(crate) struct CheckpointLagMetricReporter {
155    /// Metric to report the lag distribution of each checkpoint.
156    checkpoint_time_lag_histogram: Histogram,
157    /// Metric to report the lag of the checkpoint with the highest sequence number observed so far.
158    /// This is needed since concurrent pipelines observe checkpoints out of order.
159    latest_checkpoint_time_lag_gauge: IntGauge,
160    /// Metric to report the sequence number of the checkpoint with the highest sequence number observed so far.
161    latest_checkpoint_sequence_number_gauge: IntGauge,
162    // Internal state to keep track of the highest checkpoint sequence number reported so far.
163    latest_reported_checkpoint: AtomicU64,
164}
165
166impl IngestionMetrics {
167    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
168        let prefix = prefix.unwrap_or("indexer");
169        let name = |n| format!("{prefix}_{n}");
170        Arc::new(Self {
171            total_ingested_checkpoints: register_int_counter_with_registry!(
172                name("total_ingested_checkpoints"),
173                "Total number of checkpoints fetched from the remote store",
174                registry,
175            )
176            .unwrap(),
177            total_ingested_transactions: register_int_counter_with_registry!(
178                name("total_ingested_transactions"),
179                "Total number of transactions fetched from the remote store",
180                registry,
181            )
182            .unwrap(),
183            total_ingested_events: register_int_counter_with_registry!(
184                name("total_ingested_events"),
185                "Total number of events fetched from the remote store",
186                registry,
187            )
188            .unwrap(),
189            total_ingested_objects: register_int_counter_with_registry!(
190                name("total_ingested_objects"),
191                "Total number of objects in checkpoints fetched from the remote store",
192                registry,
193            )
194            .unwrap(),
195            total_ingested_bytes: register_int_counter_with_registry!(
196                name("total_ingested_bytes"),
197                "Total number of bytes fetched from the remote store, this metric will not \
198                be updated when data are fetched over gRPC.",
199                registry,
200            )
201            .unwrap(),
202            total_ingested_transient_retries: register_int_counter_vec_with_registry!(
203                name("total_ingested_retries"),
204                "Total number of retries due to transient errors while fetching data from the \
205                 remote store",
206                &["reason"],
207                registry,
208            )
209            .unwrap(),
210            total_ingested_not_found_retries: register_int_counter_with_registry!(
211                name("total_ingested_not_found_retries"),
212                "Total number of retries due to the not found errors while fetching data from the \
213                 remote store",
214                registry,
215            )
216            .unwrap(),
217            total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
218                name("total_ingested_permanent_errors"),
219                "Total number of permanent errors encountered while fetching data from the \
220                 remote store, which cause the ingestion service to shutdown",
221                &["reason"],
222                registry,
223            )
224            .unwrap(),
225            total_streamed_checkpoints: register_int_counter_with_registry!(
226                name("total_streamed_checkpoints"),
227                "Total number of checkpoints received from gRPC streaming",
228                registry,
229            )
230            .unwrap(),
231            total_stream_disconnections: register_int_counter_with_registry!(
232                name("total_stream_disconnections"),
233                "Total number of times the gRPC stream was disconnected",
234                registry,
235            )
236            .unwrap(),
237            total_streaming_connection_failures: register_int_counter_with_registry!(
238                name("total_streaming_connection_failures"),
239                "Total number of failures due to streaming service connection or peek failures",
240                registry,
241            )
242            .unwrap(),
243            latest_ingested_checkpoint: register_int_gauge_with_registry!(
244                name("latest_ingested_checkpoint"),
245                "Latest checkpoint sequence number fetched from the remote store",
246                registry,
247            )
248            .unwrap(),
249            latest_streamed_checkpoint: register_int_gauge_with_registry!(
250                name("latest_streamed_checkpoint"),
251                "Latest checkpoint sequence number received from gRPC streaming",
252                registry,
253            )
254            .unwrap(),
255            latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
256                name("latest_ingested_checkpoint_timestamp_lag_ms"),
257                "Difference between the system timestamp when the latest checkpoint was fetched and the \
258                 timestamp in the checkpoint, in milliseconds",
259                registry,
260            )
261            .unwrap(),
262            ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
263                name("ingested_checkpoint_timestamp_lag"),
264                "Difference between the system timestamp when a checkpoint was fetched and the \
265                 timestamp in each checkpoint, in seconds",
266                LAG_SEC_BUCKETS.to_vec(),
267                registry,
268            )
269            .unwrap(),
270            ingested_checkpoint_latency: register_histogram_with_registry!(
271                name("ingested_checkpoint_latency"),
272                "Time taken to fetch a checkpoint from the remote store, including retries",
273                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
274                registry,
275            )
276            .unwrap(),
277        })
278    }
279
280    /// Register that we're retrying a checkpoint fetch due to a transient error, logging the
281    /// reason and error.
282    pub(crate) fn inc_retry(
283        &self,
284        checkpoint: u64,
285        reason: &str,
286        error: Error,
287    ) -> backoff::Error<Error> {
288        warn!(checkpoint, reason, "Retrying due to error: {error}");
289
290        self.total_ingested_transient_retries
291            .with_label_values(&[reason])
292            .inc();
293
294        backoff::Error::transient(error)
295    }
296}
297
298impl IndexerMetrics {
299    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
300        let prefix = prefix.unwrap_or("indexer");
301        let name = |n| format!("{prefix}_{n}");
302        Arc::new(Self {
303            total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
304                name("total_handler_checkpoints_received"),
305                "Total number of checkpoints received by this handler",
306                &["pipeline"],
307                registry,
308            )
309            .unwrap(),
310            total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
311                name("total_handler_checkpoints_processed"),
312                "Total number of checkpoints processed (converted into rows) by this handler",
313                &["pipeline"],
314                registry,
315            )
316            .unwrap(),
317            total_handler_rows_created: register_int_counter_vec_with_registry!(
318                name("total_handler_rows_created"),
319                "Total number of rows created by this handler",
320                &["pipeline"],
321                registry,
322            )
323            .unwrap(),
324            latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
325                name("latest_processed_checkpoint"),
326                "Latest checkpoint sequence number processed by this handler",
327                &["pipeline"],
328                registry,
329            )
330            .unwrap(),
331            latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
332                name("latest_processed_checkpoint_timestamp_lag_ms"),
333                "Difference between the system timestamp when the latest checkpoint was processed and the \
334                 timestamp in the checkpoint, in milliseconds",
335                &["pipeline"],
336                registry,
337            )
338            .unwrap(),
339            processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
340                name("processed_checkpoint_timestamp_lag"),
341                "Difference between the system timestamp when a checkpoint was processed and the \
342                 timestamp in each checkpoint, in seconds",
343                &["pipeline"],
344                LAG_SEC_BUCKETS.to_vec(),
345                registry,
346            )
347            .unwrap(),
348            handler_checkpoint_latency: register_histogram_vec_with_registry!(
349                name("handler_checkpoint_latency"),
350                "Time taken to process a checkpoint by this handler",
351                &["pipeline"],
352                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
353                registry,
354            )
355            .unwrap(),
356            total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
357                name("total_collector_checkpoints_received"),
358                "Total number of checkpoints received by this collector",
359                &["pipeline"],
360                registry,
361            )
362            .unwrap(),
363            total_collector_rows_received: register_int_counter_vec_with_registry!(
364                name("total_collector_rows_received"),
365                "Total number of rows received by this collector",
366                &["pipeline"],
367                registry,
368            )
369            .unwrap(),
370            total_collector_batches_created: register_int_counter_vec_with_registry!(
371                name("total_collector_batches_created"),
372                "Total number of batches created by this collector",
373                &["pipeline"],
374                registry,
375            )
376            .unwrap(),
377            total_committer_batches_attempted: register_int_counter_vec_with_registry!(
378                name("total_committer_batches_attempted"),
379                "Total number of batches writes attempted by this committer",
380                &["pipeline"],
381                registry,
382            )
383            .unwrap(),
384            total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
385                name("total_committer_batches_succeeded"),
386                "Total number of successful batches writes by this committer",
387                &["pipeline"],
388                registry,
389            )
390            .unwrap(),
391            total_committer_batches_failed: register_int_counter_vec_with_registry!(
392                name("total_committer_batches_failed"),
393                "Total number of failed batches writes by this committer",
394                &["pipeline"],
395                registry,
396            )
397            .unwrap(),
398            total_committer_rows_committed: register_int_counter_vec_with_registry!(
399                name("total_committer_rows_committed"),
400                "Total number of rows sent to the database by this committer",
401                &["pipeline"],
402                registry,
403            )
404            .unwrap(),
405            total_committer_rows_affected: register_int_counter_vec_with_registry!(
406                name("total_committer_rows_affected"),
407                "Total number of rows actually written to the database by this committer",
408                &["pipeline"],
409                registry,
410            )
411            .unwrap(),
412            total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
413                name("watermark_out_of_order"),
414                "Number of times this committer encountered a batch for a checkpoint before its watermark",
415                &["pipeline"],
416                registry,
417            )
418            .unwrap(),
419            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
420                name("pruner_chunks_attempted"),
421                "Number of chunks this pruner attempted to delete",
422                &["pipeline"],
423                registry,
424            )
425            .unwrap(),
426            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
427                name("pruner_chunks_deleted"),
428                "Number of chunks this pruner successfully deleted",
429                &["pipeline"],
430                registry,
431            )
432            .unwrap(),
433            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
434                name("pruner_rows_deleted"),
435                "Number of rows this pruner successfully deleted",
436                &["pipeline"],
437                registry,
438            )
439            .unwrap(),
440            latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
441                name("latest_collected_checkpoint"),
442                "Latest checkpoint sequence number collected by this collector",
443                &["pipeline"],
444                registry,
445            )
446            .unwrap(),
447            latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
448                name("latest_collected_checkpoint_timestamp_lag_ms"),
449                "Difference between the system timestamp when the latest checkpoint was collected and the \
450                 timestamp in the checkpoint, in milliseconds",
451                &["pipeline"],
452                registry,
453            )
454            .unwrap(),
455            collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
456                name("collected_checkpoint_timestamp_lag"),
457                "Difference between the system timestamp when a checkpoint was collected and the \
458                 timestamp in each checkpoint, in seconds",
459                &["pipeline"],
460                LAG_SEC_BUCKETS.to_vec(),
461                registry,
462            )
463            .unwrap(),
464            latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
465                name("latest_partially_committed_checkpoint"),
466                "Latest checkpoint sequence number partially committed by this collector",
467                &["pipeline"],
468                registry,
469            )
470            .unwrap(),
471            latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
472                name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
473                "Difference between the system timestamp when the latest checkpoint was partially committed and the \
474                 timestamp in the checkpoint, in milliseconds",
475                &["pipeline"],
476                registry,
477            )
478            .unwrap(),
479            partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
480                name("partially_committed_checkpoint_timestamp_lag"),
481                "Difference between the system timestamp when a checkpoint was partially committed and the \
482                 timestamp in each checkpoint, in seconds",
483                &["pipeline"],
484                LAG_SEC_BUCKETS.to_vec(),
485                registry,
486            )
487            .unwrap(),
488            latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
489                name("latest_watermarked_checkpoint_timestamp_lag_ms"),
490                "Difference between the system timestamp when the latest checkpoint was watermarked and the \
491                 timestamp in the checkpoint, in milliseconds",
492                &["pipeline"],
493                registry,
494            )
495            .unwrap(),
496            watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
497                name("watermarked_checkpoint_timestamp_lag"),
498                "Difference between the system timestamp when a checkpoint was watermarked and the \
499                 timestamp in each checkpoint, in seconds",
500                &["pipeline"],
501                LAG_SEC_BUCKETS.to_vec(),
502                registry,
503            )
504            .unwrap(),
505            collector_gather_latency: register_histogram_vec_with_registry!(
506                name("collector_gather_latency"),
507                "Time taken to gather rows into a batch by this collector",
508                &["pipeline"],
509                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
510                registry,
511            )
512            .unwrap(),
513            collector_batch_size: register_histogram_vec_with_registry!(
514                name("collector_batch_size"),
515                "Number of rows in a batch written to the database by this collector",
516                &["pipeline"],
517                BATCH_SIZE_BUCKETS.to_vec(),
518                registry,
519            )
520            .unwrap(),
521            total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
522                name("total_collector_skipped_checkpoints"),
523                "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
524                &["pipeline"],
525                registry,
526            ).unwrap(),
527            collector_reader_lo: register_int_gauge_vec_with_registry!(
528                name("collector_reader_lo"),
529                "Reader low watermark as observed by the collector",
530                &["pipeline"],
531                registry,
532            )
533            .unwrap(),
534            committer_commit_latency: register_histogram_vec_with_registry!(
535                name("committer_commit_latency"),
536                "Time taken to write a batch of rows to the database by this committer",
537                &["pipeline"],
538                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
539                registry,
540            )
541            .unwrap(),
542            committer_tx_rows: register_histogram_vec_with_registry!(
543                name("committer_tx_rows"),
544                "Number of rows written to the database in a single database transaction by this committer",
545                &["pipeline"],
546                BATCH_SIZE_BUCKETS.to_vec(),
547                registry,
548            )
549            .unwrap(),
550            watermark_gather_latency: register_histogram_vec_with_registry!(
551                name("watermark_gather_latency"),
552                "Time taken to calculate the new high watermark after a write by this committer",
553                &["pipeline"],
554                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
555                registry,
556            )
557            .unwrap(),
558            watermark_commit_latency: register_histogram_vec_with_registry!(
559                name("watermark_commit_latency"),
560                "Time taken to write the new high watermark to the database by this committer",
561                &["pipeline"],
562                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
563                registry,
564            )
565            .unwrap(),
566            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
567                name("watermark_pruner_read_latency"),
568                "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
569                &["pipeline"],
570                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
571                registry,
572            )
573            .unwrap(),
574            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
575                name("watermark_pruner_write_latency"),
576                "Time taken to write the pruner's new upperbound to the database by this pruner",
577                &["pipeline"],
578                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
579                registry,
580            )
581            .unwrap(),
582            pruner_delete_latency: register_histogram_vec_with_registry!(
583                name("pruner_delete_latency"),
584                "Time taken to delete a chunk of data from the database by this pruner",
585                &["pipeline"],
586                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
587                registry,
588            )
589            .unwrap(),
590            watermark_epoch: register_int_gauge_vec_with_registry!(
591                name("watermark_epoch"),
592                "Current epoch high watermark for this committer",
593                &["pipeline"],
594                registry,
595            )
596            .unwrap(),
597            watermark_checkpoint: register_int_gauge_vec_with_registry!(
598                name("watermark_checkpoint"),
599                "Current checkpoint high watermark for this committer",
600                &["pipeline"],
601                registry,
602            )
603            .unwrap(),
604            watermark_transaction: register_int_gauge_vec_with_registry!(
605                name("watermark_transaction"),
606                "Current transaction high watermark for this committer",
607                &["pipeline"],
608                registry,
609            )
610            .unwrap(),
611            watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
612                name("watermark_timestamp_ms"),
613                "Current timestamp high watermark for this committer, in milliseconds",
614                &["pipeline"],
615                registry,
616            )
617            .unwrap(),
618            watermark_reader_lo: register_int_gauge_vec_with_registry!(
619                name("watermark_reader_lo"),
620                "Current reader low watermark for this pruner",
621                &["pipeline"],
622                registry,
623            )
624            .unwrap(),
625            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
626                name("watermark_pruner_hi"),
627                "Current pruner high watermark for this pruner",
628                &["pipeline"],
629                registry,
630            )
631            .unwrap(),
632            watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
633                name("watermark_epoch_in_db"),
634                "Last epoch high watermark this committer wrote to the DB",
635                &["pipeline"],
636                registry,
637            )
638            .unwrap(),
639            watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
640                name("watermark_checkpoint_in_db"),
641                "Last checkpoint high watermark this committer wrote to the DB",
642                &["pipeline"],
643                registry,
644            )
645            .unwrap(),
646            watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
647                name("watermark_transaction_in_db"),
648                "Last transaction high watermark this committer wrote to the DB",
649                &["pipeline"],
650                registry,
651            )
652            .unwrap(),
653            watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
654                name("watermark_timestamp_ms_in_db"),
655                "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
656                &["pipeline"],
657                registry,
658            )
659            .unwrap(),
660            watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
661                name("watermark_reader_lo_in_db"),
662                "Last reader low watermark this pruner wrote to the DB",
663                &["pipeline"],
664                registry,
665            )
666            .unwrap(),
667            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
668                name("watermark_pruner_hi_in_db"),
669                "Last pruner high watermark this pruner wrote to the DB",
670                &["pipeline"],
671                registry,
672            )
673            .unwrap(),
674        })
675    }
676}
677
678impl CheckpointLagMetricReporter {
679    pub fn new(
680        checkpoint_time_lag_histogram: Histogram,
681        latest_checkpoint_time_lag_gauge: IntGauge,
682        latest_checkpoint_sequence_number_gauge: IntGauge,
683    ) -> Arc<Self> {
684        Arc::new(Self {
685            checkpoint_time_lag_histogram,
686            latest_checkpoint_time_lag_gauge,
687            latest_checkpoint_sequence_number_gauge,
688            latest_reported_checkpoint: AtomicU64::new(0),
689        })
690    }
691
692    pub fn new_for_pipeline<P: Processor>(
693        checkpoint_time_lag_histogram: &HistogramVec,
694        latest_checkpoint_time_lag_gauge: &IntGaugeVec,
695        latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
696    ) -> Arc<Self> {
697        Self::new(
698            checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
699            latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
700            latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
701        )
702    }
703
704    pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
705        let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
706        self.checkpoint_time_lag_histogram
707            .observe((lag as f64) / 1000.0);
708
709        let prev = self
710            .latest_reported_checkpoint
711            .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
712        if cp_sequence_number > prev {
713            self.latest_checkpoint_sequence_number_gauge
714                .set(cp_sequence_number as i64);
715            self.latest_checkpoint_time_lag_gauge.set(lag);
716        }
717    }
718}
719
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::*;

    /// Builds an [`IndexerMetrics`] registered against a fresh, throwaway [`Registry`], so tests
    /// never collide on metric names.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Builds an [`IngestionMetrics`] registered against a fresh, throwaway [`Registry`], so tests
    /// never collide on metric names.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}
737}