sui_indexer_alt_framework/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::{Arc, atomic::AtomicU64};
5
6use prometheus::{
7    Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
8    register_histogram_vec_with_registry, register_histogram_with_registry,
9    register_int_counter_vec_with_registry, register_int_counter_with_registry,
10    register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14use crate::{ingestion::error::Error, pipeline::Processor};
15
/// Upper bounds, in seconds, of the histogram buckets used for checkpoint fetch latency: a 1-2-5
/// series running from 1ms to 100s.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms to 5ms
    0.01, 0.02, 0.05, // 10ms to 50ms
    0.1, 0.2, 0.5, // 100ms to 500ms
    1.0, 2.0, 5.0, // 1s to 5s
    10.0, 20.0, 50.0, // 10s to 50s
    100.0,
];
20
/// Upper bounds, in seconds, of the histogram buckets used for checkpoint lag: the difference
/// between the system time and the timestamp recorded in the checkpoint.
///
/// Resolution is 50ms up to one second, then progressively coarser out to 1000s.
const LAG_SEC_BUCKETS: &[f64] = &[
    // Sub-second: 100ms to 1s in 50ms steps.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95, 1.0,
    // A few seconds.
    2.0, 3.0, 4.0, 5.0,
    // Long tail.
    10.0, 20.0, 50.0, 100.0, 1000.0,
];
27
/// Upper bounds, in seconds, of the histogram buckets used for in-process work: latencies of
/// steps that run inside the indexer without calling out to other services. A 1-2-5 series
/// running from 1ms to 100s.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms to 5ms
    0.01, 0.02, 0.05, // 10ms to 50ms
    0.1, 0.2, 0.5, // 100ms to 500ms
    1.0, 2.0, 5.0, // 1s to 5s
    10.0, 20.0, 50.0, // 10s to 50s
    100.0,
];
33
/// Upper bounds, in seconds, of the histogram buckets used for database write latency: a 1-2-5
/// series running from 10ms to 10000s.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, // 10ms to 50ms
    0.1, 0.2, 0.5, // 100ms to 500ms
    1.0, 2.0, 5.0, // 1s to 5s
    10.0, 20.0, 50.0, // 10s to 50s
    100.0, 200.0, 500.0, // 100s to 500s
    1000.0, 2000.0, 5000.0, // 1000s to 5000s
    10000.0,
];
39
/// Upper bounds of the histogram buckets used for batch sizes, counted in rows written to the
/// database: a 1-2-5 series running from 1 to 20000 rows.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, // 1 to 5 rows
    10.0, 20.0, 50.0, // 10 to 50 rows
    100.0, 200.0, 500.0, // 100 to 500 rows
    1000.0, 2000.0, 5000.0, // 1K to 5K rows
    10000.0, 20000.0, // 10K and 20K rows
];
44
45/// Metrics specific to the ingestion service.
46#[derive(Clone)]
47pub struct IngestionMetrics {
48    // Statistics related to fetching data from the remote store.
49    pub total_ingested_checkpoints: IntCounter,
50    pub total_ingested_transactions: IntCounter,
51    pub total_ingested_events: IntCounter,
52    pub total_ingested_objects: IntCounter,
53    pub total_ingested_bytes: IntCounter,
54    pub total_ingested_transient_retries: IntCounterVec,
55    pub total_ingested_not_found_retries: IntCounter,
56    pub total_ingested_permanent_errors: IntCounterVec,
57    pub total_streamed_checkpoints: IntCounter,
58    pub total_stream_disconnections: IntCounter,
59    pub total_streaming_connection_failures: IntCounter,
60
61    // Checkpoint lag metrics for the ingestion pipeline.
62    pub latest_ingested_checkpoint: IntGauge,
63    pub latest_streamed_checkpoint: IntGauge,
64    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
65    pub ingested_checkpoint_timestamp_lag: Histogram,
66
67    pub ingested_checkpoint_latency: Histogram,
68}
69
70#[derive(Clone)]
71pub struct IndexerMetrics {
72    // Statistics related to individual ingestion pipelines' handlers.
73    pub total_handler_checkpoints_received: IntCounterVec,
74    pub total_handler_checkpoints_processed: IntCounterVec,
75    pub total_handler_rows_created: IntCounterVec,
76
77    pub latest_processed_checkpoint: IntGaugeVec,
78    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
79    pub processed_checkpoint_timestamp_lag: HistogramVec,
80
81    pub handler_checkpoint_latency: HistogramVec,
82
83    // Statistics related to individual ingestion pipelines.
84    pub total_collector_checkpoints_received: IntCounterVec,
85    pub total_collector_rows_received: IntCounterVec,
86    pub total_collector_batches_created: IntCounterVec,
87    pub total_committer_batches_attempted: IntCounterVec,
88    pub total_committer_batches_succeeded: IntCounterVec,
89    pub total_committer_batches_failed: IntCounterVec,
90    pub total_committer_rows_committed: IntCounterVec,
91    pub total_committer_rows_affected: IntCounterVec,
92    pub total_watermarks_out_of_order: IntCounterVec,
93    pub total_pruner_chunks_attempted: IntCounterVec,
94    pub total_pruner_chunks_deleted: IntCounterVec,
95    pub total_pruner_rows_deleted: IntCounterVec,
96
97    // Checkpoint lag metrics for the collector.
98    pub latest_collected_checkpoint: IntGaugeVec,
99    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
100    pub collected_checkpoint_timestamp_lag: HistogramVec,
101
102    // Checkpoint lag metrics for the committer.
103    // We can only report partially committed checkpoints, since the concurrent committer isn't aware of
104    // when a checkpoint is fully committed. So we report whenever we see a checkpoint. Since data from
105    // the same checkpoint is batched continuously, this is a good proxy for the last committed checkpoint.
106    pub latest_partially_committed_checkpoint: IntGaugeVec,
107    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
108    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
109
110    // Checkpoint lag metrics for the watermarker.
111    // The latest watermarked checkpoint metric is already covered by watermark_checkpoint_in_db.
112    // While we already have watermark_timestamp_in_db_ms metric, reporting the lag explicitly
113    // for consistency.
114    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
115    pub watermarked_checkpoint_timestamp_lag: HistogramVec,
116
117    pub collector_gather_latency: HistogramVec,
118    pub collector_batch_size: HistogramVec,
119    pub committer_commit_latency: HistogramVec,
120    pub committer_tx_rows: HistogramVec,
121    pub watermark_gather_latency: HistogramVec,
122    pub watermark_commit_latency: HistogramVec,
123    pub watermark_pruner_read_latency: HistogramVec,
124    pub watermark_pruner_write_latency: HistogramVec,
125    pub pruner_delete_latency: HistogramVec,
126
127    pub watermark_epoch: IntGaugeVec,
128    pub watermark_checkpoint: IntGaugeVec,
129    pub watermark_transaction: IntGaugeVec,
130    pub watermark_timestamp_ms: IntGaugeVec,
131    pub watermark_reader_lo: IntGaugeVec,
132    pub watermark_pruner_hi: IntGaugeVec,
133
134    pub watermark_epoch_in_db: IntGaugeVec,
135    pub watermark_checkpoint_in_db: IntGaugeVec,
136    pub watermark_transaction_in_db: IntGaugeVec,
137    pub watermark_timestamp_in_db_ms: IntGaugeVec,
138    pub watermark_reader_lo_in_db: IntGaugeVec,
139    pub watermark_pruner_hi_in_db: IntGaugeVec,
140}
141
142/// A helper struct to report metrics regarding the checkpoint lag at various points in the indexer.
143pub(crate) struct CheckpointLagMetricReporter {
144    /// Metric to report the lag distribution of each checkpoint.
145    checkpoint_time_lag_histogram: Histogram,
146    /// Metric to report the lag of the checkpoint with the highest sequence number observed so far.
147    /// This is needed since concurrent pipelines observe checkpoints out of order.
148    latest_checkpoint_time_lag_gauge: IntGauge,
149    /// Metric to report the sequence number of the checkpoint with the highest sequence number observed so far.
150    latest_checkpoint_sequence_number_gauge: IntGauge,
151    // Internal state to keep track of the highest checkpoint sequence number reported so far.
152    latest_reported_checkpoint: AtomicU64,
153}
154
155impl IngestionMetrics {
156    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
157        let prefix = prefix.unwrap_or("indexer");
158        let name = |n| format!("{prefix}_{n}");
159        Arc::new(Self {
160            total_ingested_checkpoints: register_int_counter_with_registry!(
161                name("total_ingested_checkpoints"),
162                "Total number of checkpoints fetched from the remote store",
163                registry,
164            )
165            .unwrap(),
166            total_ingested_transactions: register_int_counter_with_registry!(
167                name("total_ingested_transactions"),
168                "Total number of transactions fetched from the remote store",
169                registry,
170            )
171            .unwrap(),
172            total_ingested_events: register_int_counter_with_registry!(
173                name("total_ingested_events"),
174                "Total number of events fetched from the remote store",
175                registry,
176            )
177            .unwrap(),
178            total_ingested_objects: register_int_counter_with_registry!(
179                name("total_ingested_objects"),
180                "Total number of objects in checkpoints fetched from the remote store",
181                registry,
182            )
183            .unwrap(),
184            total_ingested_bytes: register_int_counter_with_registry!(
185                name("total_ingested_bytes"),
186                "Total number of bytes fetched from the remote store, this metric will not \
187                be updated when data are fetched over gRPC.",
188                registry,
189            )
190            .unwrap(),
191            total_ingested_transient_retries: register_int_counter_vec_with_registry!(
192                name("total_ingested_retries"),
193                "Total number of retries due to transient errors while fetching data from the \
194                 remote store",
195                &["reason"],
196                registry,
197            )
198            .unwrap(),
199            total_ingested_not_found_retries: register_int_counter_with_registry!(
200                name("total_ingested_not_found_retries"),
201                "Total number of retries due to the not found errors while fetching data from the \
202                 remote store",
203                registry,
204            )
205            .unwrap(),
206            total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
207                name("total_ingested_permanent_errors"),
208                "Total number of permanent errors encountered while fetching data from the \
209                 remote store, which cause the ingestion service to shutdown",
210                &["reason"],
211                registry,
212            )
213            .unwrap(),
214            total_streamed_checkpoints: register_int_counter_with_registry!(
215                name("total_streamed_checkpoints"),
216                "Total number of checkpoints received from gRPC streaming",
217                registry,
218            )
219            .unwrap(),
220            total_stream_disconnections: register_int_counter_with_registry!(
221                name("total_stream_disconnections"),
222                "Total number of times the gRPC stream was disconnected",
223                registry,
224            )
225            .unwrap(),
226            total_streaming_connection_failures: register_int_counter_with_registry!(
227                name("total_streaming_connection_failures"),
228                "Total number of failures due to streaming service connection or peek failures",
229                registry,
230            )
231            .unwrap(),
232            latest_ingested_checkpoint: register_int_gauge_with_registry!(
233                name("latest_ingested_checkpoint"),
234                "Latest checkpoint sequence number fetched from the remote store",
235                registry,
236            )
237            .unwrap(),
238            latest_streamed_checkpoint: register_int_gauge_with_registry!(
239                name("latest_streamed_checkpoint"),
240                "Latest checkpoint sequence number received from gRPC streaming",
241                registry,
242            )
243            .unwrap(),
244            latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
245                name("latest_ingested_checkpoint_timestamp_lag_ms"),
246                "Difference between the system timestamp when the latest checkpoint was fetched and the \
247                 timestamp in the checkpoint, in milliseconds",
248                registry,
249            )
250            .unwrap(),
251            ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
252                name("ingested_checkpoint_timestamp_lag"),
253                "Difference between the system timestamp when a checkpoint was fetched and the \
254                 timestamp in each checkpoint, in seconds",
255                LAG_SEC_BUCKETS.to_vec(),
256                registry,
257            )
258            .unwrap(),
259            ingested_checkpoint_latency: register_histogram_with_registry!(
260                name("ingested_checkpoint_latency"),
261                "Time taken to fetch a checkpoint from the remote store, including retries",
262                INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
263                registry,
264            )
265            .unwrap(),
266        })
267    }
268
269    /// Register that we're retrying a checkpoint fetch due to a transient error, logging the
270    /// reason and error.
271    pub(crate) fn inc_retry(
272        &self,
273        checkpoint: u64,
274        reason: &str,
275        error: Error,
276    ) -> backoff::Error<Error> {
277        warn!(checkpoint, reason, "Retrying due to error: {error}");
278
279        self.total_ingested_transient_retries
280            .with_label_values(&[reason])
281            .inc();
282
283        backoff::Error::transient(error)
284    }
285}
286
287impl IndexerMetrics {
288    pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
289        let prefix = prefix.unwrap_or("indexer");
290        let name = |n| format!("{prefix}_{n}");
291        Arc::new(Self {
292            total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
293                name("total_handler_checkpoints_received"),
294                "Total number of checkpoints received by this handler",
295                &["pipeline"],
296                registry,
297            )
298            .unwrap(),
299            total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
300                name("total_handler_checkpoints_processed"),
301                "Total number of checkpoints processed (converted into rows) by this handler",
302                &["pipeline"],
303                registry,
304            )
305            .unwrap(),
306            total_handler_rows_created: register_int_counter_vec_with_registry!(
307                name("total_handler_rows_created"),
308                "Total number of rows created by this handler",
309                &["pipeline"],
310                registry,
311            )
312            .unwrap(),
313            latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
314                name("latest_processed_checkpoint"),
315                "Latest checkpoint sequence number processed by this handler",
316                &["pipeline"],
317                registry,
318            )
319            .unwrap(),
320            latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
321                name("latest_processed_checkpoint_timestamp_lag_ms"),
322                "Difference between the system timestamp when the latest checkpoint was processed and the \
323                 timestamp in the checkpoint, in milliseconds",
324                &["pipeline"],
325                registry,
326            )
327            .unwrap(),
328            processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
329                name("processed_checkpoint_timestamp_lag"),
330                "Difference between the system timestamp when a checkpoint was processed and the \
331                 timestamp in each checkpoint, in seconds",
332                &["pipeline"],
333                LAG_SEC_BUCKETS.to_vec(),
334                registry,
335            )
336            .unwrap(),
337            handler_checkpoint_latency: register_histogram_vec_with_registry!(
338                name("handler_checkpoint_latency"),
339                "Time taken to process a checkpoint by this handler",
340                &["pipeline"],
341                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
342                registry,
343            )
344            .unwrap(),
345            total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
346                name("total_collector_checkpoints_received"),
347                "Total number of checkpoints received by this collector",
348                &["pipeline"],
349                registry,
350            )
351            .unwrap(),
352            total_collector_rows_received: register_int_counter_vec_with_registry!(
353                name("total_collector_rows_received"),
354                "Total number of rows received by this collector",
355                &["pipeline"],
356                registry,
357            )
358            .unwrap(),
359            total_collector_batches_created: register_int_counter_vec_with_registry!(
360                name("total_collector_batches_created"),
361                "Total number of batches created by this collector",
362                &["pipeline"],
363                registry,
364            )
365            .unwrap(),
366            total_committer_batches_attempted: register_int_counter_vec_with_registry!(
367                name("total_committer_batches_attempted"),
368                "Total number of batches writes attempted by this committer",
369                &["pipeline"],
370                registry,
371            )
372            .unwrap(),
373            total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
374                name("total_committer_batches_succeeded"),
375                "Total number of successful batches writes by this committer",
376                &["pipeline"],
377                registry,
378            )
379            .unwrap(),
380            total_committer_batches_failed: register_int_counter_vec_with_registry!(
381                name("total_committer_batches_failed"),
382                "Total number of failed batches writes by this committer",
383                &["pipeline"],
384                registry,
385            )
386            .unwrap(),
387            total_committer_rows_committed: register_int_counter_vec_with_registry!(
388                name("total_committer_rows_committed"),
389                "Total number of rows sent to the database by this committer",
390                &["pipeline"],
391                registry,
392            )
393            .unwrap(),
394            total_committer_rows_affected: register_int_counter_vec_with_registry!(
395                name("total_committer_rows_affected"),
396                "Total number of rows actually written to the database by this committer",
397                &["pipeline"],
398                registry,
399            )
400            .unwrap(),
401            total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
402                name("watermark_out_of_order"),
403                "Number of times this committer encountered a batch for a checkpoint before its watermark",
404                &["pipeline"],
405                registry,
406            )
407            .unwrap(),
408            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
409                name("pruner_chunks_attempted"),
410                "Number of chunks this pruner attempted to delete",
411                &["pipeline"],
412                registry,
413            )
414            .unwrap(),
415            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
416                name("pruner_chunks_deleted"),
417                "Number of chunks this pruner successfully deleted",
418                &["pipeline"],
419                registry,
420            )
421            .unwrap(),
422            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
423                name("pruner_rows_deleted"),
424                "Number of rows this pruner successfully deleted",
425                &["pipeline"],
426                registry,
427            )
428            .unwrap(),
429            latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
430                name("latest_collected_checkpoint"),
431                "Latest checkpoint sequence number collected by this collector",
432                &["pipeline"],
433                registry,
434            )
435            .unwrap(),
436            latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
437                name("latest_collected_checkpoint_timestamp_lag_ms"),
438                "Difference between the system timestamp when the latest checkpoint was collected and the \
439                 timestamp in the checkpoint, in milliseconds",
440                &["pipeline"],
441                registry,
442            )
443            .unwrap(),
444            collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
445                name("collected_checkpoint_timestamp_lag"),
446                "Difference between the system timestamp when a checkpoint was collected and the \
447                 timestamp in each checkpoint, in seconds",
448                &["pipeline"],
449                LAG_SEC_BUCKETS.to_vec(),
450                registry,
451            )
452            .unwrap(),
453            latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
454                name("latest_partially_committed_checkpoint"),
455                "Latest checkpoint sequence number partially committed by this collector",
456                &["pipeline"],
457                registry,
458            )
459            .unwrap(),
460            latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
461                name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
462                "Difference between the system timestamp when the latest checkpoint was partially committed and the \
463                 timestamp in the checkpoint, in milliseconds",
464                &["pipeline"],
465                registry,
466            )
467            .unwrap(),
468            partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
469                name("partially_committed_checkpoint_timestamp_lag"),
470                "Difference between the system timestamp when a checkpoint was partially committed and the \
471                 timestamp in each checkpoint, in seconds",
472                &["pipeline"],
473                LAG_SEC_BUCKETS.to_vec(),
474                registry,
475            )
476            .unwrap(),
477            latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
478                name("latest_watermarked_checkpoint_timestamp_lag_ms"),
479                "Difference between the system timestamp when the latest checkpoint was watermarked and the \
480                 timestamp in the checkpoint, in milliseconds",
481                &["pipeline"],
482                registry,
483            )
484            .unwrap(),
485            watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
486                name("watermarked_checkpoint_timestamp_lag"),
487                "Difference between the system timestamp when a checkpoint was watermarked and the \
488                 timestamp in each checkpoint, in seconds",
489                &["pipeline"],
490                LAG_SEC_BUCKETS.to_vec(),
491                registry,
492            )
493            .unwrap(),
494            collector_gather_latency: register_histogram_vec_with_registry!(
495                name("collector_gather_latency"),
496                "Time taken to gather rows into a batch by this collector",
497                &["pipeline"],
498                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
499                registry,
500            )
501            .unwrap(),
502            collector_batch_size: register_histogram_vec_with_registry!(
503                name("collector_batch_size"),
504                "Number of rows in a batch written to the database by this collector",
505                &["pipeline"],
506                BATCH_SIZE_BUCKETS.to_vec(),
507                registry,
508            )
509            .unwrap(),
510            committer_commit_latency: register_histogram_vec_with_registry!(
511                name("committer_commit_latency"),
512                "Time taken to write a batch of rows to the database by this committer",
513                &["pipeline"],
514                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
515                registry,
516            )
517            .unwrap(),
518            committer_tx_rows: register_histogram_vec_with_registry!(
519                name("committer_tx_rows"),
520                "Number of rows written to the database in a single database transaction by this committer",
521                &["pipeline"],
522                BATCH_SIZE_BUCKETS.to_vec(),
523                registry,
524            )
525            .unwrap(),
526            watermark_gather_latency: register_histogram_vec_with_registry!(
527                name("watermark_gather_latency"),
528                "Time taken to calculate the new high watermark after a write by this committer",
529                &["pipeline"],
530                PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
531                registry,
532            )
533            .unwrap(),
534            watermark_commit_latency: register_histogram_vec_with_registry!(
535                name("watermark_commit_latency"),
536                "Time taken to write the new high watermark to the database by this committer",
537                &["pipeline"],
538                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
539                registry,
540            )
541            .unwrap(),
542            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
543                name("watermark_pruner_read_latency"),
544                "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
545                &["pipeline"],
546                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
547                registry,
548            )
549            .unwrap(),
550            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
551                name("watermark_pruner_write_latency"),
552                "Time taken to write the pruner's new upperbound to the database by this pruner",
553                &["pipeline"],
554                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
555                registry,
556            )
557            .unwrap(),
558            pruner_delete_latency: register_histogram_vec_with_registry!(
559                name("pruner_delete_latency"),
560                "Time taken to delete a chunk of data from the database by this pruner",
561                &["pipeline"],
562                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
563                registry,
564            )
565            .unwrap(),
566            watermark_epoch: register_int_gauge_vec_with_registry!(
567                name("watermark_epoch"),
568                "Current epoch high watermark for this committer",
569                &["pipeline"],
570                registry,
571            )
572            .unwrap(),
573            watermark_checkpoint: register_int_gauge_vec_with_registry!(
574                name("watermark_checkpoint"),
575                "Current checkpoint high watermark for this committer",
576                &["pipeline"],
577                registry,
578            )
579            .unwrap(),
580            watermark_transaction: register_int_gauge_vec_with_registry!(
581                name("watermark_transaction"),
582                "Current transaction high watermark for this committer",
583                &["pipeline"],
584                registry,
585            )
586            .unwrap(),
587            watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
588                name("watermark_timestamp_ms"),
589                "Current timestamp high watermark for this committer, in milliseconds",
590                &["pipeline"],
591                registry,
592            )
593            .unwrap(),
594            watermark_reader_lo: register_int_gauge_vec_with_registry!(
595                name("watermark_reader_lo"),
596                "Current reader low watermark for this pruner",
597                &["pipeline"],
598                registry,
599            )
600            .unwrap(),
601            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
602                name("watermark_pruner_hi"),
603                "Current pruner high watermark for this pruner",
604                &["pipeline"],
605                registry,
606            )
607            .unwrap(),
608            watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
609                name("watermark_epoch_in_db"),
610                "Last epoch high watermark this committer wrote to the DB",
611                &["pipeline"],
612                registry,
613            )
614            .unwrap(),
615            watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
616                name("watermark_checkpoint_in_db"),
617                "Last checkpoint high watermark this committer wrote to the DB",
618                &["pipeline"],
619                registry,
620            )
621            .unwrap(),
622            watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
623                name("watermark_transaction_in_db"),
624                "Last transaction high watermark this committer wrote to the DB",
625                &["pipeline"],
626                registry,
627            )
628            .unwrap(),
629            watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
630                name("watermark_timestamp_ms_in_db"),
631                "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
632                &["pipeline"],
633                registry,
634            )
635            .unwrap(),
636            watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
637                name("watermark_reader_lo_in_db"),
638                "Last reader low watermark this pruner wrote to the DB",
639                &["pipeline"],
640                registry,
641            )
642            .unwrap(),
643            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
644                name("watermark_pruner_hi_in_db"),
645                "Last pruner high watermark this pruner wrote to the DB",
646                &["pipeline"],
647                registry,
648            )
649            .unwrap(),
650        })
651    }
652}
653
654impl CheckpointLagMetricReporter {
655    pub fn new(
656        checkpoint_time_lag_histogram: Histogram,
657        latest_checkpoint_time_lag_gauge: IntGauge,
658        latest_checkpoint_sequence_number_gauge: IntGauge,
659    ) -> Arc<Self> {
660        Arc::new(Self {
661            checkpoint_time_lag_histogram,
662            latest_checkpoint_time_lag_gauge,
663            latest_checkpoint_sequence_number_gauge,
664            latest_reported_checkpoint: AtomicU64::new(0),
665        })
666    }
667
668    pub fn new_for_pipeline<P: Processor>(
669        checkpoint_time_lag_histogram: &HistogramVec,
670        latest_checkpoint_time_lag_gauge: &IntGaugeVec,
671        latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
672    ) -> Arc<Self> {
673        Self::new(
674            checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
675            latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
676            latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
677        )
678    }
679
680    pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
681        let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
682        self.checkpoint_time_lag_histogram
683            .observe((lag as f64) / 1000.0);
684
685        let prev = self
686            .latest_reported_checkpoint
687            .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
688        if cp_sequence_number > prev {
689            self.latest_checkpoint_sequence_number_gauge
690                .set(cp_sequence_number as i64);
691            self.latest_checkpoint_time_lag_gauge.set(lag);
692        }
693    }
694}
695
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::{IndexerMetrics, IngestionMetrics};

    /// [`IndexerMetrics`] registered against a brand-new [`Registry`], so repeated calls never
    /// collide on metric names.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// [`IngestionMetrics`] registered against a brand-new [`Registry`], so repeated calls never
    /// collide on metric names.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}
713}