1use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6
7use prometheus::Histogram;
8use prometheus::HistogramVec;
9use prometheus::IntCounter;
10use prometheus::IntCounterVec;
11use prometheus::IntGauge;
12use prometheus::IntGaugeVec;
13use prometheus::Registry;
14use prometheus::register_histogram_vec_with_registry;
15use prometheus::register_histogram_with_registry;
16use prometheus::register_int_counter_vec_with_registry;
17use prometheus::register_int_counter_with_registry;
18use prometheus::register_int_gauge_vec_with_registry;
19use prometheus::register_int_gauge_with_registry;
20use tracing::warn;
21
22use crate::ingestion::error::Error;
23use crate::pipeline::Processor;
24
/// Histogram buckets, in seconds, for the time taken to fetch one checkpoint from the remote
/// store (used by `ingested_checkpoint_latency`).
///
/// Prometheus histogram buckets must be strictly increasing.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    // 1ms ..= 5ms
    0.001, 0.002, 0.005,
    // 10ms ..= 500ms
    0.01, 0.02, 0.05, 0.1, 0.2, 0.5,
    // 1s ..= 100s
    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];

/// Histogram buckets, in seconds, for the difference between the system clock and the timestamp
/// recorded in a checkpoint (used by every `*_checkpoint_timestamp_lag` histogram).
const LAG_SEC_BUCKETS: &[f64] = &[
    // Fine-grained 50ms steps between 100ms and 1s.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5,
    0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0,
    // Coarser steps once lag exceeds one second.
    2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 1000.0,
];

/// Histogram buckets, in seconds, for in-process work such as handling a checkpoint or
/// gathering rows/watermarks.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    // 1ms ..= 5ms
    0.001, 0.002, 0.005,
    // 10ms ..= 500ms
    0.01, 0.02, 0.05, 0.1, 0.2, 0.5,
    // 1s ..= 100s
    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];

/// Histogram buckets, in seconds, for database round-trips (commits, watermark reads/writes and
/// pruner deletes).
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    // 10ms ..= 500ms
    0.01, 0.02, 0.05, 0.1, 0.2, 0.5,
    // 1s ..= 500s
    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0,
    // 1000s ..= 10000s
    1000.0, 2000.0, 5000.0, 10000.0,
];

/// Histogram buckets, in rows, for batch and per-transaction row counts.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0,
    10000.0, 20000.0,
];
53
/// Prometheus metrics describing checkpoint ingestion, whether checkpoints are fetched from the
/// remote store or received over gRPC streaming.
///
/// Each metric is exported as `{prefix}_{name}`, where `name` is the field name unless a field
/// comment says otherwise.
#[derive(Clone)]
pub struct IngestionMetrics {
    /// Total number of checkpoints fetched from the remote store.
    pub total_ingested_checkpoints: IntCounter,
    /// Total number of transactions fetched from the remote store.
    pub total_ingested_transactions: IntCounter,
    /// Total number of events fetched from the remote store.
    pub total_ingested_events: IntCounter,
    /// Total number of objects in checkpoints fetched from the remote store.
    pub total_ingested_objects: IntCounter,
    /// Total number of bytes fetched from the remote store (not updated for gRPC fetches).
    pub total_ingested_bytes: IntCounter,
    /// Retries caused by transient fetch errors, labelled by `reason`.
    /// Exported as `total_ingested_retries`.
    pub total_ingested_transient_retries: IntCounterVec,
    /// Retries caused by "not found" errors while fetching.
    pub total_ingested_not_found_retries: IntCounter,
    /// Permanent fetch errors, labelled by `reason`; these cause the ingestion service to shut
    /// down.
    pub total_ingested_permanent_errors: IntCounterVec,
    /// Total number of checkpoints received from gRPC streaming.
    pub total_streamed_checkpoints: IntCounter,
    /// Total number of times the gRPC stream was disconnected.
    pub total_stream_disconnections: IntCounter,
    /// Failures to connect to (or peek) the streaming service.
    pub total_streaming_connection_failures: IntCounter,

