sui_indexer_alt_framework/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6use std::time::Duration;
7
8use prometheus::Histogram;
9use prometheus::HistogramVec;
10use prometheus::IntCounter;
11use prometheus::IntCounterVec;
12use prometheus::IntGauge;
13use prometheus::IntGaugeVec;
14use prometheus::Registry;
15use prometheus::register_histogram_vec_with_registry;
16use prometheus::register_histogram_with_registry;
17use prometheus::register_int_counter_vec_with_registry;
18use prometheus::register_int_counter_with_registry;
19use prometheus::register_int_gauge_vec_with_registry;
20use prometheus::register_int_gauge_with_registry;
21use tracing::warn;
22
23use crate::ingestion::error::Error;
24use crate::pipeline::Processor;
25
/// Bucket boundaries, in seconds, for how long it takes to fetch a checkpoint: a 1-2-5
/// progression running from 1ms up to 100s.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms to 5ms
    0.01, 0.02, 0.05, // 10ms to 50ms
    0.1, 0.2, 0.5, // 100ms to 500ms
    1.0, 2.0, 5.0, // 1s to 5s
    10.0, 20.0, 50.0, // 10s to 50s
    100.0, // 100s
];
30
/// Bucket boundaries, in seconds, for checkpoint lag: the difference between the system time
/// and the timestamp recorded in the checkpoint. Resolution is fine (50ms) below one second and
/// coarsens progressively out to 1000s.
const LAG_SEC_BUCKETS: &[f64] = &[
    // 0.1s to 0.95s in 50ms steps.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95,
    // 1s to 5s in whole seconds.
    1.0, 2.0, 3.0, 4.0, 5.0,
    // Long tail.
    10.0, 20.0, 50.0, 100.0, 1000.0,
];
37
/// Bucket boundaries, in seconds, for the time the indexer itself spends processing a
/// checkpoint (no calls out to other services): a 1-2-5 progression from 1ms to 100s.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    // Sub-second.
    0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5,
    // One second and above.
    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
