sui_indexer_alt_framework/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::{Arc, atomic::AtomicU64};
5
6use prometheus::{
7    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
8    register_histogram_vec_with_registry, register_histogram_with_registry,
9    register_int_counter_vec_with_registry, register_int_counter_with_registry,
10    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14use crate::{ingestion::error::Error, pipeline::Processor};
15
/// Bucket boundaries, in seconds, for the time it takes to fetch a checkpoint. The boundaries
/// follow a 1-2-5 pattern per decade, from 1ms up to 100s.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms .. 5ms
    0.01, 0.02, 0.05, // 10ms .. 50ms
    0.1, 0.2, 0.5, // 100ms .. 500ms
    1.0, 2.0, 5.0, // 1s .. 5s
    10.0, 20.0, 50.0, // 10s .. 50s
    100.0,
];
20
/// Bucket boundaries, in seconds, for checkpoint lag: how far the system clock is ahead of the
/// timestamp recorded in a checkpoint. Resolution is 50ms between 0.1s and 1s, then coarsens.
const LAG_SEC_BUCKETS: &[f64] = &[
    // 100ms to 950ms, in 50ms steps.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95,
    // 1s to 5s, in 1s steps.
    1.0, 2.0, 3.0, 4.0, 5.0,
    // Long tail.
    10.0, 20.0, 50.0, 100.0, 1000.0,
];
27
/// Bucket boundaries, in seconds, for work done inside the indexer itself (no calls out to other
/// services), such as a handler processing a checkpoint. 1-2-5 pattern from 1ms to 100s.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms .. 5ms
    0.01, 0.02, 0.05, // 10ms .. 50ms
    0.1, 0.2, 0.5, // 100ms .. 500ms
    1.0, 2.0, 5.0, // 1s .. 5s
    10.0, 20.0, 50.0, // 10s .. 50s
    100.0,
];
33
/// Bucket boundaries, in seconds, for database round-trips: primarily writes, but also the
/// pruner's watermark reads and deletes. 1-2-5 pattern from 10ms up to 10,000s.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, // 10ms .. 50ms
    0.1, 0.2, 0.5, // 100ms .. 500ms
    1.0, 2.0, 5.0, // 1s .. 5s
    10.0, 20.0, 50.0, // 10s .. 50s
    100.0, 200.0, 500.0, // 100s .. 500s
    1000.0, 2000.0, 5000.0, // 1,000s .. 5,000s
    10000.0,
];
39
/// Bucket boundaries for batch sizes, counted in rows written to the database (per batch, or per
/// database transaction). 1-2-5 pattern from 1 row up to 20,000 rows.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, // 1 .. 5 rows
    10.0, 20.0, 50.0, // 10 .. 50 rows
    100.0, 200.0, 500.0, // 100 .. 500 rows
    1000.0, 2000.0, 5000.0, // 1,000 .. 5,000 rows
    10000.0, 20000.0,
];
44
45/// Metrics specific to the ingestion service.
46#[derive(Clone)]
47pub struct IngestionMetrics {
48    // Statistics related to fetching data from the remote store.
49    pub total_ingested_checkpoints: IntCounter,
50    pub total_ingested_transactions: IntCounter,
51    pub total_ingested_events: IntCounter,
52    pub total_ingested_objects: IntCounter,
53    pub total_ingested_bytes: IntCounter,
54    pub total_ingested_transient_retries: IntCounterVec,
55    pub total_ingested_not_found_retries: IntCounter,
56    pub total_ingested_permanent_errors: IntCounterVec,
57    pub total_streamed_checkpoints: IntCounter,
58    pub total_stream_disconnections: IntCounter,
59    pub total_streaming_connection_failures: IntCounter,
60
61    // Checkpoint lag metrics for the ingestion pipeline.
62    pub latest_ingested_checkpoint: IntGauge,
63    pub latest_streamed_checkpoint: IntGauge,
64    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
65    pub ingested_checkpoint_timestamp_lag: Histogram,
66
67    pub ingested_checkpoint_latency: Histogram,
68}
69
70#[derive(Clone)]
71pub struct IndexerMetrics {
72    // Statistics related to individual ingestion pipelines' handlers.
73    pub total_handler_checkpoints_received: IntCounterVec,
74    pub total_handler_checkpoints_processed: IntCounterVec,
75    pub total_handler_rows_created: IntCounterVec,
76
77    pub latest_processed_checkpoint: IntGaugeVec,
78    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
79    pub processed_checkpoint_timestamp_lag: HistogramVec,
80
81    pub handler_checkpoint_latency: HistogramVec,
82
83    // Statistics related to individual ingestion pipelines.
84    pub total_collector_checkpoints_received: IntCounterVec,
85    pub total_collector_rows_received: IntCounterVec,
86    pub total_collector_batches_created: IntCounterVec,
87    pub total_committer_batches_attempted: IntCounterVec,
88    pub total_committer_batches_succeeded: IntCounterVec,
89    pub total_committer_batches_failed: IntCounterVec,
90    pub total_committer_rows_committed: IntCounterVec,
91    pub total_committer_rows_affected: IntCounterVec,
92    pub total_watermarks_out_of_order: IntCounterVec,
93    pub total_pruner_chunks_attempted: IntCounterVec,
94    pub total_pruner_chunks_deleted: IntCounterVec,
95    pub total_pruner_rows_deleted: IntCounterVec,
96
97    // Checkpoint lag metrics for the collector.
98    pub latest_collected_checkpoint: IntGaugeVec,
99    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
100    pub collected_checkpoint_timestamp_lag: HistogramVec,
101
102    // Checkpoint lag metrics for the committer.
103    // We can only report partially committed checkpoints, since the concurrent committer isn't aware of
104    // when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from
105    // the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint.
106    pub latest_partially_committed_checkpoint: IntGaugeVec,
107    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
108    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
109
110    // Checkpoint lag metrics for the watermarker.
111    // The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db.
112    // While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly
113    // for consistency.
114    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
115    pub watermarked_checkpoint_timestamp_lag: HistogramVec,
116
117    pub collector_gather_latency: HistogramVec,
118    pub collector_batch_size: HistogramVec,
119    pub total_collector_skipped_checkpoints: IntCounterVec,
120    pub committer_commit_latency: HistogramVec,
121    pub committer_tx_rows: HistogramVec,
122    pub watermark_gather_latency: HistogramVec,
123    pub watermark_commit_latency: HistogramVec,
124    pub watermark_pruner_read_latency: HistogramVec,
125    pub watermark_pruner_write_latency: HistogramVec,
126    pub pruner_delete_latency: HistogramVec,
127
128    pub watermark_epoch: IntGaugeVec,
129    pub watermark_checkpoint: IntGaugeVec,
130    pub watermark_transaction: IntGaugeVec,
131    pub watermark_timestamp_ms: IntGaugeVec,
132    pub watermark_reader_lo: IntGaugeVec,
133    pub watermark_pruner_hi: IntGaugeVec,
134
135    pub watermark_epoch_in_db: IntGaugeVec,
136    pub watermark_checkpoint_in_db: IntGaugeVec,
137    pub watermark_transaction_in_db: IntGaugeVec,
138    pub watermark_timestamp_in_db_ms: IntGaugeVec,
139    pub watermark_reader_lo_in_db: IntGaugeVec,
140    pub watermark_pruner_hi_in_db: IntGaugeVec,
141}
142
143/// A helper struct to report metrics regarding the checkpoint lag at various points in the indexer.
144pub(crate) struct CheckpointLagMetricReporter {
145    /// Metric to report the lag distribution of each checkpoint.
146    checkpoint_time_lag_histogram: Histogram,
147    /// Metric to report the lag of the checkpoint with the highest sequence number observed so far.
148    /// This is needed since concurrent pipelines observe checkpoints out of order.
149    latest_checkpoint_time_lag_gauge: IntGauge,
150    /// Metric to report the sequence number of the checkpoint with the highest sequence number observed so far.
151    latest_checkpoint_sequence_number_gauge: IntGauge,
152    // Internal state to keep track of the highest checkpoint sequence number reported so far.
153    latest_reported_checkpoint: AtomicU64,
154}
155
156impl IngestionMetrics {
157    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
158        let prefix = prefix.unwrap_or("indexer");
159        let name = |n| format!("{prefix}_{n}");
160        Arc::new(Self {
161            total_ingested_checkpoints: register_int_counter_with_registry!(
162                name("total_ingested_checkpoints"),
163                "Total number of checkpoints fetched from the remote store",
164                registry,
165            )
166            .unwrap(),
167            total_ingested_transactions: register_int_counter_with_registry!(
168                name("total_ingested_transactions"),
169                "Total number of transactions fetched from the remote store",
170                registry,
171            )
172            .unwrap(),
173            total_ingested_events: register_int_counter_with_registry!(
174                name("total_ingested_events"),
175                "Total number of events fetched from the remote store",
176                registry,
177            )
178            .unwrap(),
179            total_ingested_objects: register_int_counter_with_registry!(
180                name("total_ingested_objects"),
181                "Total number of objects in checkpoints fetched from the remote store",
182                registry,
183            )
184            .unwrap(),
185            total_ingested_bytes: register_int_counter_with_registry!(
186                name("total_ingested_bytes"),
187                "Total number of bytes fetched from the remote store, this metric will not \
188                be updated when data are fetched over gRPC.",
189                registry,
190            )
191            .unwrap(),
192            total_ingested_transient_retries: register_int_counter_vec_with_registry!(
193                name("total_ingested_retries"),
194                "Total number of retries due to transient errors while fetching data from the \
195                 remote store",
196                &["reason"],
197                registry,
198            )
199            .unwrap(),
200            total_ingested_not_found_retries: register_int_counter_with_registry!(
201                name("total_ingested_not_found_retries"),
202                "Total number of retries due to the not found errors while fetching data from the \
203                 remote store",
204                registry,
205            )
206            .unwrap(),
207            total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
208                name("total_ingested_permanent_errors"),
209                "Total number of permanent errors encountered while fetching data from the \
210                 remote store, which cause the ingestion service to shutdown",
211                &["reason"],
212                registry,
213            )
214            .unwrap(),
215            total_streamed_checkpoints: register_int_counter_with_registry!(
216                name("total_streamed_checkpoints"),
217                "Total number of checkpoints received from gRPC streaming",
218                registry,
219            )
220            .unwrap(),
221            total_stream_disconnections: register_int_counter_with_registry!(
222                name("total_stream_disconnections"),
223                "Total number of times the gRPC stream was disconnected",
224                registry,
225            )
226            .unwrap(),
227            total_streaming_connection_failures: register_int_counter_with_registry!(
228                name("total_streaming_connection_failures"),
229                "Total number of failures due to streaming service connection or peek failures",
230                registry,
231            )
232            .unwrap(),
233            latest_ingested_checkpoint: register_int_gauge_with_registry!(
234                name("latest_ingested_checkpoint"),
235                "Latest checkpoint sequence number fetched from the remote store",
236                registry,
237            )
238            .unwrap(),
239            latest_streamed_checkpoint: register_int_gauge_with_registry!(
240                name("latest_streamed_checkpoint"),
241                "Latest checkpoint sequence number received from gRPC streaming",
242                registry,
243            )
244            .unwrap(),
245            latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
246                name("latest_ingested_checkpoint_timestamp_lag_ms"),
247                "Difference between the system timestamp when the latest checkpoint was fetched and the \
248                 timestamp in the checkpoint, in milliseconds",
249                registry,
250            )
251            .unwrap(),
252            ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
253                name("ingested_checkpoint_timestamp_lag"),
254                "Difference between the system timestamp when a checkpoint was fetched and the \
255                 timestamp in each checkpoint, in seconds",
256                LAG_SEC_BUCKETS.to_vec(),
257                registry,
258            )
259            .unwrap(),
260            ingested_checkpoint_latency: register_histogram_with_registry!(
261                name("ingested_checkpoint_latency"),
262                "Time taken to fetch a checkpoint from the remote store, including retries",
263                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
264                registry,
265            )
266            .unwrap(),
267        })
268    }
269
270    /// Register that we're retrying a checkpoint fetch due to a transient error, logging the
271    /// reason and error.
272    pub(crate) fn inc_retry(
273        &self,
274        checkpoint: u64,
275        reason: &str,
276        error: Error,
277    ) -> backoff::Error<Error> {
278        warn!(checkpoint, reason, "Retrying due to error: {error}");
279
280        self.total_ingested_transient_retries
281            .with_label_values(&[reason])
282            .inc();
283
284        backoff::Error::transient(error)
285    }
286}
287
288impl IndexerMetrics {
289    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
290        let prefix = prefix.unwrap_or("indexer");
291        let name = |n| format!("{prefix}_{n}");
292        Arc::new(Self {
293            total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
294                name("total_handler_checkpoints_received"),
295                "Total number of checkpoints received by this handler",
296                &["pipeline"],
297                registry,
298            )
299            .unwrap(),
300            total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
301                name("total_handler_checkpoints_processed"),
302                "Total number of checkpoints processed (converted into rows) by this handler",
303                &["pipeline"],
304                registry,
305            )
306            .unwrap(),
307            total_handler_rows_created: register_int_counter_vec_with_registry!(
308                name("total_handler_rows_created"),
309                "Total number of rows created by this handler",
310                &["pipeline"],
311                registry,
312            )
313            .unwrap(),
314            latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
315                name("latest_processed_checkpoint"),
316                "Latest checkpoint sequence number processed by this handler",
317                &["pipeline"],
318                registry,
319            )
320            .unwrap(),
321            latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
322                name("latest_processed_checkpoint_timestamp_lag_ms"),
323                "Difference between the system timestamp when the latest checkpoint was processed and the \
324                 timestamp in the checkpoint, in milliseconds",
325                &["pipeline"],
326                registry,
327            )
328            .unwrap(),
329            processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
330                name("processed_checkpoint_timestamp_lag"),
331                "Difference between the system timestamp when a checkpoint was processed and the \
332                 timestamp in each checkpoint, in seconds",
333                &["pipeline"],
334                LAG_SEC_BUCKETS.to_vec(),
335                registry,
336            )
337            .unwrap(),
338            handler_checkpoint_latency: register_histogram_vec_with_registry!(
339                name("handler_checkpoint_latency"),
340                "Time taken to process a checkpoint by this handler",
341                &["pipeline"],
342                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
343                registry,
344            )
345            .unwrap(),
346            total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
347                name("total_collector_checkpoints_received"),
348                "Total number of checkpoints received by this collector",
349                &["pipeline"],
350                registry,
351            )
352            .unwrap(),
353            total_collector_rows_received: register_int_counter_vec_with_registry!(
354                name("total_collector_rows_received"),
355                "Total number of rows received by this collector",
356                &["pipeline"],
357                registry,
358            )
359            .unwrap(),
360            total_collector_batches_created: register_int_counter_vec_with_registry!(
361                name("total_collector_batches_created"),
362                "Total number of batches created by this collector",
363                &["pipeline"],
364                registry,
365            )
366            .unwrap(),
367            total_committer_batches_attempted: register_int_counter_vec_with_registry!(
368                name("total_committer_batches_attempted"),
369                "Total number of batches writes attempted by this committer",
370                &["pipeline"],
371                registry,
372            )
373            .unwrap(),
374            total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
375                name("total_committer_batches_succeeded"),
376                "Total number of successful batches writes by this committer",
377                &["pipeline"],
378                registry,
379            )
380            .unwrap(),
381            total_committer_batches_failed: register_int_counter_vec_with_registry!(
382                name("total_committer_batches_failed"),
383                "Total number of failed batches writes by this committer",
384                &["pipeline"],
385                registry,
386            )
387            .unwrap(),
388            total_committer_rows_committed: register_int_counter_vec_with_registry!(
389                name("total_committer_rows_committed"),
390                "Total number of rows sent to the database by this committer",
391                &["pipeline"],
392                registry,
393            )
394            .unwrap(),
395            total_committer_rows_affected: register_int_counter_vec_with_registry!(
396                name("total_committer_rows_affected"),
397                "Total number of rows actually written to the database by this committer",
398                &["pipeline"],
399                registry,
400            )
401            .unwrap(),
402            total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
403                name("watermark_out_of_order"),
404                "Number of times this committer encountered a batch for a checkpoint before its watermark",
405                &["pipeline"],
406                registry,
407            )
408            .unwrap(),
409            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
410                name("pruner_chunks_attempted"),
411                "Number of chunks this pruner attempted to delete",
412                &["pipeline"],
413                registry,
414            )
415            .unwrap(),
416            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
417                name("pruner_chunks_deleted"),
418                "Number of chunks this pruner successfully deleted",
419                &["pipeline"],
420                registry,
421            )
422            .unwrap(),
423            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
424                name("pruner_rows_deleted"),
425                "Number of rows this pruner successfully deleted",
426                &["pipeline"],
427                registry,
428            )
429            .unwrap(),
430            latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
431                name("latest_collected_checkpoint"),
432                "Latest checkpoint sequence number collected by this collector",
433                &["pipeline"],
434                registry,
435            )
436            .unwrap(),
437            latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
438                name("latest_collected_checkpoint_timestamp_lag_ms"),
439                "Difference between the system timestamp when the latest checkpoint was collected and the \
440                 timestamp in the checkpoint, in milliseconds",
441                &["pipeline"],
442                registry,
443            )
444            .unwrap(),
445            collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
446                name("collected_checkpoint_timestamp_lag"),
447                "Difference between the system timestamp when a checkpoint was collected and the \
448                 timestamp in each checkpoint, in seconds",
449                &["pipeline"],
450                LAG_SEC_BUCKETS.to_vec(),
451                registry,
452            )
453            .unwrap(),
454            latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
455                name("latest_partially_committed_checkpoint"),
456                "Latest checkpoint sequence number partially committed by this collector",
457                &["pipeline"],
458                registry,
459            )
460            .unwrap(),
461            latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
462                name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
463                "Difference between the system timestamp when the latest checkpoint was partially committed and the \
464                 timestamp in the checkpoint, in milliseconds",
465                &["pipeline"],
466                registry,
467            )
468            .unwrap(),
469            partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
470                name("partially_committed_checkpoint_timestamp_lag"),
471                "Difference between the system timestamp when a checkpoint was partially committed and the \
472                 timestamp in each checkpoint, in seconds",
473                &["pipeline"],
474                LAG_SEC_BUCKETS.to_vec(),
475                registry,
476            )
477            .unwrap(),
478            latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
479                name("latest_watermarked_checkpoint_timestamp_lag_ms"),
480                "Difference between the system timestamp when the latest checkpoint was watermarked and the \
481                 timestamp in the checkpoint, in milliseconds",
482                &["pipeline"],
483                registry,
484            )
485            .unwrap(),
486            watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
487                name("watermarked_checkpoint_timestamp_lag"),
488                "Difference between the system timestamp when a checkpoint was watermarked and the \
489                 timestamp in each checkpoint, in seconds",
490                &["pipeline"],
491                LAG_SEC_BUCKETS.to_vec(),
492                registry,
493            )
494            .unwrap(),
495            collector_gather_latency: register_histogram_vec_with_registry!(
496                name("collector_gather_latency"),
497                "Time taken to gather rows into a batch by this collector",
498                &["pipeline"],
499                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
500                registry,
501            )
502            .unwrap(),
503            collector_batch_size: register_histogram_vec_with_registry!(
504                name("collector_batch_size"),
505                "Number of rows in a batch written to the database by this collector",
506                &["pipeline"],
507                BATCH_SIZE_BUCKETS.to_vec(),
508                registry,
509            )
510            .unwrap(),
511            total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
512                name("total_collector_skipped_checkpoints"),
513                "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
514                &["pipeline"],
515                registry,
516            ).unwrap(),
517            committer_commit_latency: register_histogram_vec_with_registry!(
518                name("committer_commit_latency"),
519                "Time taken to write a batch of rows to the database by this committer",
520                &["pipeline"],
521                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
522                registry,
523            )
524            .unwrap(),
525            committer_tx_rows: register_histogram_vec_with_registry!(
526                name("committer_tx_rows"),
527                "Number of rows written to the database in a single database transaction by this committer",
528                &["pipeline"],
529                BATCH_SIZE_BUCKETS.to_vec(),
530                registry,
531            )
532            .unwrap(),
533            watermark_gather_latency: register_histogram_vec_with_registry!(
534                name("watermark_gather_latency"),
535                "Time taken to calculate the new high watermark after a write by this committer",
536                &["pipeline"],
537                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
538                registry,
539            )
540            .unwrap(),
541            watermark_commit_latency: register_histogram_vec_with_registry!(
542                name("watermark_commit_latency"),
543                "Time taken to write the new high watermark to the database by this committer",
544                &["pipeline"],
545                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
546                registry,
547            )
548            .unwrap(),
549            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
550                name("watermark_pruner_read_latency"),
551                "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
552                &["pipeline"],
553                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
554                registry,
555            )
556            .unwrap(),
557            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
558                name("watermark_pruner_write_latency"),
559                "Time taken to write the pruner's new upperbound to the database by this pruner",
560                &["pipeline"],
561                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
562                registry,
563            )
564            .unwrap(),
565            pruner_delete_latency: register_histogram_vec_with_registry!(
566                name("pruner_delete_latency"),
567                "Time taken to delete a chunk of data from the database by this pruner",
568                &["pipeline"],
569                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
570                registry,
571            )
572            .unwrap(),
573            watermark_epoch: register_int_gauge_vec_with_registry!(
574                name("watermark_epoch"),
575                "Current epoch high watermark for this committer",
576                &["pipeline"],
577                registry,
578            )
579            .unwrap(),
580            watermark_checkpoint: register_int_gauge_vec_with_registry!(
581                name("watermark_checkpoint"),
582                "Current checkpoint high watermark for this committer",
583                &["pipeline"],
584                registry,
585            )
586            .unwrap(),
587            watermark_transaction: register_int_gauge_vec_with_registry!(
588                name("watermark_transaction"),
589                "Current transaction high watermark for this committer",
590                &["pipeline"],
591                registry,
592            )
593            .unwrap(),
594            watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
595                name("watermark_timestamp_ms"),
596                "Current timestamp high watermark for this committer, in milliseconds",
597                &["pipeline"],
598                registry,
599            )
600            .unwrap(),
601            watermark_reader_lo: register_int_gauge_vec_with_registry!(
602                name("watermark_reader_lo"),
603                "Current reader low watermark for this pruner",
604                &["pipeline"],
605                registry,
606            )
607            .unwrap(),
608            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
609                name("watermark_pruner_hi"),
610                "Current pruner high watermark for this pruner",
611                &["pipeline"],
612                registry,
613            )
614            .unwrap(),
615            watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
616                name("watermark_epoch_in_db"),
617                "Last epoch high watermark this committer wrote to the DB",
618                &["pipeline"],
619                registry,
620            )
621            .unwrap(),
622            watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
623                name("watermark_checkpoint_in_db"),
624                "Last checkpoint high watermark this committer wrote to the DB",
625                &["pipeline"],
626                registry,
627            )
628            .unwrap(),
629            watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
630                name("watermark_transaction_in_db"),
631                "Last transaction high watermark this committer wrote to the DB",
632                &["pipeline"],
633                registry,
634            )
635            .unwrap(),
636            watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
637                name("watermark_timestamp_ms_in_db"),
638                "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
639                &["pipeline"],
640                registry,
641            )
642            .unwrap(),
643            watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
644                name("watermark_reader_lo_in_db"),
645                "Last reader low watermark this pruner wrote to the DB",
646                &["pipeline"],
647                registry,
648            )
649            .unwrap(),
650            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
651                name("watermark_pruner_hi_in_db"),
652                "Last pruner high watermark this pruner wrote to the DB",
653                &["pipeline"],
654                registry,
655            )
656            .unwrap(),
657        })
658    }
659}
660
661impl CheckpointLagMetricReporter {
662    pub fn new(
663        checkpoint_time_lag_histogram: Histogram,
664        latest_checkpoint_time_lag_gauge: IntGauge,
665        latest_checkpoint_sequence_number_gauge: IntGauge,
666    ) -> Arc<Self> {
667        Arc::new(Self {
668            checkpoint_time_lag_histogram,
669            latest_checkpoint_time_lag_gauge,
670            latest_checkpoint_sequence_number_gauge,
671            latest_reported_checkpoint: AtomicU64::new(0),
672        })
673    }
674
675    pub fn new_for_pipeline<P: Processor>(
676        checkpoint_time_lag_histogram: &HistogramVec,
677        latest_checkpoint_time_lag_gauge: &IntGaugeVec,
678        latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
679    ) -> Arc<Self> {
680        Self::new(
681            checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
682            latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
683            latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
684        )
685    }
686
687    pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
688        let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
689        self.checkpoint_time_lag_histogram
690            .observe((lag as f64) / 1000.0);
691
692        let prev = self
693            .latest_reported_checkpoint
694            .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
695        if cp_sequence_number > prev {
696            self.latest_checkpoint_sequence_number_gauge
697                .set(cp_sequence_number as i64);
698            self.latest_checkpoint_time_lag_gauge.set(lag);
699        }
700    }
701}
702
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::{IndexerMetrics, IngestionMetrics};

    /// Returns a fresh [`IndexerMetrics`] for tests. It is registered in its own brand-new
    /// [`Registry`], so repeated calls never clash over metric names.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Returns a fresh [`IngestionMetrics`] for tests. It is registered in its own brand-new
    /// [`Registry`], so repeated calls never clash over metric names.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}