sui_indexer_alt_framework/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::{Arc, atomic::AtomicU64};
5
6use prometheus::{
7    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
8    register_histogram_vec_with_registry, register_histogram_with_registry,
9    register_int_counter_vec_with_registry, register_int_counter_with_registry,
10    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14use crate::{ingestion::error::Error, pipeline::Processor};
15
/// Upper bounds, in seconds, of the histogram buckets used for checkpoint fetch latencies.
///
/// The bounds follow a 1-2-5 progression from one millisecond up to one hundred seconds.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms - 5ms
    0.01, 0.02, 0.05, // 10ms - 50ms
    0.1, 0.2, 0.5, // 100ms - 500ms
    1.0, 2.0, 5.0, // 1s - 5s
    10.0, 20.0, 50.0, // 10s - 50s
    100.0,
];
20
/// Upper bounds, in seconds, of the histogram buckets used for checkpoint lag, i.e. the
/// difference between the system time and the timestamp recorded in the checkpoint.
///
/// Resolution is fine-grained (50ms steps) below one second and coarse above it.
const LAG_SEC_BUCKETS: &[f64] = &[
    // Sub-second range, in 50ms steps.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95,
    // One second and above.
    1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 1000.0,
];
27
/// Upper bounds, in seconds, of the histogram buckets used for the time the indexer spends
/// processing a checkpoint locally (without calling out to other services).
///
/// The bounds follow a 1-2-5 progression from one millisecond up to one hundred seconds.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms - 5ms
    0.01, 0.02, 0.05, // 10ms - 50ms
    0.1, 0.2, 0.5, // 100ms - 500ms
    1.0, 2.0, 5.0, // 1s - 5s
    10.0, 20.0, 50.0, // 10s - 50s
    100.0,
];
33
/// Upper bounds, in seconds, of the histogram buckets used for database write latencies.
///
/// The bounds follow a 1-2-5 progression from ten milliseconds up to ten thousand seconds.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, // 10ms - 50ms
    0.1, 0.2, 0.5, // 100ms - 500ms
    1.0, 2.0, 5.0, // 1s - 5s
    10.0, 20.0, 50.0, // 10s - 50s
    100.0, 200.0, 500.0, // 100s - 500s
    1000.0, 2000.0, 5000.0, // 1000s - 5000s
    10000.0,
];
39
/// Upper bounds of the histogram buckets used for batch sizes, measured as the number of rows
/// written to the database.
///
/// The bounds follow a 1-2-5 progression from a single row up to twenty thousand rows.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, // units
    10.0, 20.0, 50.0, // tens
    100.0, 200.0, 500.0, // hundreds
    1000.0, 2000.0, 5000.0, // thousands
    10000.0, 20000.0, // tens of thousands
];
44
45/// Metrics specific to the ingestion service.
46#[derive(Clone)]
47pub struct IngestionMetrics {
48    // Statistics related to fetching data from the remote store.
49    pub total_ingested_checkpoints: IntCounter,
50    pub total_ingested_transactions: IntCounter,
51    pub total_ingested_events: IntCounter,
52    pub total_ingested_objects: IntCounter,
53    pub total_ingested_bytes: IntCounter,
54    pub total_ingested_transient_retries: IntCounterVec,
55    pub total_ingested_not_found_retries: IntCounter,
56    pub total_ingested_permanent_errors: IntCounterVec,
57    pub total_streamed_checkpoints: IntCounter,
58    pub total_stream_disconnections: IntCounter,
59    pub total_streaming_connection_failures: IntCounter,
60
61    // Checkpoint lag metrics for the ingestion pipeline.
62    pub latest_ingested_checkpoint: IntGauge,
63    pub latest_streamed_checkpoint: IntGauge,
64    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
65    pub ingested_checkpoint_timestamp_lag: Histogram,
66
67    pub ingested_checkpoint_latency: Histogram,
68}
69
70#[derive(Clone)]
71pub struct IndexerMetrics {
72    // Statistics related to individual ingestion pipelines' handlers.
73    pub total_handler_checkpoints_received: IntCounterVec,
74    pub total_handler_checkpoints_processed: IntCounterVec,
75    pub total_handler_rows_created: IntCounterVec,
76
77    pub latest_processed_checkpoint: IntGaugeVec,
78    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
79    pub processed_checkpoint_timestamp_lag: HistogramVec,
80
81    pub handler_checkpoint_latency: HistogramVec,
82
83    // Statistics related to individual ingestion pipelines.
84    pub total_collector_checkpoints_received: IntCounterVec,
85    pub total_collector_rows_received: IntCounterVec,
86    pub total_collector_batches_created: IntCounterVec,
87    pub total_committer_batches_attempted: IntCounterVec,
88    pub total_committer_batches_succeeded: IntCounterVec,
89    pub total_committer_batches_failed: IntCounterVec,
90    pub total_committer_rows_committed: IntCounterVec,
91    pub total_committer_rows_affected: IntCounterVec,
92    pub total_watermarks_out_of_order: IntCounterVec,
93    pub total_pruner_chunks_attempted: IntCounterVec,
94    pub total_pruner_chunks_deleted: IntCounterVec,
95    pub total_pruner_rows_deleted: IntCounterVec,
96
97    // Checkpoint lag metrics for the collector.
98    pub latest_collected_checkpoint: IntGaugeVec,
99    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
100    pub collected_checkpoint_timestamp_lag: HistogramVec,
101
102    // Checkpoint lag metrics for the committer.
103    // We can only report partially committed checkpoints, since the concurrent committer isn't aware of
104    // when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from
105    // the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint.
106    pub latest_partially_committed_checkpoint: IntGaugeVec,
107    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
108    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
109
110    // Checkpoint lag metrics for the watermarker.
111    // The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db.
112    // While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly
113    // for consistency.
114    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
115    pub watermarked_checkpoint_timestamp_lag: HistogramVec,
116
117    pub collector_gather_latency: HistogramVec,
118    pub collector_batch_size: HistogramVec,
119    pub total_collector_skipped_checkpoints: IntCounterVec,
120    pub collector_reader_lo: IntGaugeVec,
121    pub committer_commit_latency: HistogramVec,
122    pub committer_tx_rows: HistogramVec,
123    pub watermark_gather_latency: HistogramVec,
124    pub watermark_commit_latency: HistogramVec,
125    pub watermark_pruner_read_latency: HistogramVec,
126    pub watermark_pruner_write_latency: HistogramVec,
127    pub pruner_delete_latency: HistogramVec,
128
129    pub watermark_epoch: IntGaugeVec,
130    pub watermark_checkpoint: IntGaugeVec,
131    pub watermark_transaction: IntGaugeVec,
132    pub watermark_timestamp_ms: IntGaugeVec,
133    pub watermark_reader_lo: IntGaugeVec,
134    pub watermark_pruner_hi: IntGaugeVec,
135
136    pub watermark_epoch_in_db: IntGaugeVec,
137    pub watermark_checkpoint_in_db: IntGaugeVec,
138    pub watermark_transaction_in_db: IntGaugeVec,
139    pub watermark_timestamp_in_db_ms: IntGaugeVec,
140    pub watermark_reader_lo_in_db: IntGaugeVec,
141    pub watermark_pruner_hi_in_db: IntGaugeVec,
142}
143
144/// A helper struct to report metrics regarding the checkpoint lag at various points in the indexer.
145pub(crate) struct CheckpointLagMetricReporter {
146    /// Metric to report the lag distribution of each checkpoint.
147    checkpoint_time_lag_histogram: Histogram,
148    /// Metric to report the lag of the checkpoint with the highest sequence number observed so far.
149    /// This is needed since concurrent pipelines observe checkpoints out of order.
150    latest_checkpoint_time_lag_gauge: IntGauge,
151    /// Metric to report the sequence number of the checkpoint with the highest sequence number observed so far.
152    latest_checkpoint_sequence_number_gauge: IntGauge,
153    // Internal state to keep track of the highest checkpoint sequence number reported so far.
154    latest_reported_checkpoint: AtomicU64,
155}
156
157impl IngestionMetrics {
158    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
159        let prefix = prefix.unwrap_or("indexer");
160        let name = |n| format!("{prefix}_{n}");
161        Arc::new(Self {
162            total_ingested_checkpoints: register_int_counter_with_registry!(
163                name("total_ingested_checkpoints"),
164                "Total number of checkpoints fetched from the remote store",
165                registry,
166            )
167            .unwrap(),
168            total_ingested_transactions: register_int_counter_with_registry!(
169                name("total_ingested_transactions"),
170                "Total number of transactions fetched from the remote store",
171                registry,
172            )
173            .unwrap(),
174            total_ingested_events: register_int_counter_with_registry!(
175                name("total_ingested_events"),
176                "Total number of events fetched from the remote store",
177                registry,
178            )
179            .unwrap(),
180            total_ingested_objects: register_int_counter_with_registry!(
181                name("total_ingested_objects"),
182                "Total number of objects in checkpoints fetched from the remote store",
183                registry,
184            )
185            .unwrap(),
186            total_ingested_bytes: register_int_counter_with_registry!(
187                name("total_ingested_bytes"),
188                "Total number of bytes fetched from the remote store, this metric will not \
189                be updated when data are fetched over gRPC.",
190                registry,
191            )
192            .unwrap(),
193            total_ingested_transient_retries: register_int_counter_vec_with_registry!(
194                name("total_ingested_retries"),
195                "Total number of retries due to transient errors while fetching data from the \
196                 remote store",
197                &["reason"],
198                registry,
199            )
200            .unwrap(),
201            total_ingested_not_found_retries: register_int_counter_with_registry!(
202                name("total_ingested_not_found_retries"),
203                "Total number of retries due to the not found errors while fetching data from the \
204                 remote store",
205                registry,
206            )
207            .unwrap(),
208            total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
209                name("total_ingested_permanent_errors"),
210                "Total number of permanent errors encountered while fetching data from the \
211                 remote store, which cause the ingestion service to shutdown",
212                &["reason"],
213                registry,
214            )
215            .unwrap(),
216            total_streamed_checkpoints: register_int_counter_with_registry!(
217                name("total_streamed_checkpoints"),
218                "Total number of checkpoints received from gRPC streaming",
219                registry,
220            )
221            .unwrap(),
222            total_stream_disconnections: register_int_counter_with_registry!(
223                name("total_stream_disconnections"),
224                "Total number of times the gRPC stream was disconnected",
225                registry,
226            )
227            .unwrap(),
228            total_streaming_connection_failures: register_int_counter_with_registry!(
229                name("total_streaming_connection_failures"),
230                "Total number of failures due to streaming service connection or peek failures",
231                registry,
232            )
233            .unwrap(),
234            latest_ingested_checkpoint: register_int_gauge_with_registry!(
235                name("latest_ingested_checkpoint"),
236                "Latest checkpoint sequence number fetched from the remote store",
237                registry,
238            )
239            .unwrap(),
240            latest_streamed_checkpoint: register_int_gauge_with_registry!(
241                name("latest_streamed_checkpoint"),
242                "Latest checkpoint sequence number received from gRPC streaming",
243                registry,
244            )
245            .unwrap(),
246            latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
247                name("latest_ingested_checkpoint_timestamp_lag_ms"),
248                "Difference between the system timestamp when the latest checkpoint was fetched and the \
249                 timestamp in the checkpoint, in milliseconds",
250                registry,
251            )
252            .unwrap(),
253            ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
254                name("ingested_checkpoint_timestamp_lag"),
255                "Difference between the system timestamp when a checkpoint was fetched and the \
256                 timestamp in each checkpoint, in seconds",
257                LAG_SEC_BUCKETS.to_vec(),
258                registry,
259            )
260            .unwrap(),
261            ingested_checkpoint_latency: register_histogram_with_registry!(
262                name("ingested_checkpoint_latency"),
263                "Time taken to fetch a checkpoint from the remote store, including retries",
264                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
265                registry,
266            )
267            .unwrap(),
268        })
269    }
270
271    /// Register that we're retrying a checkpoint fetch due to a transient error, logging the
272    /// reason and error.
273    pub(crate) fn inc_retry(
274        &self,
275        checkpoint: u64,
276        reason: &str,
277        error: Error,
278    ) -> backoff::Error<Error> {
279        warn!(checkpoint, reason, "Retrying due to error: {error}");
280
281        self.total_ingested_transient_retries
282            .with_label_values(&[reason])
283            .inc();
284
285        backoff::Error::transient(error)
286    }
287}
288
289impl IndexerMetrics {
290    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
291        let prefix = prefix.unwrap_or("indexer");
292        let name = |n| format!("{prefix}_{n}");
293        Arc::new(Self {
294            total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
295                name("total_handler_checkpoints_received"),
296                "Total number of checkpoints received by this handler",
297                &["pipeline"],
298                registry,
299            )
300            .unwrap(),
301            total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
302                name("total_handler_checkpoints_processed"),
303                "Total number of checkpoints processed (converted into rows) by this handler",
304                &["pipeline"],
305                registry,
306            )
307            .unwrap(),
308            total_handler_rows_created: register_int_counter_vec_with_registry!(
309                name("total_handler_rows_created"),
310                "Total number of rows created by this handler",
311                &["pipeline"],
312                registry,
313            )
314            .unwrap(),
315            latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
316                name("latest_processed_checkpoint"),
317                "Latest checkpoint sequence number processed by this handler",
318                &["pipeline"],
319                registry,
320            )
321            .unwrap(),
322            latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
323                name("latest_processed_checkpoint_timestamp_lag_ms"),
324                "Difference between the system timestamp when the latest checkpoint was processed and the \
325                 timestamp in the checkpoint, in milliseconds",
326                &["pipeline"],
327                registry,
328            )
329            .unwrap(),
330            processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
331                name("processed_checkpoint_timestamp_lag"),
332                "Difference between the system timestamp when a checkpoint was processed and the \
333                 timestamp in each checkpoint, in seconds",
334                &["pipeline"],
335                LAG_SEC_BUCKETS.to_vec(),
336                registry,
337            )
338            .unwrap(),
339            handler_checkpoint_latency: register_histogram_vec_with_registry!(
340                name("handler_checkpoint_latency"),
341                "Time taken to process a checkpoint by this handler",
342                &["pipeline"],
343                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
344                registry,
345            )
346            .unwrap(),
347            total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
348                name("total_collector_checkpoints_received"),
349                "Total number of checkpoints received by this collector",
350                &["pipeline"],
351                registry,
352            )
353            .unwrap(),
354            total_collector_rows_received: register_int_counter_vec_with_registry!(
355                name("total_collector_rows_received"),
356                "Total number of rows received by this collector",
357                &["pipeline"],
358                registry,
359            )
360            .unwrap(),
361            total_collector_batches_created: register_int_counter_vec_with_registry!(
362                name("total_collector_batches_created"),
363                "Total number of batches created by this collector",
364                &["pipeline"],
365                registry,
366            )
367            .unwrap(),
368            total_committer_batches_attempted: register_int_counter_vec_with_registry!(
369                name("total_committer_batches_attempted"),
370                "Total number of batches writes attempted by this committer",
371                &["pipeline"],
372                registry,
373            )
374            .unwrap(),
375            total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
376                name("total_committer_batches_succeeded"),
377                "Total number of successful batches writes by this committer",
378                &["pipeline"],
379                registry,
380            )
381            .unwrap(),
382            total_committer_batches_failed: register_int_counter_vec_with_registry!(
383                name("total_committer_batches_failed"),
384                "Total number of failed batches writes by this committer",
385                &["pipeline"],
386                registry,
387            )
388            .unwrap(),
389            total_committer_rows_committed: register_int_counter_vec_with_registry!(
390                name("total_committer_rows_committed"),
391                "Total number of rows sent to the database by this committer",
392                &["pipeline"],
393                registry,
394            )
395            .unwrap(),
396            total_committer_rows_affected: register_int_counter_vec_with_registry!(
397                name("total_committer_rows_affected"),
398                "Total number of rows actually written to the database by this committer",
399                &["pipeline"],
400                registry,
401            )
402            .unwrap(),
403            total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
404                name("watermark_out_of_order"),
405                "Number of times this committer encountered a batch for a checkpoint before its watermark",
406                &["pipeline"],
407                registry,
408            )
409            .unwrap(),
410            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
411                name("pruner_chunks_attempted"),
412                "Number of chunks this pruner attempted to delete",
413                &["pipeline"],
414                registry,
415            )
416            .unwrap(),
417            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
418                name("pruner_chunks_deleted"),
419                "Number of chunks this pruner successfully deleted",
420                &["pipeline"],
421                registry,
422            )
423            .unwrap(),
424            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
425                name("pruner_rows_deleted"),
426                "Number of rows this pruner successfully deleted",
427                &["pipeline"],
428                registry,
429            )
430            .unwrap(),
431            latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
432                name("latest_collected_checkpoint"),
433                "Latest checkpoint sequence number collected by this collector",
434                &["pipeline"],
435                registry,
436            )
437            .unwrap(),
438            latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
439                name("latest_collected_checkpoint_timestamp_lag_ms"),
440                "Difference between the system timestamp when the latest checkpoint was collected and the \
441                 timestamp in the checkpoint, in milliseconds",
442                &["pipeline"],
443                registry,
444            )
445            .unwrap(),
446            collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
447                name("collected_checkpoint_timestamp_lag"),
448                "Difference between the system timestamp when a checkpoint was collected and the \
449                 timestamp in each checkpoint, in seconds",
450                &["pipeline"],
451                LAG_SEC_BUCKETS.to_vec(),
452                registry,
453            )
454            .unwrap(),
455            latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
456                name("latest_partially_committed_checkpoint"),
457                "Latest checkpoint sequence number partially committed by this collector",
458                &["pipeline"],
459                registry,
460            )
461            .unwrap(),
462            latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
463                name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
464                "Difference between the system timestamp when the latest checkpoint was partially committed and the \
465                 timestamp in the checkpoint, in milliseconds",
466                &["pipeline"],
467                registry,
468            )
469            .unwrap(),
470            partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
471                name("partially_committed_checkpoint_timestamp_lag"),
472                "Difference between the system timestamp when a checkpoint was partially committed and the \
473                 timestamp in each checkpoint, in seconds",
474                &["pipeline"],
475                LAG_SEC_BUCKETS.to_vec(),
476                registry,
477            )
478            .unwrap(),
479            latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
480                name("latest_watermarked_checkpoint_timestamp_lag_ms"),
481                "Difference between the system timestamp when the latest checkpoint was watermarked and the \
482                 timestamp in the checkpoint, in milliseconds",
483                &["pipeline"],
484                registry,
485            )
486            .unwrap(),
487            watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
488                name("watermarked_checkpoint_timestamp_lag"),
489                "Difference between the system timestamp when a checkpoint was watermarked and the \
490                 timestamp in each checkpoint, in seconds",
491                &["pipeline"],
492                LAG_SEC_BUCKETS.to_vec(),
493                registry,
494            )
495            .unwrap(),
496            collector_gather_latency: register_histogram_vec_with_registry!(
497                name("collector_gather_latency"),
498                "Time taken to gather rows into a batch by this collector",
499                &["pipeline"],
500                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
501                registry,
502            )
503            .unwrap(),
504            collector_batch_size: register_histogram_vec_with_registry!(
505                name("collector_batch_size"),
506                "Number of rows in a batch written to the database by this collector",
507                &["pipeline"],
508                BATCH_SIZE_BUCKETS.to_vec(),
509                registry,
510            )
511            .unwrap(),
512            total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
513                name("total_collector_skipped_checkpoints"),
514                "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
515                &["pipeline"],
516                registry,
517            ).unwrap(),
518            collector_reader_lo: register_int_gauge_vec_with_registry!(
519                name("collector_reader_lo"),
520                "Reader low watermark as observed by the collector",
521                &["pipeline"],
522                registry,
523            )
524            .unwrap(),
525            committer_commit_latency: register_histogram_vec_with_registry!(
526                name("committer_commit_latency"),
527                "Time taken to write a batch of rows to the database by this committer",
528                &["pipeline"],
529                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
530                registry,
531            )
532            .unwrap(),
533            committer_tx_rows: register_histogram_vec_with_registry!(
534                name("committer_tx_rows"),
535                "Number of rows written to the database in a single database transaction by this committer",
536                &["pipeline"],
537                BATCH_SIZE_BUCKETS.to_vec(),
538                registry,
539            )
540            .unwrap(),
541            watermark_gather_latency: register_histogram_vec_with_registry!(
542                name("watermark_gather_latency"),
543                "Time taken to calculate the new high watermark after a write by this committer",
544                &["pipeline"],
545                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
546                registry,
547            )
548            .unwrap(),
549            watermark_commit_latency: register_histogram_vec_with_registry!(
550                name("watermark_commit_latency"),
551                "Time taken to write the new high watermark to the database by this committer",
552                &["pipeline"],
553                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
554                registry,
555            )
556            .unwrap(),
557            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
558                name("watermark_pruner_read_latency"),
559                "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
560                &["pipeline"],
561                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
562                registry,
563            )
564            .unwrap(),
565            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
566                name("watermark_pruner_write_latency"),
567                "Time taken to write the pruner's new upperbound to the database by this pruner",
568                &["pipeline"],
569                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
570                registry,
571            )
572            .unwrap(),
573            pruner_delete_latency: register_histogram_vec_with_registry!(
574                name("pruner_delete_latency"),
575                "Time taken to delete a chunk of data from the database by this pruner",
576                &["pipeline"],
577                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
578                registry,
579            )
580            .unwrap(),
581            watermark_epoch: register_int_gauge_vec_with_registry!(
582                name("watermark_epoch"),
583                "Current epoch high watermark for this committer",
584                &["pipeline"],
585                registry,
586            )
587            .unwrap(),
588            watermark_checkpoint: register_int_gauge_vec_with_registry!(
589                name("watermark_checkpoint"),
590                "Current checkpoint high watermark for this committer",
591                &["pipeline"],
592                registry,
593            )
594            .unwrap(),
595            watermark_transaction: register_int_gauge_vec_with_registry!(
596                name("watermark_transaction"),
597                "Current transaction high watermark for this committer",
598                &["pipeline"],
599                registry,
600            )
601            .unwrap(),
602            watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
603                name("watermark_timestamp_ms"),
604                "Current timestamp high watermark for this committer, in milliseconds",
605                &["pipeline"],
606                registry,
607            )
608            .unwrap(),
609            watermark_reader_lo: register_int_gauge_vec_with_registry!(
610                name("watermark_reader_lo"),
611                "Current reader low watermark for this pruner",
612                &["pipeline"],
613                registry,
614            )
615            .unwrap(),
616            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
617                name("watermark_pruner_hi"),
618                "Current pruner high watermark for this pruner",
619                &["pipeline"],
620                registry,
621            )
622            .unwrap(),
623            watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
624                name("watermark_epoch_in_db"),
625                "Last epoch high watermark this committer wrote to the DB",
626                &["pipeline"],
627                registry,
628            )
629            .unwrap(),
630            watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
631                name("watermark_checkpoint_in_db"),
632                "Last checkpoint high watermark this committer wrote to the DB",
633                &["pipeline"],
634                registry,
635            )
636            .unwrap(),
637            watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
638                name("watermark_transaction_in_db"),
639                "Last transaction high watermark this committer wrote to the DB",
640                &["pipeline"],
641                registry,
642            )
643            .unwrap(),
644            watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
645                name("watermark_timestamp_ms_in_db"),
646                "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
647                &["pipeline"],
648                registry,
649            )
650            .unwrap(),
651            watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
652                name("watermark_reader_lo_in_db"),
653                "Last reader low watermark this pruner wrote to the DB",
654                &["pipeline"],
655                registry,
656            )
657            .unwrap(),
658            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
659                name("watermark_pruner_hi_in_db"),
660                "Last pruner high watermark this pruner wrote to the DB",
661                &["pipeline"],
662                registry,
663            )
664            .unwrap(),
665        })
666    }
667}
668
669impl CheckpointLagMetricReporter {
670    pub fn new(
671        checkpoint_time_lag_histogram: Histogram,
672        latest_checkpoint_time_lag_gauge: IntGauge,
673        latest_checkpoint_sequence_number_gauge: IntGauge,
674    ) -> Arc<Self> {
675        Arc::new(Self {
676            checkpoint_time_lag_histogram,
677            latest_checkpoint_time_lag_gauge,
678            latest_checkpoint_sequence_number_gauge,
679            latest_reported_checkpoint: AtomicU64::new(0),
680        })
681    }
682
683    pub fn new_for_pipeline<P: Processor>(
684        checkpoint_time_lag_histogram: &HistogramVec,
685        latest_checkpoint_time_lag_gauge: &IntGaugeVec,
686        latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
687    ) -> Arc<Self> {
688        Self::new(
689            checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
690            latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
691            latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
692        )
693    }
694
695    pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
696        let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
697        self.checkpoint_time_lag_histogram
698            .observe((lag as f64) / 1000.0);
699
700        let prev = self
701            .latest_reported_checkpoint
702            .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
703        if cp_sequence_number > prev {
704            self.latest_checkpoint_sequence_number_gauge
705                .set(cp_sequence_number as i64);
706            self.latest_checkpoint_time_lag_gauge.set(lag);
707        }
708    }
709}
710
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::{IndexerMetrics, IngestionMetrics};

    /// Builds an [`IndexerMetrics`] for tests, using the default prefix and a fresh registry
    /// that is dropped when this function returns.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Builds an [`IngestionMetrics`] for tests, using the default prefix and a fresh registry
    /// that is dropped when this function returns.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}