    /// Latest checkpoint sequence number fetched from the remote store.
    pub latest_ingested_checkpoint: IntGauge,
    /// Latest checkpoint sequence number received from gRPC streaming.
    pub latest_streamed_checkpoint: IntGauge,
    /// System time minus checkpoint timestamp for the latest fetched checkpoint, in milliseconds.
    pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
    /// Distribution of system time minus checkpoint timestamp at fetch time, in seconds.
    pub ingested_checkpoint_timestamp_lag: Histogram,

    /// Time taken to fetch a checkpoint from the remote store, including retries.
    pub ingested_checkpoint_latency: Histogram,
}
78
/// Per-pipeline Prometheus metrics for the indexer's processing stages: handler, collector,
/// committer, watermark bookkeeping and pruner.
///
/// Every metric is a vector with a single `pipeline` label. Metrics are exported as
/// `{prefix}_{name}`, where `name` is the field name unless a field comment says otherwise.
#[derive(Clone)]
pub struct IndexerMetrics {
    // Handler: receives checkpoints and converts them into rows.
    pub total_handler_checkpoints_received: IntCounterVec,
    pub total_handler_checkpoints_processed: IntCounterVec,
    pub total_handler_rows_created: IntCounterVec,

    // Timestamp lag observed when a checkpoint has been processed by its handler
    // (gauge in milliseconds, histogram in seconds).
    pub latest_processed_checkpoint: IntGaugeVec,
    pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub processed_checkpoint_timestamp_lag: HistogramVec,

    /// Time taken by the handler to process one checkpoint.
    pub handler_checkpoint_latency: HistogramVec,

    // Throughput counters for the collector, committer and pruner.
    pub total_collector_checkpoints_received: IntCounterVec,
    pub total_collector_rows_received: IntCounterVec,
    pub total_collector_batches_created: IntCounterVec,
    pub total_committer_batches_attempted: IntCounterVec,
    pub total_committer_batches_succeeded: IntCounterVec,
    pub total_committer_batches_failed: IntCounterVec,
    /// Rows sent to the database (compare with `total_committer_rows_affected`).
    pub total_committer_rows_committed: IntCounterVec,
    /// Rows the database reports as actually written.
    pub total_committer_rows_affected: IntCounterVec,
    /// Batches seen for a checkpoint before its watermark. Exported as `watermark_out_of_order`.
    pub total_watermarks_out_of_order: IntCounterVec,
    /// Exported as `pruner_chunks_attempted`.
    pub total_pruner_chunks_attempted: IntCounterVec,
    /// Exported as `pruner_chunks_deleted`.
    pub total_pruner_chunks_deleted: IntCounterVec,
    /// Exported as `pruner_rows_deleted`.
    pub total_pruner_rows_deleted: IntCounterVec,

    // Timestamp lag observed when a checkpoint has been collected.
    pub latest_collected_checkpoint: IntGaugeVec,
    pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub collected_checkpoint_timestamp_lag: HistogramVec,

    // Timestamp lag observed when a checkpoint has been partially committed.
    pub latest_partially_committed_checkpoint: IntGaugeVec,
    pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub partially_committed_checkpoint_timestamp_lag: HistogramVec,

    // Timestamp lag observed when a checkpoint has been watermarked.
    pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
    pub watermarked_checkpoint_timestamp_lag: HistogramVec,

    // Latency and size distributions for collector, committer, watermark and pruner work.
    pub collector_gather_latency: HistogramVec,
    pub collector_batch_size: HistogramVec,
    /// Checkpoints skipped by the collector for being below the main reader low watermark.
    pub total_collector_skipped_checkpoints: IntCounterVec,
    /// Reader low watermark as observed by the collector.
    pub collector_reader_lo: IntGaugeVec,
    pub committer_commit_latency: HistogramVec,
    pub committer_tx_rows: HistogramVec,
    pub watermark_gather_latency: HistogramVec,
    pub watermark_commit_latency: HistogramVec,
    pub watermark_pruner_read_latency: HistogramVec,
    pub watermark_pruner_write_latency: HistogramVec,
    pub pruner_delete_latency: HistogramVec,