43
/// Bucket boundaries, in seconds, for database write latencies: a 1-2-5 progression from
/// 10ms up to 10,000s.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, // 10ms to 50ms
    0.1, 0.2, 0.5, // 100ms to 500ms
    1.0, 2.0, 5.0, // 1s to 5s
    10.0, 20.0, 50.0, // 10s to 50s
    100.0, 200.0, 500.0, // 100s to 500s
    1000.0, 2000.0, 5000.0, // 1000s to 5000s
    10000.0, // 10,000s
];
49
/// Bucket boundaries for the number of rows written to the database in one batch: a 1-2-5
/// progression from a single row up to 20,000 rows.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, // units
    10.0, 20.0, 50.0, // tens
    100.0, 200.0, 500.0, // hundreds
    1000.0, 2000.0, 5000.0, // thousands
    10000.0, 20000.0, // tens of thousands
];
54
55/// Metrics specific to the ingestion service.
56#[derive(Clone)]
57pub struct IngestionMetrics {
58    // Statistics related to fetching data from the remote store.
59    pub total_ingested_checkpoints: IntCounter,
60    pub total_ingested_transactions: IntCounter,
61    pub total_ingested_events: IntCounter,
62    pub total_ingested_objects: IntCounter,
63    pub total_ingested_bytes: IntCounter,
64    pub total_ingested_transient_retries: IntCounterVec,
65    pub total_ingested_not_found_retries: IntCounter,
66    pub total_streamed_checkpoints: IntCounter,
67    pub total_skipped_streamed_checkpoints: IntCounter,
68    pub total_out_of_order_streamed_checkpoints: IntCounter,
69    pub total_stream_disconnections: IntCounter,
70    pub total_streaming_connection_failures: IntCounter,
71
72    // Checkpoint lag metrics for the ingestion pipeline.
73    pub latest_ingested_checkpoint: IntGauge,
74    pub latest_streamed_checkpoint: IntGauge,
75    pub latest_skipped_streamed_checkpoint: IntGauge,
76    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
77    pub ingested_checkpoint_timestamp_lag: Histogram,
78
79    pub ingested_checkpoint_latency: Histogram,
80    pub ingested_chain_id_latency: Histogram,
81    pub ingested_latest_checkpoint_latency: Histogram,
82
83    pub ingestion_concurrency_limit: IntGauge,
84    pub ingestion_concurrency_inflight: IntGauge,
85}
86
87#[derive(Clone)]
88pub struct IndexerMetrics {
89    // Statistics related to individual ingestion pipelines' handlers.
90    pub total_handler_checkpoints_received: IntCounterVec,
91    pub total_handler_processor_retries: IntCounterVec,
92    pub total_handler_checkpoints_processed: IntCounterVec,
93    pub total_handler_rows_created: IntCounterVec,
94
95    pub latest_processed_checkpoint: IntGaugeVec,
96    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
97    pub processed_checkpoint_timestamp_lag: HistogramVec,
98
99    pub handler_checkpoint_latency: HistogramVec,
100
101    pub processor_concurrency_limit: IntGaugeVec,
102    pub processor_concurrency_inflight: IntGaugeVec,
103
104    // Statistics related to individual ingestion pipelines.
105    pub total_collector_checkpoints_received: IntCounterVec,
106    pub total_collector_rows_received: IntCounterVec,
107    pub total_collector_batches_created: IntCounterVec,
108    pub total_committer_batches_attempted: IntCounterVec,
109    pub total_committer_batches_succeeded: IntCounterVec,
110    pub total_committer_batches_failed: IntCounterVec,
111    pub total_committer_rows_committed: IntCounterVec,
112    pub total_committer_rows_affected: IntCounterVec,
113    pub total_watermarks_out_of_order: IntCounterVec,
114    pub total_pruner_chunks_attempted: IntCounterVec,
115    pub total_pruner_chunks_deleted: IntCounterVec,
116    pub total_pruner_rows_deleted: IntCounterVec,
117
118    // Checkpoint lag metrics for the collector.
119    pub latest_collected_checkpoint: IntGaugeVec,
120    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
121    pub collected_checkpoint_timestamp_lag: HistogramVec,
122
123    // Checkpoint lag metrics for the committer.
124    // We can only report partially committed checkpoints, since the concurrent committer isn't aware of
125    // when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from
126    // the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint.
127    pub latest_partially_committed_checkpoint: IntGaugeVec,
128    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
129    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
130
131    // Checkpoint lag metrics for the watermarker.
132    // The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db.
133    // While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly
134    // for consistency.
135    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
136    pub watermarked_checkpoint_timestamp_lag: HistogramVec,
137
138    pub collector_gather_latency: HistogramVec,
139    pub collector_batch_size: HistogramVec,
140    pub total_collector_skipped_checkpoints: IntCounterVec,
141    pub collector_reader_lo: IntGaugeVec,
142    pub committer_commit_latency: HistogramVec,
143    pub committer_tx_rows: HistogramVec,
144    pub watermark_gather_latency: HistogramVec,
145    pub watermark_commit_latency: HistogramVec,
146    pub watermark_pruner_read_latency: HistogramVec,
147    pub watermark_pruner_write_latency: HistogramVec,
148    pub pruner_delete_latency: HistogramVec,
149
150    pub watermark_epoch: IntGaugeVec,
151    pub watermark_checkpoint: IntGaugeVec,
152    pub watermark_transaction: IntGaugeVec,
153    pub watermark_timestamp_ms: IntGaugeVec,
154    pub watermark_reader_lo: IntGaugeVec,
155    pub watermark_pruner_hi: IntGaugeVec,
156
157    pub watermark_epoch_in_db: IntGaugeVec,
158    pub watermark_checkpoint_in_db: IntGaugeVec,
159    pub watermark_transaction_in_db: IntGaugeVec,
160    pub watermark_timestamp_in_db_ms: IntGaugeVec,
161    pub watermark_reader_lo_in_db: IntGaugeVec,
162    pub watermark_pruner_hi_in_db: IntGaugeVec,
163}
164
165/// A helper struct to report metrics regarding the checkpoint lag at various points in the indexer.
166pub(crate) struct CheckpointLagMetricReporter {
167    /// Metric to report the lag distribution of each checkpoint.
168    checkpoint_time_lag_histogram: Histogram,
169    /// Metric to report the lag of the checkpoint with the highest sequence number observed so far.
170    /// This is needed since concurrent pipelines observe checkpoints out of order.
171    latest_checkpoint_time_lag_gauge: IntGauge,
172    /// Metric to report the sequence number of the checkpoint with the highest sequence number observed so far.
173    latest_checkpoint_sequence_number_gauge: IntGauge,
174    // Internal state to keep track of the highest checkpoint sequence number reported so far.
175    latest_reported_checkpoint: AtomicU64,
176}
177
178impl IngestionMetrics {
179    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
180        let prefix = prefix.unwrap_or("indexer");
181        let name = |n| format!("{prefix}_{n}");
182        Arc::new(Self {
183            total_ingested_checkpoints: register_int_counter_with_registry!(
184                name("total_ingested_checkpoints"),
185                "Total number of checkpoints fetched from the remote store",
186                registry,
187            )
188            .unwrap(),
189            total_ingested_transactions: register_int_counter_with_registry!(
190                name("total_ingested_transactions"),
191                "Total number of transactions fetched from the remote store",
192                registry,
193            )
194            .unwrap(),
195            total_ingested_events: register_int_counter_with_registry!(
196                name("total_ingested_events"),
197                "Total number of events fetched from the remote store",
198                registry,
199            )
200            .unwrap(),
201            total_ingested_objects: register_int_counter_with_registry!(
202                name("total_ingested_objects"),
203                "Total number of objects in checkpoints fetched from the remote store",
204                registry,
205            )
206            .unwrap(),
207            total_ingested_bytes: register_int_counter_with_registry!(
208                name("total_ingested_bytes"),
209                "Total number of bytes fetched from the remote store",
210                registry,
211            )
212            .unwrap(),
213            total_ingested_transient_retries: register_int_counter_vec_with_registry!(
214                name("total_ingested_retries"),
215                "Total number of retries due to transient errors while fetching data from the \
216                 remote store",
217                &["reason"],
218                registry,
219            )
220            .unwrap(),
221            total_ingested_not_found_retries: register_int_counter_with_registry!(
222                name("total_ingested_not_found_retries"),
223                "Total number of retries due to the not found errors while fetching data from the \
224                 remote store",
225                registry,
226            )
227            .unwrap(),
228            total_streamed_checkpoints: register_int_counter_with_registry!(
229                name("total_streamed_checkpoints"),
230                "Total number of checkpoints received from gRPC streaming",
231                registry,
232            )
233            .unwrap(),
234            total_skipped_streamed_checkpoints: register_int_counter_with_registry!(
235                name("total_skipped_streamed_checkpoints"),
236                "Total number of streamed checkpoints skipped because they were already processed",
237                registry,
238            )
239            .unwrap(),
240            total_out_of_order_streamed_checkpoints: register_int_counter_with_registry!(
241                name("total_out_of_order_streamed_checkpoints"),
242                "Total number of streamed checkpoints received out of order",
243                registry,
244            )
245            .unwrap(),
246            total_stream_disconnections: register_int_counter_with_registry!(
247                name("total_stream_disconnections"),
248                "Total number of times the gRPC stream was disconnected",
249                registry,
250            )
251            .unwrap(),
252            total_streaming_connection_failures: register_int_counter_with_registry!(
253                name("total_streaming_connection_failures"),
254                "Total number of failures due to streaming service connection or peek failures",
255                registry,
256            )
257            .unwrap(),
258            latest_ingested_checkpoint: register_int_gauge_with_registry!(
259                name("latest_ingested_checkpoint"),
260                "Latest checkpoint sequence number fetched from the remote store",
261                registry,
262            )
263            .unwrap(),
264            latest_streamed_checkpoint: register_int_gauge_with_registry!(
265                name("latest_streamed_checkpoint"),
266                "Latest checkpoint sequence number received from gRPC streaming",
267                registry,
268            )
269            .unwrap(),
270            latest_skipped_streamed_checkpoint: register_int_gauge_with_registry!(
271                name("latest_skipped_streamed_checkpoint"),
272                "Latest streamed checkpoint sequence number skipped because it was already processed",
273                registry,
274            )
275            .unwrap(),
276            latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
277                name("latest_ingested_checkpoint_timestamp_lag_ms"),
278                "Difference between the system timestamp when the latest checkpoint was fetched and the \
279                 timestamp in the checkpoint, in milliseconds",
280                registry,
281            )
282            .unwrap(),
283            ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
284                name("ingested_checkpoint_timestamp_lag"),
285                "Difference between the system timestamp when a checkpoint was fetched and the \
286                 timestamp in each checkpoint, in seconds",
287                LAG_SEC_BUCKETS.to_vec(),
288                registry,
289            )
290            .unwrap(),
291            ingested_checkpoint_latency: register_histogram_with_registry!(
292                name("ingested_checkpoint_latency"),
293                "Time taken to fetch a checkpoint from the remote store, including retries",
294                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
295                registry,
296            )
297            .unwrap(),
298            ingested_chain_id_latency: register_histogram_with_registry!(
299                name("ingested_chain_id_latency"),
300                "Time taken to fetch the chain identifier, including retries",
301                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
302                registry,
303            )
304            .unwrap(),
305            ingested_latest_checkpoint_latency: register_histogram_with_registry!(
306                name("ingested_latest_checkpoint_latency"),
307                "Time taken to fetch the latest checkpoint number, including retries",
308                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
309                registry,
310            )
311            .unwrap(),
312            ingestion_concurrency_limit: register_int_gauge_with_registry!(
313                name("ingestion_concurrency_limit"),
314                "Current adaptive concurrency limit for checkpoint ingestion",
315                registry,
316            )
317            .unwrap(),
318            ingestion_concurrency_inflight: register_int_gauge_with_registry!(
319                name("ingestion_concurrency_inflight"),
320                "Current number of in-flight checkpoint ingestion tasks",
321                registry,
322            )
323            .unwrap(),
324        })
325    }
326
327    /// Register that we're retrying a checkpoint fetch due to a transient error, logging the
328    /// reason and error.
329    pub(crate) fn inc_retry(
330        &self,
331        checkpoint: u64,
332        reason: &str,
333        error: Error,
334    ) -> backoff::Error<Error> {
335        warn!(checkpoint, reason, "Retrying due to error: {error}");
336
337        self.total_ingested_transient_retries
338            .with_label_values(&[reason])
339            .inc();
340
341        backoff::Error::transient(error)
342    }
343}
344
345impl IndexerMetrics {
346    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
347        let prefix = prefix.unwrap_or("indexer");
348        let name = |n| format!("{prefix}_{n}");
349        Arc::new(Self {
350            total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
351                name("total_handler_checkpoints_received"),
352                "Total number of checkpoints received by this handler",
353                &["pipeline"],
354                registry,
355            )
356            .unwrap(),
357            total_handler_processor_retries: register_int_counter_vec_with_registry!(
358                name("total_handler_processor_retries"),
359                "Total number of handler retries after transient processing failures",
360                &["pipeline"],
361                registry,
362            )
363            .unwrap(),
364            total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
365                name("total_handler_checkpoints_processed"),
366                "Total number of checkpoints processed (converted into rows) by this handler",
367                &["pipeline"],
368                registry,
369            )
370            .unwrap(),
371            total_handler_rows_created: register_int_counter_vec_with_registry!(
372                name("total_handler_rows_created"),
373                "Total number of rows created by this handler",
374                &["pipeline"],
375                registry,
376            )
377            .unwrap(),
378            latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
379                name("latest_processed_checkpoint"),
380                "Latest checkpoint sequence number processed by this handler",
381                &["pipeline"],
382                registry,
383            )
384            .unwrap(),
385            latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
386                name("latest_processed_checkpoint_timestamp_lag_ms"),
387                "Difference between the system timestamp when the latest checkpoint was processed and the \
388                 timestamp in the checkpoint, in milliseconds",
389                &["pipeline"],
390                registry,
391            )
392            .unwrap(),
393            processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
394                name("processed_checkpoint_timestamp_lag"),
395                "Difference between the system timestamp when a checkpoint was processed and the \
396                 timestamp in each checkpoint, in seconds",
397                &["pipeline"],
398                LAG_SEC_BUCKETS.to_vec(),
399                registry,
400            )
401            .unwrap(),
402            handler_checkpoint_latency: register_histogram_vec_with_registry!(
403                name("handler_checkpoint_latency"),
404                "Time taken to process a checkpoint by this handler",
405                &["pipeline"],
406                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
407                registry,
408            )
409            .unwrap(),
410            processor_concurrency_limit: register_int_gauge_vec_with_registry!(
411                name("processor_concurrency_limit"),
412                "Current adaptive concurrency limit for this processor",
413                &["pipeline"],
414                registry,
415            )
416            .unwrap(),
417            processor_concurrency_inflight: register_int_gauge_vec_with_registry!(
418                name("processor_concurrency_inflight"),
419                "Current number of in-flight processor tasks for this pipeline",
420                &["pipeline"],
421                registry,
422            )
423            .unwrap(),
424            total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
425                name("total_collector_checkpoints_received"),
426                "Total number of checkpoints received by this collector",
427                &["pipeline"],
428                registry,
429            )
430            .unwrap(),
431            total_collector_rows_received: register_int_counter_vec_with_registry!(
432                name("total_collector_rows_received"),
433                "Total number of rows received by this collector",
434                &["pipeline"],
435                registry,
436            )
437            .unwrap(),
438            total_collector_batches_created: register_int_counter_vec_with_registry!(
439                name("total_collector_batches_created"),
440                "Total number of batches created by this collector",
441                &["pipeline"],
442                registry,
443            )
444            .unwrap(),
445            total_committer_batches_attempted: register_int_counter_vec_with_registry!(
446                name("total_committer_batches_attempted"),
447                "Total number of batches writes attempted by this committer",
448                &["pipeline"],
449                registry,
450            )
451            .unwrap(),
452            total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
453                name("total_committer_batches_succeeded"),
454                "Total number of successful batches writes by this committer",
455                &["pipeline"],
456                registry,
457            )
458            .unwrap(),
459            total_committer_batches_failed: register_int_counter_vec_with_registry!(
460                name("total_committer_batches_failed"),
461                "Total number of failed batches writes by this committer",
462                &["pipeline"],
463                registry,
464            )
465            .unwrap(),
466            total_committer_rows_committed: register_int_counter_vec_with_registry!(
467                name("total_committer_rows_committed"),
468                "Total number of rows sent to the database by this committer",
469                &["pipeline"],
470                registry,
471            )
472            .unwrap(),
473            total_committer_rows_affected: register_int_counter_vec_with_registry!(
474                name("total_committer_rows_affected"),
475                "Total number of rows actually written to the database by this committer",
476                &["pipeline"],
477                registry,
478            )
479            .unwrap(),
480            total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
481                name("watermark_out_of_order"),
482                "Number of times this committer encountered a batch for a checkpoint before its watermark",
483                &["pipeline"],
484                registry,
485            )
486            .unwrap(),
487            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
488                name("pruner_chunks_attempted"),
489                "Number of chunks this pruner attempted to delete",
490                &["pipeline"],
491                registry,
492            )
493            .unwrap(),
494            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
495                name("pruner_chunks_deleted"),
496                "Number of chunks this pruner successfully deleted",
497                &["pipeline"],
498                registry,
499            )
500            .unwrap(),
501            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
502                name("pruner_rows_deleted"),
503                "Number of rows this pruner successfully deleted",
504                &["pipeline"],
505                registry,
506            )
507            .unwrap(),
508            latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
509                name("latest_collected_checkpoint"),
510                "Latest checkpoint sequence number collected by this collector",
511                &["pipeline"],
512                registry,
513            )
514            .unwrap(),
515            latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
516                name("latest_collected_checkpoint_timestamp_lag_ms"),
517                "Difference between the system timestamp when the latest checkpoint was collected and the \
518                 timestamp in the checkpoint, in milliseconds",
519                &["pipeline"],
520                registry,
521            )
522            .unwrap(),
523            collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
524                name("collected_checkpoint_timestamp_lag"),
525                "Difference between the system timestamp when a checkpoint was collected and the \
526                 timestamp in each checkpoint, in seconds",
527                &["pipeline"],
528                LAG_SEC_BUCKETS.to_vec(),
529                registry,
530            )
531            .unwrap(),
532            latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
533                name("latest_partially_committed_checkpoint"),
534                "Latest checkpoint sequence number partially committed by this collector",
535                &["pipeline"],
536                registry,
537            )
538            .unwrap(),
539            latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
540                name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
541                "Difference between the system timestamp when the latest checkpoint was partially committed and the \
542                 timestamp in the checkpoint, in milliseconds",
543                &["pipeline"],
544                registry,
545            )
546            .unwrap(),
547            partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
548                name("partially_committed_checkpoint_timestamp_lag"),
549                "Difference between the system timestamp when a checkpoint was partially committed and the \
550                 timestamp in each checkpoint, in seconds",
551                &["pipeline"],
552                LAG_SEC_BUCKETS.to_vec(),
553                registry,
554            )
555            .unwrap(),
556            latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
557                name("latest_watermarked_checkpoint_timestamp_lag_ms"),
558                "Difference between the system timestamp when the latest checkpoint was watermarked and the \
559                 timestamp in the checkpoint, in milliseconds",
560                &["pipeline"],
561                registry,
562            )
563            .unwrap(),
564            watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
565                name("watermarked_checkpoint_timestamp_lag"),
566                "Difference between the system timestamp when a checkpoint was watermarked and the \
567                 timestamp in each checkpoint, in seconds",
568                &["pipeline"],
569                LAG_SEC_BUCKETS.to_vec(),
570                registry,
571            )
572            .unwrap(),
573            collector_gather_latency: register_histogram_vec_with_registry!(
574                name("collector_gather_latency"),
575                "Time taken to gather rows into a batch by this collector",
576                &["pipeline"],
577                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
578                registry,
579            )
580            .unwrap(),
581            collector_batch_size: register_histogram_vec_with_registry!(
582                name("collector_batch_size"),
583                "Number of rows in a batch written to the database by this collector",
584                &["pipeline"],
585                BATCH_SIZE_BUCKETS.to_vec(),
586                registry,
587            )
588            .unwrap(),
589            total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
590                name("total_collector_skipped_checkpoints"),
591                "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
592                &["pipeline"],
593                registry,
594            ).unwrap(),
595            collector_reader_lo: register_int_gauge_vec_with_registry!(
596                name("collector_reader_lo"),
597                "Reader low watermark as observed by the collector",
598                &["pipeline"],
599                registry,
600            )
601            .unwrap(),
602            committer_commit_latency: register_histogram_vec_with_registry!(
603                name("committer_commit_latency"),
604                "Time taken to write a batch of rows to the database by this committer",
605                &["pipeline"],
606                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
607                registry,
608            )
609            .unwrap(),
610            committer_tx_rows: register_histogram_vec_with_registry!(
611                name("committer_tx_rows"),
612                "Number of rows written to the database in a single database transaction by this committer",
613                &["pipeline"],
614                BATCH_SIZE_BUCKETS.to_vec(),
615                registry,
616            )
617            .unwrap(),
618            watermark_gather_latency: register_histogram_vec_with_registry!(
619                name("watermark_gather_latency"),
620                "Time taken to calculate the new high watermark after a write by this committer",
621                &["pipeline"],
622                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
623                registry,
624            )
625            .unwrap(),
626            watermark_commit_latency: register_histogram_vec_with_registry!(
627                name("watermark_commit_latency"),
628                "Time taken to write the new high watermark to the database by this committer",
629                &["pipeline"],
630                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
631                registry,
632            )
633            .unwrap(),
634            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
635                name("watermark_pruner_read_latency"),
636                "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
637                &["pipeline"],
638                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
639                registry,
640            )
641            .unwrap(),
642            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
643                name("watermark_pruner_write_latency"),
644                "Time taken to write the pruner's new upperbound to the database by this pruner",
645                &["pipeline"],
646                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
647                registry,
648            )
649            .unwrap(),
650            pruner_delete_latency: register_histogram_vec_with_registry!(
651                name("pruner_delete_latency"),
652                "Time taken to delete a chunk of data from the database by this pruner",
653                &["pipeline"],
654                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
655                registry,
656            )
657            .unwrap(),
658            watermark_epoch: register_int_gauge_vec_with_registry!(
659                name("watermark_epoch"),
660                "Current epoch high watermark for this committer",
661                &["pipeline"],
662                registry,
663            )
664            .unwrap(),
665            watermark_checkpoint: register_int_gauge_vec_with_registry!(
666                name("watermark_checkpoint"),
667                "Current checkpoint high watermark for this committer",
668                &["pipeline"],
669                registry,
670            )
671            .unwrap(),
672            watermark_transaction: register_int_gauge_vec_with_registry!(
673                name("watermark_transaction"),
674                "Current transaction high watermark for this committer",
675                &["pipeline"],
676                registry,
677            )
678            .unwrap(),
679            watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
680                name("watermark_timestamp_ms"),
681                "Current timestamp high watermark for this committer, in milliseconds",
682                &["pipeline"],
683                registry,
684            )
685            .unwrap(),
686            watermark_reader_lo: register_int_gauge_vec_with_registry!(
687                name("watermark_reader_lo"),
688                "Current reader low watermark for this pruner",
689                &["pipeline"],
690                registry,
691            )
692            .unwrap(),
693            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
694                name("watermark_pruner_hi"),
695                "Current pruner high watermark for this pruner",
696                &["pipeline"],
697                registry,
698            )
699            .unwrap(),
700            watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
701                name("watermark_epoch_in_db"),
702                "Last epoch high watermark this committer wrote to the DB",
703                &["pipeline"],
704                registry,
705            )
706            .unwrap(),
707            watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
708                name("watermark_checkpoint_in_db"),
709                "Last checkpoint high watermark this committer wrote to the DB",
710                &["pipeline"],
711                registry,
712            )
713            .unwrap(),
714            watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
715                name("watermark_transaction_in_db"),
716                "Last transaction high watermark this committer wrote to the DB",
717                &["pipeline"],
718                registry,
719            )
720            .unwrap(),
721            watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
722                name("watermark_timestamp_ms_in_db"),
723                "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
724                &["pipeline"],
725                registry,
726            )
727            .unwrap(),
728            watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
729                name("watermark_reader_lo_in_db"),
730                "Last reader low watermark this pruner wrote to the DB",
731                &["pipeline"],
732                registry,
733            )
734            .unwrap(),
735            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
736                name("watermark_pruner_hi_in_db"),
737                "Last pruner high watermark this pruner wrote to the DB",
738                &["pipeline"],
739                registry,
740            )
741            .unwrap(),
742        })
743    }
744
745    pub(crate) fn inc_processor_retry<P: Processor>(
746        &self,
747        checkpoint: u64,
748        error: &anyhow::Error,
749        delay: Duration,
750    ) {
751        warn!(
752            pipeline = P::NAME,
753            checkpoint,
754            retry_delay_ms = delay.as_millis(),
755            "Retrying processor after error: {error:?}",
756        );
757
758        self.total_handler_processor_retries
759            .with_label_values(&[P::NAME])
760            .inc();
761    }
762}
763
764impl CheckpointLagMetricReporter {
765    pub fn new(
766        checkpoint_time_lag_histogram: Histogram,
767        latest_checkpoint_time_lag_gauge: IntGauge,
768        latest_checkpoint_sequence_number_gauge: IntGauge,
769    ) -> Arc<Self> {
770        Arc::new(Self {
771            checkpoint_time_lag_histogram,
772            latest_checkpoint_time_lag_gauge,
773            latest_checkpoint_sequence_number_gauge,
774            latest_reported_checkpoint: AtomicU64::new(0),
775        })
776    }
777
778    pub fn new_for_pipeline<P: Processor>(
779        checkpoint_time_lag_histogram: &HistogramVec,
780        latest_checkpoint_time_lag_gauge: &IntGaugeVec,
781        latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
782    ) -> Arc<Self> {
783        Self::new(
784            checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
785            latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
786            latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
787        )
788    }
789
790    pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
791        let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
792        self.checkpoint_time_lag_histogram
793            .observe((lag as f64) / 1000.0);
794
795        let prev = self
796            .latest_reported_checkpoint
797            .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
798        if cp_sequence_number > prev {
799            self.latest_checkpoint_sequence_number_gauge
800                .set(cp_sequence_number as i64);
801            self.latest_checkpoint_time_lag_gauge.set(lag);
802        }
803    }
804}
805
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::*;

    /// Build an `IndexerMetrics` backed by a fresh, throwaway registry (no prefix), for tests.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Build an `IngestionMetrics` backed by a fresh, throwaway registry (no prefix), for tests.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}