sui_indexer_alt_framework/
metrics.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6use std::time::Duration;
7
8use prometheus::Histogram;
9use prometheus::HistogramVec;
10use prometheus::IntCounter;
11use prometheus::IntCounterVec;
12use prometheus::IntGauge;
13use prometheus::IntGaugeVec;
14use prometheus::Registry;
15use prometheus::register_histogram_vec_with_registry;
16use prometheus::register_histogram_with_registry;
17use prometheus::register_int_counter_vec_with_registry;
18use prometheus::register_int_counter_with_registry;
19use prometheus::register_int_gauge_vec_with_registry;
20use prometheus::register_int_gauge_with_registry;
21use tracing::warn;
22
23use crate::ingestion::error::Error;
24use crate::pipeline::Processor;
25
/// Histogram buckets for the distribution of checkpoint fetching latencies.
///
/// Sixteen strictly increasing boundaries following a 1-2-5 progression from `0.001` to `100.0`;
/// the `_SEC_` in the name indicates the values are seconds. Used when registering
/// `ingested_checkpoint_latency`.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
30
/// Histogram buckets for the distribution of checkpoint lag (difference between the system time and
/// the timestamp in the checkpoint).
///
/// The boundaries step by `0.05` from `0.1` up to `1.0`, then widen progressively up to
/// `1000.0`; the `_SEC_` in the name indicates the values are seconds. Shared by all of the
/// `*_checkpoint_timestamp_lag` histograms.
const LAG_SEC_BUCKETS: &[f64] = &[
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 1000.0,
];
37
/// Histogram buckets for the distribution of latencies for processing a checkpoint in the indexer
/// (without having to call out to other services).
///
/// Sixteen strictly increasing boundaries following a 1-2-5 progression from `0.001` to `100.0`;
/// the `_SEC_` in the name indicates the values are seconds. Used for the handler, collector
/// gather and watermark gather latency histograms.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
43
/// Histogram buckets for the distribution of latencies for writing to the database.
///
/// Nineteen strictly increasing boundaries following a 1-2-5 progression from `0.01` to
/// `10000.0`; the `_SEC_` in the name indicates the values are seconds. Used for the committer,
/// watermark and pruner database latency histograms.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0,
    2000.0, 5000.0, 10000.0,
];
49
/// Histogram buckets for the distribution of batch sizes (number of rows) written to the database.
///
/// Fourteen strictly increasing boundaries following a 1-2-5 progression from `1.0` to
/// `20000.0`. Used for the `collector_batch_size` and `committer_tx_rows` histograms.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0,
];
54
55/// Metrics specific to the ingestion service.
56#[derive(Clone)]
57pub struct IngestionMetrics {
58    // Statistics related to fetching data from the remote store.
59    pub total_ingested_checkpoints: IntCounter,
60    pub total_ingested_transactions: IntCounter,
61    pub total_ingested_events: IntCounter,
62    pub total_ingested_objects: IntCounter,
63    pub total_ingested_bytes: IntCounter,
64    pub total_ingested_transient_retries: IntCounterVec,
65    pub total_ingested_not_found_retries: IntCounter,
66    pub total_ingested_permanent_errors: IntCounterVec,
67    pub total_streamed_checkpoints: IntCounter,
68    pub total_skipped_streamed_checkpoints: IntCounter,
69    pub total_out_of_order_streamed_checkpoints: IntCounter,
70    pub total_stream_disconnections: IntCounter,
71    pub total_streaming_connection_failures: IntCounter,
72
73    // Checkpoint lag metrics for the ingestion pipeline.
74    pub latest_ingested_checkpoint: IntGauge,
75    pub latest_streamed_checkpoint: IntGauge,
76    pub latest_skipped_streamed_checkpoint: IntGauge,
77    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
78    pub ingested_checkpoint_timestamp_lag: Histogram,
79
80    pub ingested_checkpoint_latency: Histogram,
81
82    pub ingestion_concurrency_limit: IntGauge,
83    pub ingestion_concurrency_inflight: IntGauge,
84}
85
86#[derive(Clone)]
87pub struct IndexerMetrics {
88    // Statistics related to individual ingestion pipelines' handlers.
89    pub total_handler_checkpoints_received: IntCounterVec,
90    pub total_handler_processor_retries: IntCounterVec,
91    pub total_handler_checkpoints_processed: IntCounterVec,
92    pub total_handler_rows_created: IntCounterVec,
93
94    pub latest_processed_checkpoint: IntGaugeVec,
95    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
96    pub processed_checkpoint_timestamp_lag: HistogramVec,
97
98    pub handler_checkpoint_latency: HistogramVec,
99
100    pub processor_concurrency_limit: IntGaugeVec,
101    pub processor_concurrency_inflight: IntGaugeVec,
102
103    // Statistics related to individual ingestion pipelines.
104    pub total_collector_checkpoints_received: IntCounterVec,
105    pub total_collector_rows_received: IntCounterVec,
106    pub total_collector_batches_created: IntCounterVec,
107    pub total_committer_batches_attempted: IntCounterVec,
108    pub total_committer_batches_succeeded: IntCounterVec,
109    pub total_committer_batches_failed: IntCounterVec,
110    pub total_committer_rows_committed: IntCounterVec,
111    pub total_committer_rows_affected: IntCounterVec,
112    pub total_watermarks_out_of_order: IntCounterVec,
113    pub total_pruner_chunks_attempted: IntCounterVec,
114    pub total_pruner_chunks_deleted: IntCounterVec,
115    pub total_pruner_rows_deleted: IntCounterVec,
116
117    // Checkpoint lag metrics for the collector.
118    pub latest_collected_checkpoint: IntGaugeVec,
119    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
120    pub collected_checkpoint_timestamp_lag: HistogramVec,
121
122    // Checkpoint lag metrics for the committer.
123    // We can only report partially committed checkpoints, since the concurrent committer isn't aware of
124    // when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from
125    // the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint.
126    pub latest_partially_committed_checkpoint: IntGaugeVec,
127    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
128    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
129
130    // Checkpoint lag metrics for the watermarker.
131    // The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db.
132    // While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly
133    // for consistency.
134    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
135    pub watermarked_checkpoint_timestamp_lag: HistogramVec,
136
137    pub collector_gather_latency: HistogramVec,
138    pub collector_batch_size: HistogramVec,
139    pub total_collector_skipped_checkpoints: IntCounterVec,
140    pub collector_reader_lo: IntGaugeVec,
141    pub committer_commit_latency: HistogramVec,
142    pub committer_tx_rows: HistogramVec,
143    pub watermark_gather_latency: HistogramVec,
144    pub watermark_commit_latency: HistogramVec,
145    pub watermark_pruner_read_latency: HistogramVec,
146    pub watermark_pruner_write_latency: HistogramVec,
147    pub pruner_delete_latency: HistogramVec,
148
149    pub watermark_epoch: IntGaugeVec,
150    pub watermark_checkpoint: IntGaugeVec,
151    pub watermark_transaction: IntGaugeVec,
152    pub watermark_timestamp_ms: IntGaugeVec,
153    pub watermark_reader_lo: IntGaugeVec,
154    pub watermark_pruner_hi: IntGaugeVec,
155
156    pub watermark_epoch_in_db: IntGaugeVec,
157    pub watermark_checkpoint_in_db: IntGaugeVec,
158    pub watermark_transaction_in_db: IntGaugeVec,
159    pub watermark_timestamp_in_db_ms: IntGaugeVec,
160    pub watermark_reader_lo_in_db: IntGaugeVec,
161    pub watermark_pruner_hi_in_db: IntGaugeVec,
162}
163
164/// A helper struct to report metrics regarding the checkpoint lag at various points in the indexer.
165pub(crate) struct CheckpointLagMetricReporter {
166    /// Metric to report the lag distribution of each checkpoint.
167    checkpoint_time_lag_histogram: Histogram,
168    /// Metric to report the lag of the checkpoint with the highest sequence number observed so far.
169    /// This is needed since concurrent pipelines observe checkpoints out of order.
170    latest_checkpoint_time_lag_gauge: IntGauge,
171    /// Metric to report the sequence number of the checkpoint with the highest sequence number observed so far.
172    latest_checkpoint_sequence_number_gauge: IntGauge,
173    // Internal state to keep track of the highest checkpoint sequence number reported so far.
174    latest_reported_checkpoint: AtomicU64,
175}
176
177impl IngestionMetrics {
178    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
179        let prefix = prefix.unwrap_or("indexer");
180        let name = |n| format!("{prefix}_{n}");
181        Arc::new(Self {
182            total_ingested_checkpoints: register_int_counter_with_registry!(
183                name("total_ingested_checkpoints"),
184                "Total number of checkpoints fetched from the remote store",
185                registry,
186            )
187            .unwrap(),
188            total_ingested_transactions: register_int_counter_with_registry!(
189                name("total_ingested_transactions"),
190                "Total number of transactions fetched from the remote store",
191                registry,
192            )
193            .unwrap(),
194            total_ingested_events: register_int_counter_with_registry!(
195                name("total_ingested_events"),
196                "Total number of events fetched from the remote store",
197                registry,
198            )
199            .unwrap(),
200            total_ingested_objects: register_int_counter_with_registry!(
201                name("total_ingested_objects"),
202                "Total number of objects in checkpoints fetched from the remote store",
203                registry,
204            )
205            .unwrap(),
206            total_ingested_bytes: register_int_counter_with_registry!(
207                name("total_ingested_bytes"),
208                "Total number of bytes fetched from the remote store, this metric will not \
209                be updated when data are fetched over gRPC.",
210                registry,
211            )
212            .unwrap(),
213            total_ingested_transient_retries: register_int_counter_vec_with_registry!(
214                name("total_ingested_retries"),
215                "Total number of retries due to transient errors while fetching data from the \
216                 remote store",
217                &["reason"],
218                registry,
219            )
220            .unwrap(),
221            total_ingested_not_found_retries: register_int_counter_with_registry!(
222                name("total_ingested_not_found_retries"),
223                "Total number of retries due to the not found errors while fetching data from the \
224                 remote store",
225                registry,
226            )
227            .unwrap(),
228            total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
229                name("total_ingested_permanent_errors"),
230                "Total number of permanent errors encountered while fetching data from the \
231                 remote store, which cause the ingestion service to shutdown",
232                &["reason"],
233                registry,
234            )
235            .unwrap(),
236            total_streamed_checkpoints: register_int_counter_with_registry!(
237                name("total_streamed_checkpoints"),
238                "Total number of checkpoints received from gRPC streaming",
239                registry,
240            )
241            .unwrap(),
242            total_skipped_streamed_checkpoints: register_int_counter_with_registry!(
243                name("total_skipped_streamed_checkpoints"),
244                "Total number of streamed checkpoints skipped because they were already processed",
245                registry,
246            )
247            .unwrap(),
248            total_out_of_order_streamed_checkpoints: register_int_counter_with_registry!(
249                name("total_out_of_order_streamed_checkpoints"),
250                "Total number of streamed checkpoints received out of order",
251                registry,
252            )
253            .unwrap(),
254            total_stream_disconnections: register_int_counter_with_registry!(
255                name("total_stream_disconnections"),
256                "Total number of times the gRPC stream was disconnected",
257                registry,
258            )
259            .unwrap(),
260            total_streaming_connection_failures: register_int_counter_with_registry!(
261                name("total_streaming_connection_failures"),
262                "Total number of failures due to streaming service connection or peek failures",
263                registry,
264            )
265            .unwrap(),
266            latest_ingested_checkpoint: register_int_gauge_with_registry!(
267                name("latest_ingested_checkpoint"),
268                "Latest checkpoint sequence number fetched from the remote store",
269                registry,
270            )
271            .unwrap(),
272            latest_streamed_checkpoint: register_int_gauge_with_registry!(
273                name("latest_streamed_checkpoint"),
274                "Latest checkpoint sequence number received from gRPC streaming",
275                registry,
276            )
277            .unwrap(),
278            latest_skipped_streamed_checkpoint: register_int_gauge_with_registry!(
279                name("latest_skipped_streamed_checkpoint"),
280                "Latest streamed checkpoint sequence number skipped because it was already processed",
281                registry,
282            )
283            .unwrap(),
284            latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
285                name("latest_ingested_checkpoint_timestamp_lag_ms"),
286                "Difference between the system timestamp when the latest checkpoint was fetched and the \
287                 timestamp in the checkpoint, in milliseconds",
288                registry,
289            )
290            .unwrap(),
291            ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
292                name("ingested_checkpoint_timestamp_lag"),
293                "Difference between the system timestamp when a checkpoint was fetched and the \
294                 timestamp in each checkpoint, in seconds",
295                LAG_SEC_BUCKETS.to_vec(),
296                registry,
297            )
298            .unwrap(),
299            ingested_checkpoint_latency: register_histogram_with_registry!(
300                name("ingested_checkpoint_latency"),
301                "Time taken to fetch a checkpoint from the remote store, including retries",
302                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
303                registry,
304            )
305            .unwrap(),
306            ingestion_concurrency_limit: register_int_gauge_with_registry!(
307                name("ingestion_concurrency_limit"),
308                "Current adaptive concurrency limit for checkpoint ingestion",
309                registry,
310            )
311            .unwrap(),
312            ingestion_concurrency_inflight: register_int_gauge_with_registry!(
313                name("ingestion_concurrency_inflight"),
314                "Current number of in-flight checkpoint ingestion tasks",
315                registry,
316            )
317            .unwrap(),
318        })
319    }
320
321    /// Register that we're retrying a checkpoint fetch due to a transient error, logging the
322    /// reason and error.
323    pub(crate) fn inc_retry(
324        &self,
325        checkpoint: u64,
326        reason: &str,
327        error: Error,
328    ) -> backoff::Error<Error> {
329        warn!(checkpoint, reason, "Retrying due to error: {error}");
330
331        self.total_ingested_transient_retries
332            .with_label_values(&[reason])
333            .inc();
334
335        backoff::Error::transient(error)
336    }
337}
338
339impl IndexerMetrics {
340    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
341        let prefix = prefix.unwrap_or("indexer");
342        let name = |n| format!("{prefix}_{n}");
343        Arc::new(Self {
344            total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
345                name("total_handler_checkpoints_received"),
346                "Total number of checkpoints received by this handler",
347                &["pipeline"],
348                registry,
349            )
350            .unwrap(),
351            total_handler_processor_retries: register_int_counter_vec_with_registry!(
352                name("total_handler_processor_retries"),
353                "Total number of handler retries after transient processing failures",
354                &["pipeline"],
355                registry,
356            )
357            .unwrap(),
358            total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
359                name("total_handler_checkpoints_processed"),
360                "Total number of checkpoints processed (converted into rows) by this handler",
361                &["pipeline"],
362                registry,
363            )
364            .unwrap(),
365            total_handler_rows_created: register_int_counter_vec_with_registry!(
366                name("total_handler_rows_created"),
367                "Total number of rows created by this handler",
368                &["pipeline"],
369                registry,
370            )
371            .unwrap(),
372            latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
373                name("latest_processed_checkpoint"),
374                "Latest checkpoint sequence number processed by this handler",
375                &["pipeline"],
376                registry,
377            )
378            .unwrap(),
379            latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
380                name("latest_processed_checkpoint_timestamp_lag_ms"),
381                "Difference between the system timestamp when the latest checkpoint was processed and the \
382                 timestamp in the checkpoint, in milliseconds",
383                &["pipeline"],
384                registry,
385            )
386            .unwrap(),
387            processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
388                name("processed_checkpoint_timestamp_lag"),
389                "Difference between the system timestamp when a checkpoint was processed and the \
390                 timestamp in each checkpoint, in seconds",
391                &["pipeline"],
392                LAG_SEC_BUCKETS.to_vec(),
393                registry,
394            )
395            .unwrap(),
396            handler_checkpoint_latency: register_histogram_vec_with_registry!(
397                name("handler_checkpoint_latency"),
398                "Time taken to process a checkpoint by this handler",
399                &["pipeline"],
400                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
401                registry,
402            )
403            .unwrap(),
404            processor_concurrency_limit: register_int_gauge_vec_with_registry!(
405                name("processor_concurrency_limit"),
406                "Current adaptive concurrency limit for this processor",
407                &["pipeline"],
408                registry,
409            )
410            .unwrap(),
411            processor_concurrency_inflight: register_int_gauge_vec_with_registry!(
412                name("processor_concurrency_inflight"),
413                "Current number of in-flight processor tasks for this pipeline",
414                &["pipeline"],
415                registry,
416            )
417            .unwrap(),
418            total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
419                name("total_collector_checkpoints_received"),
420                "Total number of checkpoints received by this collector",
421                &["pipeline"],
422                registry,
423            )
424            .unwrap(),
425            total_collector_rows_received: register_int_counter_vec_with_registry!(
426                name("total_collector_rows_received"),
427                "Total number of rows received by this collector",
428                &["pipeline"],
429                registry,
430            )
431            .unwrap(),
432            total_collector_batches_created: register_int_counter_vec_with_registry!(
433                name("total_collector_batches_created"),
434                "Total number of batches created by this collector",
435                &["pipeline"],
436                registry,
437            )
438            .unwrap(),
439            total_committer_batches_attempted: register_int_counter_vec_with_registry!(
440                name("total_committer_batches_attempted"),
441                "Total number of batches writes attempted by this committer",
442                &["pipeline"],
443                registry,
444            )
445            .unwrap(),
446            total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
447                name("total_committer_batches_succeeded"),
448                "Total number of successful batches writes by this committer",
449                &["pipeline"],
450                registry,
451            )
452            .unwrap(),
453            total_committer_batches_failed: register_int_counter_vec_with_registry!(
454                name("total_committer_batches_failed"),
455                "Total number of failed batches writes by this committer",
456                &["pipeline"],
457                registry,
458            )
459            .unwrap(),
460            total_committer_rows_committed: register_int_counter_vec_with_registry!(
461                name("total_committer_rows_committed"),
462                "Total number of rows sent to the database by this committer",
463                &["pipeline"],
464                registry,
465            )
466            .unwrap(),
467            total_committer_rows_affected: register_int_counter_vec_with_registry!(
468                name("total_committer_rows_affected"),
469                "Total number of rows actually written to the database by this committer",
470                &["pipeline"],
471                registry,
472            )
473            .unwrap(),
474            total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
475                name("watermark_out_of_order"),
476                "Number of times this committer encountered a batch for a checkpoint before its watermark",
477                &["pipeline"],
478                registry,
479            )
480            .unwrap(),
481            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
482                name("pruner_chunks_attempted"),
483                "Number of chunks this pruner attempted to delete",
484                &["pipeline"],
485                registry,
486            )
487            .unwrap(),
488            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
489                name("pruner_chunks_deleted"),
490                "Number of chunks this pruner successfully deleted",
491                &["pipeline"],
492                registry,
493            )
494            .unwrap(),
495            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
496                name("pruner_rows_deleted"),
497                "Number of rows this pruner successfully deleted",
498                &["pipeline"],
499                registry,
500            )
501            .unwrap(),
502            latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
503                name("latest_collected_checkpoint"),
504                "Latest checkpoint sequence number collected by this collector",
505                &["pipeline"],
506                registry,
507            )
508            .unwrap(),
509            latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
510                name("latest_collected_checkpoint_timestamp_lag_ms"),
511                "Difference between the system timestamp when the latest checkpoint was collected and the \
512                 timestamp in the checkpoint, in milliseconds",
513                &["pipeline"],
514                registry,
515            )
516            .unwrap(),
517            collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
518                name("collected_checkpoint_timestamp_lag"),
519                "Difference between the system timestamp when a checkpoint was collected and the \
520                 timestamp in each checkpoint, in seconds",
521                &["pipeline"],
522                LAG_SEC_BUCKETS.to_vec(),
523                registry,
524            )
525            .unwrap(),
526            latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
527                name("latest_partially_committed_checkpoint"),
528                "Latest checkpoint sequence number partially committed by this collector",
529                &["pipeline"],
530                registry,
531            )
532            .unwrap(),
533            latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
534                name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
535                "Difference between the system timestamp when the latest checkpoint was partially committed and the \
536                 timestamp in the checkpoint, in milliseconds",
537                &["pipeline"],
538                registry,
539            )
540            .unwrap(),
541            partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
542                name("partially_committed_checkpoint_timestamp_lag"),
543                "Difference between the system timestamp when a checkpoint was partially committed and the \
544                 timestamp in each checkpoint, in seconds",
545                &["pipeline"],
546                LAG_SEC_BUCKETS.to_vec(),
547                registry,
548            )
549            .unwrap(),
550            latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
551                name("latest_watermarked_checkpoint_timestamp_lag_ms"),
552                "Difference between the system timestamp when the latest checkpoint was watermarked and the \
553                 timestamp in the checkpoint, in milliseconds",
554                &["pipeline"],
555                registry,
556            )
557            .unwrap(),
558            watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
559                name("watermarked_checkpoint_timestamp_lag"),
560                "Difference between the system timestamp when a checkpoint was watermarked and the \
561                 timestamp in each checkpoint, in seconds",
562                &["pipeline"],
563                LAG_SEC_BUCKETS.to_vec(),
564                registry,
565            )
566            .unwrap(),
567            collector_gather_latency: register_histogram_vec_with_registry!(
568                name("collector_gather_latency"),
569                "Time taken to gather rows into a batch by this collector",
570                &["pipeline"],
571                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
572                registry,
573            )
574            .unwrap(),
575            collector_batch_size: register_histogram_vec_with_registry!(
576                name("collector_batch_size"),
577                "Number of rows in a batch written to the database by this collector",
578                &["pipeline"],
579                BATCH_SIZE_BUCKETS.to_vec(),
580                registry,
581            )
582            .unwrap(),
583            total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
584                name("total_collector_skipped_checkpoints"),
585                "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
586                &["pipeline"],
587                registry,
588            ).unwrap(),
589            collector_reader_lo: register_int_gauge_vec_with_registry!(
590                name("collector_reader_lo"),
591                "Reader low watermark as observed by the collector",
592                &["pipeline"],
593                registry,
594            )
595            .unwrap(),
596            committer_commit_latency: register_histogram_vec_with_registry!(
597                name("committer_commit_latency"),
598                "Time taken to write a batch of rows to the database by this committer",
599                &["pipeline"],
600                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
601                registry,
602            )
603            .unwrap(),
604            committer_tx_rows: register_histogram_vec_with_registry!(
605                name("committer_tx_rows"),
606                "Number of rows written to the database in a single database transaction by this committer",
607                &["pipeline"],
608                BATCH_SIZE_BUCKETS.to_vec(),
609                registry,
610            )
611            .unwrap(),
612            watermark_gather_latency: register_histogram_vec_with_registry!(
613                name("watermark_gather_latency"),
614                "Time taken to calculate the new high watermark after a write by this committer",
615                &["pipeline"],
616                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
617                registry,
618            )
619            .unwrap(),
620            watermark_commit_latency: register_histogram_vec_with_registry!(
621                name("watermark_commit_latency"),
622                "Time taken to write the new high watermark to the database by this committer",
623                &["pipeline"],
624                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
625                registry,
626            )
627            .unwrap(),
628            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
629                name("watermark_pruner_read_latency"),
630                "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
631                &["pipeline"],
632                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
633                registry,
634            )
635            .unwrap(),
636            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
637                name("watermark_pruner_write_latency"),
638                "Time taken to write the pruner's new upperbound to the database by this pruner",
639                &["pipeline"],
640                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
641                registry,
642            )
643            .unwrap(),
644            pruner_delete_latency: register_histogram_vec_with_registry!(
645                name("pruner_delete_latency"),
646                "Time taken to delete a chunk of data from the database by this pruner",
647                &["pipeline"],
648                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
649                registry,
650            )
651            .unwrap(),
652            watermark_epoch: register_int_gauge_vec_with_registry!(
653                name("watermark_epoch"),
654                "Current epoch high watermark for this committer",
655                &["pipeline"],
656                registry,
657            )
658            .unwrap(),
659            watermark_checkpoint: register_int_gauge_vec_with_registry!(
660                name("watermark_checkpoint"),
661                "Current checkpoint high watermark for this committer",
662                &["pipeline"],
663                registry,
664            )
665            .unwrap(),
666            watermark_transaction: register_int_gauge_vec_with_registry!(
667                name("watermark_transaction"),
668                "Current transaction high watermark for this committer",
669                &["pipeline"],
670                registry,
671            )
672            .unwrap(),
673            watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
674                name("watermark_timestamp_ms"),
675                "Current timestamp high watermark for this committer, in milliseconds",
676                &["pipeline"],
677                registry,
678            )
679            .unwrap(),
680            watermark_reader_lo: register_int_gauge_vec_with_registry!(
681                name("watermark_reader_lo"),
682                "Current reader low watermark for this pruner",
683                &["pipeline"],
684                registry,
685            )
686            .unwrap(),
687            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
688                name("watermark_pruner_hi"),
689                "Current pruner high watermark for this pruner",
690                &["pipeline"],
691                registry,
692            )
693            .unwrap(),
694            watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
695                name("watermark_epoch_in_db"),
696                "Last epoch high watermark this committer wrote to the DB",
697                &["pipeline"],
698                registry,
699            )
700            .unwrap(),
701            watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
702                name("watermark_checkpoint_in_db"),
703                "Last checkpoint high watermark this committer wrote to the DB",
704                &["pipeline"],
705                registry,
706            )
707            .unwrap(),
708            watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
709                name("watermark_transaction_in_db"),
710                "Last transaction high watermark this committer wrote to the DB",
711                &["pipeline"],
712                registry,
713            )
714            .unwrap(),
715            watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
716                name("watermark_timestamp_ms_in_db"),
717                "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
718                &["pipeline"],
719                registry,
720            )
721            .unwrap(),
722            watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
723                name("watermark_reader_lo_in_db"),
724                "Last reader low watermark this pruner wrote to the DB",
725                &["pipeline"],
726                registry,
727            )
728            .unwrap(),
729            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
730                name("watermark_pruner_hi_in_db"),
731                "Last pruner high watermark this pruner wrote to the DB",
732                &["pipeline"],
733                registry,
734            )
735            .unwrap(),
736        })
737    }
738
739    pub(crate) fn inc_processor_retry<P: Processor>(
740        &self,
741        checkpoint: u64,
742        error: &anyhow::Error,
743        delay: Duration,
744    ) {
745        warn!(
746            pipeline = P::NAME,
747            checkpoint,
748            retry_delay_ms = delay.as_millis(),
749            "Retrying processor after error: {error:?}",
750        );
751
752        self.total_handler_processor_retries
753            .with_label_values(&[P::NAME])
754            .inc();
755    }
756}
757
758impl CheckpointLagMetricReporter {
759    pub fn new(
760        checkpoint_time_lag_histogram: Histogram,
761        latest_checkpoint_time_lag_gauge: IntGauge,
762        latest_checkpoint_sequence_number_gauge: IntGauge,
763    ) -> Arc<Self> {
764        Arc::new(Self {
765            checkpoint_time_lag_histogram,
766            latest_checkpoint_time_lag_gauge,
767            latest_checkpoint_sequence_number_gauge,
768            latest_reported_checkpoint: AtomicU64::new(0),
769        })
770    }
771
772    pub fn new_for_pipeline<P: Processor>(
773        checkpoint_time_lag_histogram: &HistogramVec,
774        latest_checkpoint_time_lag_gauge: &IntGaugeVec,
775        latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
776    ) -> Arc<Self> {
777        Self::new(
778            checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
779            latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
780            latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
781        )
782    }
783
784    pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
785        let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
786        self.checkpoint_time_lag_histogram
787            .observe((lag as f64) / 1000.0);
788
789        let prev = self
790            .latest_reported_checkpoint
791            .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
792        if cp_sequence_number > prev {
793            self.latest_checkpoint_sequence_number_gauge
794                .set(cp_sequence_number as i64);
795            self.latest_checkpoint_time_lag_gauge.set(lag);
796        }
797    }
798}
799
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::*;

    /// Test helper: builds `IndexerMetrics` against a freshly created registry, passing `None`
    /// as the first constructor argument.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Test helper: builds `IngestionMetrics` against a freshly created registry, passing `None`
    /// as the first constructor argument.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}