    // Current watermark values as tracked by the committer (epoch/checkpoint/transaction/
    // timestamp) and the pruner (reader_lo/pruner_hi).
    pub watermark_epoch: IntGaugeVec,
    pub watermark_checkpoint: IntGaugeVec,
    pub watermark_transaction: IntGaugeVec,
    pub watermark_timestamp_ms: IntGaugeVec,
    pub watermark_reader_lo: IntGaugeVec,
    pub watermark_pruner_hi: IntGaugeVec,

    // Last watermark values written to the database.
    pub watermark_epoch_in_db: IntGaugeVec,
    pub watermark_checkpoint_in_db: IntGaugeVec,
    pub watermark_transaction_in_db: IntGaugeVec,
    /// Exported as `watermark_timestamp_ms_in_db`.
    pub watermark_timestamp_in_db_ms: IntGaugeVec,
    pub watermark_reader_lo_in_db: IntGaugeVec,
    pub watermark_pruner_hi_in_db: IntGaugeVec,
}
152
/// Reports how far behind the system clock a stream of checkpoints is.
///
/// Every reported checkpoint is observed by a lag histogram. The two gauges are only updated by
/// reports that advance an internal high-water mark over checkpoint sequence numbers, so reports
/// that arrive out of sequence order do not drag them back to an older checkpoint.
pub(crate) struct CheckpointLagMetricReporter {
    /// Distribution of per-checkpoint lag, observed in seconds.
    checkpoint_time_lag_histogram: Histogram,
    /// Lag, in milliseconds, from the most recent report that advanced the high-water mark.
    latest_checkpoint_time_lag_gauge: IntGauge,
    /// Sequence number from the most recent report that advanced the high-water mark.
    latest_checkpoint_sequence_number_gauge: IntGauge,
    /// High-water mark over reported checkpoints, advanced with `fetch_max`; decides whether a
    /// report is new enough to update the two gauges.
    latest_reported_checkpoint: AtomicU64,
}
165
166impl IngestionMetrics {
167 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
168 let prefix = prefix.unwrap_or("indexer");
169 let name = |n| format!("{prefix}_{n}");
170 Arc::new(Self {
171 total_ingested_checkpoints: register_int_counter_with_registry!(
172 name("total_ingested_checkpoints"),
173 "Total number of checkpoints fetched from the remote store",
174 registry,
175 )
176 .unwrap(),
177 total_ingested_transactions: register_int_counter_with_registry!(
178 name("total_ingested_transactions"),
179 "Total number of transactions fetched from the remote store",
180 registry,
181 )
182 .unwrap(),
183 total_ingested_events: register_int_counter_with_registry!(
184 name("total_ingested_events"),
185 "Total number of events fetched from the remote store",
186 registry,
187 )
188 .unwrap(),
189 total_ingested_objects: register_int_counter_with_registry!(
190 name("total_ingested_objects"),
191 "Total number of objects in checkpoints fetched from the remote store",
192 registry,
193 )
194 .unwrap(),
195 total_ingested_bytes: register_int_counter_with_registry!(
196 name("total_ingested_bytes"),
197 "Total number of bytes fetched from the remote store, this metric will not \
198 be updated when data are fetched over gRPC.",
199 registry,
200 )
201 .unwrap(),
202 total_ingested_transient_retries: register_int_counter_vec_with_registry!(
203 name("total_ingested_retries"),
204 "Total number of retries due to transient errors while fetching data from the \
205 remote store",
206 &["reason"],
207 registry,
208 )
209 .unwrap(),
210 total_ingested_not_found_retries: register_int_counter_with_registry!(
211 name("total_ingested_not_found_retries"),
212 "Total number of retries due to the not found errors while fetching data from the \
213 remote store",
214 registry,
215 )
216 .unwrap(),
217 total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
218 name("total_ingested_permanent_errors"),
219 "Total number of permanent errors encountered while fetching data from the \
220 remote store, which cause the ingestion service to shutdown",
221 &["reason"],
222 registry,
223 )
224 .unwrap(),
225 total_streamed_checkpoints: register_int_counter_with_registry!(
226 name("total_streamed_checkpoints"),
227 "Total number of checkpoints received from gRPC streaming",
228 registry,
229 )
230 .unwrap(),
231 total_stream_disconnections: register_int_counter_with_registry!(
232 name("total_stream_disconnections"),
233 "Total number of times the gRPC stream was disconnected",
234 registry,
235 )
236 .unwrap(),
237 total_streaming_connection_failures: register_int_counter_with_registry!(
238 name("total_streaming_connection_failures"),
239 "Total number of failures due to streaming service connection or peek failures",
240 registry,
241 )
242 .unwrap(),
243 latest_ingested_checkpoint: register_int_gauge_with_registry!(
244 name("latest_ingested_checkpoint"),
245 "Latest checkpoint sequence number fetched from the remote store",
246 registry,
247 )
248 .unwrap(),
249 latest_streamed_checkpoint: register_int_gauge_with_registry!(
250 name("latest_streamed_checkpoint"),
251 "Latest checkpoint sequence number received from gRPC streaming",
252 registry,
253 )
254 .unwrap(),
255 latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
256 name("latest_ingested_checkpoint_timestamp_lag_ms"),
257 "Difference between the system timestamp when the latest checkpoint was fetched and the \
258 timestamp in the checkpoint, in milliseconds",
259 registry,
260 )
261 .unwrap(),
262 ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
263 name("ingested_checkpoint_timestamp_lag"),
264 "Difference between the system timestamp when a checkpoint was fetched and the \
265 timestamp in each checkpoint, in seconds",
266 LAG_SEC_BUCKETS.to_vec(),
267 registry,
268 )
269 .unwrap(),
270 ingested_checkpoint_latency: register_histogram_with_registry!(
271 name("ingested_checkpoint_latency"),
272 "Time taken to fetch a checkpoint from the remote store, including retries",
273 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
274 registry,
275 )
276 .unwrap(),
277 })
278 }
279
280 pub(crate) fn inc_retry(
283 &self,
284 checkpoint: u64,
285 reason: &str,
286 error: Error,
287 ) -> backoff::Error<Error> {
288 warn!(checkpoint, reason, "Retrying due to error: {error}");
289
290 self.total_ingested_transient_retries
291 .with_label_values(&[reason])
292 .inc();
293
294 backoff::Error::transient(error)
295 }
296}
297
298impl IndexerMetrics {
299 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
300 let prefix = prefix.unwrap_or("indexer");
301 let name = |n| format!("{prefix}_{n}");
302 Arc::new(Self {
303 total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
304 name("total_handler_checkpoints_received"),
305 "Total number of checkpoints received by this handler",
306 &["pipeline"],
307 registry,
308 )
309 .unwrap(),
310 total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
311 name("total_handler_checkpoints_processed"),
312 "Total number of checkpoints processed (converted into rows) by this handler",
313 &["pipeline"],
314 registry,
315 )
316 .unwrap(),
317 total_handler_rows_created: register_int_counter_vec_with_registry!(
318 name("total_handler_rows_created"),
319 "Total number of rows created by this handler",
320 &["pipeline"],
321 registry,
322 )
323 .unwrap(),
324 latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
325 name("latest_processed_checkpoint"),
326 "Latest checkpoint sequence number processed by this handler",
327 &["pipeline"],
328 registry,
329 )
330 .unwrap(),
331 latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
332 name("latest_processed_checkpoint_timestamp_lag_ms"),
333 "Difference between the system timestamp when the latest checkpoint was processed and the \
334 timestamp in the checkpoint, in milliseconds",
335 &["pipeline"],
336 registry,
337 )
338 .unwrap(),
339 processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
340 name("processed_checkpoint_timestamp_lag"),
341 "Difference between the system timestamp when a checkpoint was processed and the \
342 timestamp in each checkpoint, in seconds",
343 &["pipeline"],
344 LAG_SEC_BUCKETS.to_vec(),
345 registry,
346 )
347 .unwrap(),
348 handler_checkpoint_latency: register_histogram_vec_with_registry!(
349 name("handler_checkpoint_latency"),
350 "Time taken to process a checkpoint by this handler",
351 &["pipeline"],
352 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
353 registry,
354 )
355 .unwrap(),
356 total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
357 name("total_collector_checkpoints_received"),
358 "Total number of checkpoints received by this collector",
359 &["pipeline"],
360 registry,
361 )
362 .unwrap(),
363 total_collector_rows_received: register_int_counter_vec_with_registry!(
364 name("total_collector_rows_received"),
365 "Total number of rows received by this collector",
366 &["pipeline"],
367 registry,
368 )
369 .unwrap(),
370 total_collector_batches_created: register_int_counter_vec_with_registry!(
371 name("total_collector_batches_created"),
372 "Total number of batches created by this collector",
373 &["pipeline"],
374 registry,
375 )
376 .unwrap(),
377 total_committer_batches_attempted: register_int_counter_vec_with_registry!(
378 name("total_committer_batches_attempted"),
379 "Total number of batches writes attempted by this committer",
380 &["pipeline"],
381 registry,
382 )
383 .unwrap(),
384 total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
385 name("total_committer_batches_succeeded"),
386 "Total number of successful batches writes by this committer",
387 &["pipeline"],
388 registry,
389 )
390 .unwrap(),
391 total_committer_batches_failed: register_int_counter_vec_with_registry!(
392 name("total_committer_batches_failed"),
393 "Total number of failed batches writes by this committer",
394 &["pipeline"],
395 registry,
396 )
397 .unwrap(),
398 total_committer_rows_committed: register_int_counter_vec_with_registry!(
399 name("total_committer_rows_committed"),
400 "Total number of rows sent to the database by this committer",
401 &["pipeline"],
402 registry,
403 )
404 .unwrap(),
405 total_committer_rows_affected: register_int_counter_vec_with_registry!(
406 name("total_committer_rows_affected"),
407 "Total number of rows actually written to the database by this committer",
408 &["pipeline"],
409 registry,
410 )
411 .unwrap(),
412 total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
413 name("watermark_out_of_order"),
414 "Number of times this committer encountered a batch for a checkpoint before its watermark",
415 &["pipeline"],
416 registry,
417 )
418 .unwrap(),
419 total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
420 name("pruner_chunks_attempted"),
421 "Number of chunks this pruner attempted to delete",
422 &["pipeline"],
423 registry,
424 )
425 .unwrap(),
426 total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
427 name("pruner_chunks_deleted"),
428 "Number of chunks this pruner successfully deleted",
429 &["pipeline"],
430 registry,
431 )
432 .unwrap(),
433 total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
434 name("pruner_rows_deleted"),
435 "Number of rows this pruner successfully deleted",
436 &["pipeline"],
437 registry,
438 )
439 .unwrap(),
440 latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
441 name("latest_collected_checkpoint"),
442 "Latest checkpoint sequence number collected by this collector",
443 &["pipeline"],
444 registry,
445 )
446 .unwrap(),
447 latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
448 name("latest_collected_checkpoint_timestamp_lag_ms"),
449 "Difference between the system timestamp when the latest checkpoint was collected and the \
450 timestamp in the checkpoint, in milliseconds",
451 &["pipeline"],
452 registry,
453 )
454 .unwrap(),
455 collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
456 name("collected_checkpoint_timestamp_lag"),
457 "Difference between the system timestamp when a checkpoint was collected and the \
458 timestamp in each checkpoint, in seconds",
459 &["pipeline"],
460 LAG_SEC_BUCKETS.to_vec(),
461 registry,
462 )
463 .unwrap(),
464 latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
465 name("latest_partially_committed_checkpoint"),
466 "Latest checkpoint sequence number partially committed by this collector",
467 &["pipeline"],
468 registry,
469 )
470 .unwrap(),
471 latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
472 name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
473 "Difference between the system timestamp when the latest checkpoint was partially committed and the \
474 timestamp in the checkpoint, in milliseconds",
475 &["pipeline"],
476 registry,
477 )
478 .unwrap(),
479 partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
480 name("partially_committed_checkpoint_timestamp_lag"),
481 "Difference between the system timestamp when a checkpoint was partially committed and the \
482 timestamp in each checkpoint, in seconds",
483 &["pipeline"],
484 LAG_SEC_BUCKETS.to_vec(),
485 registry,
486 )
487 .unwrap(),
488 latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
489 name("latest_watermarked_checkpoint_timestamp_lag_ms"),
490 "Difference between the system timestamp when the latest checkpoint was watermarked and the \
491 timestamp in the checkpoint, in milliseconds",
492 &["pipeline"],
493 registry,
494 )
495 .unwrap(),
496 watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
497 name("watermarked_checkpoint_timestamp_lag"),
498 "Difference between the system timestamp when a checkpoint was watermarked and the \
499 timestamp in each checkpoint, in seconds",
500 &["pipeline"],
501 LAG_SEC_BUCKETS.to_vec(),
502 registry,
503 )
504 .unwrap(),
505 collector_gather_latency: register_histogram_vec_with_registry!(
506 name("collector_gather_latency"),
507 "Time taken to gather rows into a batch by this collector",
508 &["pipeline"],
509 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
510 registry,
511 )
512 .unwrap(),
513 collector_batch_size: register_histogram_vec_with_registry!(
514 name("collector_batch_size"),
515 "Number of rows in a batch written to the database by this collector",
516 &["pipeline"],
517 BATCH_SIZE_BUCKETS.to_vec(),
518 registry,
519 )
520 .unwrap(),
521 total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
522 name("total_collector_skipped_checkpoints"),
523 "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
524 &["pipeline"],
525 registry,
526 ).unwrap(),
527 collector_reader_lo: register_int_gauge_vec_with_registry!(
528 name("collector_reader_lo"),
529 "Reader low watermark as observed by the collector",
530 &["pipeline"],
531 registry,
532 )
533 .unwrap(),
534 committer_commit_latency: register_histogram_vec_with_registry!(
535 name("committer_commit_latency"),
536 "Time taken to write a batch of rows to the database by this committer",
537 &["pipeline"],
538 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
539 registry,
540 )
541 .unwrap(),
542 committer_tx_rows: register_histogram_vec_with_registry!(
543 name("committer_tx_rows"),
544 "Number of rows written to the database in a single database transaction by this committer",
545 &["pipeline"],
546 BATCH_SIZE_BUCKETS.to_vec(),
547 registry,
548 )
549 .unwrap(),
550 watermark_gather_latency: register_histogram_vec_with_registry!(
551 name("watermark_gather_latency"),
552 "Time taken to calculate the new high watermark after a write by this committer",
553 &["pipeline"],
554 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
555 registry,
556 )
557 .unwrap(),
558 watermark_commit_latency: register_histogram_vec_with_registry!(
559 name("watermark_commit_latency"),
560 "Time taken to write the new high watermark to the database by this committer",
561 &["pipeline"],
562 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
563 registry,
564 )
565 .unwrap(),
566 watermark_pruner_read_latency: register_histogram_vec_with_registry!(
567 name("watermark_pruner_read_latency"),
568 "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
569 &["pipeline"],
570 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
571 registry,
572 )
573 .unwrap(),
574 watermark_pruner_write_latency: register_histogram_vec_with_registry!(
575 name("watermark_pruner_write_latency"),
576 "Time taken to write the pruner's new upperbound to the database by this pruner",
577 &["pipeline"],
578 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
579 registry,
580 )
581 .unwrap(),
582 pruner_delete_latency: register_histogram_vec_with_registry!(
583 name("pruner_delete_latency"),
584 "Time taken to delete a chunk of data from the database by this pruner",
585 &["pipeline"],
586 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
587 registry,
588 )
589 .unwrap(),
590 watermark_epoch: register_int_gauge_vec_with_registry!(
591 name("watermark_epoch"),
592 "Current epoch high watermark for this committer",
593 &["pipeline"],
594 registry,
595 )
596 .unwrap(),
597 watermark_checkpoint: register_int_gauge_vec_with_registry!(
598 name("watermark_checkpoint"),
599 "Current checkpoint high watermark for this committer",
600 &["pipeline"],
601 registry,
602 )
603 .unwrap(),
604 watermark_transaction: register_int_gauge_vec_with_registry!(
605 name("watermark_transaction"),
606 "Current transaction high watermark for this committer",
607 &["pipeline"],
608 registry,
609 )
610 .unwrap(),
611 watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
612 name("watermark_timestamp_ms"),
613 "Current timestamp high watermark for this committer, in milliseconds",
614 &["pipeline"],
615 registry,
616 )
617 .unwrap(),
618 watermark_reader_lo: register_int_gauge_vec_with_registry!(
619 name("watermark_reader_lo"),
620 "Current reader low watermark for this pruner",
621 &["pipeline"],
622 registry,
623 )
624 .unwrap(),
625 watermark_pruner_hi: register_int_gauge_vec_with_registry!(
626 name("watermark_pruner_hi"),
627 "Current pruner high watermark for this pruner",
628 &["pipeline"],
629 registry,
630 )
631 .unwrap(),
632 watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
633 name("watermark_epoch_in_db"),
634 "Last epoch high watermark this committer wrote to the DB",
635 &["pipeline"],
636 registry,
637 )
638 .unwrap(),
639 watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
640 name("watermark_checkpoint_in_db"),
641 "Last checkpoint high watermark this committer wrote to the DB",
642 &["pipeline"],
643 registry,
644 )
645 .unwrap(),
646 watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
647 name("watermark_transaction_in_db"),
648 "Last transaction high watermark this committer wrote to the DB",
649 &["pipeline"],
650 registry,
651 )
652 .unwrap(),
653 watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
654 name("watermark_timestamp_ms_in_db"),
655 "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
656 &["pipeline"],
657 registry,
658 )
659 .unwrap(),
660 watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
661 name("watermark_reader_lo_in_db"),
662 "Last reader low watermark this pruner wrote to the DB",
663 &["pipeline"],
664 registry,
665 )
666 .unwrap(),
667 watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
668 name("watermark_pruner_hi_in_db"),
669 "Last pruner high watermark this pruner wrote to the DB",
670 &["pipeline"],
671 registry,
672 )
673 .unwrap(),
674 })
675 }
676}
677
678impl CheckpointLagMetricReporter {
679 pub fn new(
680 checkpoint_time_lag_histogram: Histogram,
681 latest_checkpoint_time_lag_gauge: IntGauge,
682 latest_checkpoint_sequence_number_gauge: IntGauge,
683 ) -> Arc<Self> {
684 Arc::new(Self {
685 checkpoint_time_lag_histogram,
686 latest_checkpoint_time_lag_gauge,
687 latest_checkpoint_sequence_number_gauge,
688 latest_reported_checkpoint: AtomicU64::new(0),
689 })
690 }
691
692 pub fn new_for_pipeline<P: Processor>(
693 checkpoint_time_lag_histogram: &HistogramVec,
694 latest_checkpoint_time_lag_gauge: &IntGaugeVec,
695 latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
696 ) -> Arc<Self> {
697 Self::new(
698 checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
699 latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
700 latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
701 )
702 }
703
704 pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
705 let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
706 self.checkpoint_time_lag_histogram
707 .observe((lag as f64) / 1000.0);
708
709 let prev = self
710 .latest_reported_checkpoint
711 .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
712 if cp_sequence_number > prev {
713 self.latest_checkpoint_sequence_number_gauge
714 .set(cp_sequence_number as i64);
715 self.latest_checkpoint_time_lag_gauge.set(lag);
716 }
717 }
718}
719
#[cfg(test)]
pub(crate) mod tests {
    //! Metrics fixtures exposed crate-wide (`pub(crate)`) for use in tests.

    use std::sync::Arc;

    use prometheus::Registry;

    use super::*;

    /// Builds an [`IndexerMetrics`] with the default `"indexer"` prefix. Each call registers
    /// against its own fresh [`Registry`], so repeated calls never clash on metric names.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let scratch_registry = Registry::new();
        IndexerMetrics::new(None, &scratch_registry)
    }

    /// Builds an [`IngestionMetrics`] with the default `"indexer"` prefix. Each call registers
    /// against its own fresh [`Registry`], so repeated calls never clash on metric names.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let scratch_registry = Registry::new();
        IngestionMetrics::new(None, &scratch_registry)
    }
}