1use std::sync::{Arc, atomic::AtomicU64};
5
6use prometheus::{
7 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
8 register_histogram_vec_with_registry, register_histogram_with_registry,
9 register_int_counter_vec_with_registry, register_int_counter_with_registry,
10 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14use crate::{ingestion::error::Error, pipeline::Processor};
15
/// Histogram buckets (seconds) for the time taken to fetch a checkpoint from the remote store,
/// retries included. Follows a 1-2-5 progression from 1ms up to 100s.
#[rustfmt::skip]
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];

/// Histogram buckets (seconds) for the difference between the system time and a checkpoint's
/// own timestamp. Sub-second lag is sampled every 50ms; beyond one second the buckets widen
/// quickly, up to 1000s.
#[rustfmt::skip]
const LAG_SEC_BUCKETS: &[f64] = &[
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5,
    0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95,
    1.0, 2.0, 3.0, 4.0, 5.0,
    10.0, 20.0, 50.0, 100.0,
    1000.0,
];

/// Histogram buckets (seconds) for in-memory processing steps: handling a checkpoint,
/// gathering a batch, computing a watermark. Same 1-2-5 progression from 1ms to 100s.
#[rustfmt::skip]
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005,
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0,
];

/// Histogram buckets (seconds) for database writes, reads and deletes issued by committers,
/// watermark tasks and pruners. 1-2-5 progression from 10ms to 10000s.
#[rustfmt::skip]
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05,
    0.1, 0.2, 0.5,
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0,
    10000.0,
];

/// Histogram buckets (row counts) for batch sizes and per-transaction row counts.
/// 1-2-5 progression from 1 to 20000 rows.
#[rustfmt::skip]
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0,
    10000.0, 20000.0,
];
44
45#[derive(Clone)]
47pub struct IngestionMetrics {
48 pub total_ingested_checkpoints: IntCounter,
50 pub total_ingested_transactions: IntCounter,
51 pub total_ingested_events: IntCounter,
52 pub total_ingested_objects: IntCounter,
53 pub total_ingested_bytes: IntCounter,
54 pub total_ingested_transient_retries: IntCounterVec,
55 pub total_ingested_not_found_retries: IntCounter,
56 pub total_ingested_permanent_errors: IntCounterVec,
57 pub total_streamed_checkpoints: IntCounter,
58 pub total_stream_disconnections: IntCounter,
59 pub total_streaming_connection_failures: IntCounter,
60
61 pub latest_ingested_checkpoint: IntGauge,
63 pub latest_streamed_checkpoint: IntGauge,
64 pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
65 pub ingested_checkpoint_timestamp_lag: Histogram,
66
67 pub ingested_checkpoint_latency: Histogram,
68}
69
70#[derive(Clone)]
71pub struct IndexerMetrics {
72 pub total_handler_checkpoints_received: IntCounterVec,
74 pub total_handler_checkpoints_processed: IntCounterVec,
75 pub total_handler_rows_created: IntCounterVec,
76
77 pub latest_processed_checkpoint: IntGaugeVec,
78 pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
79 pub processed_checkpoint_timestamp_lag: HistogramVec,
80
81 pub handler_checkpoint_latency: HistogramVec,
82
83 pub total_collector_checkpoints_received: IntCounterVec,
85 pub total_collector_rows_received: IntCounterVec,
86 pub total_collector_batches_created: IntCounterVec,
87 pub total_committer_batches_attempted: IntCounterVec,
88 pub total_committer_batches_succeeded: IntCounterVec,
89 pub total_committer_batches_failed: IntCounterVec,
90 pub total_committer_rows_committed: IntCounterVec,
91 pub total_committer_rows_affected: IntCounterVec,
92 pub total_watermarks_out_of_order: IntCounterVec,
93 pub total_pruner_chunks_attempted: IntCounterVec,
94 pub total_pruner_chunks_deleted: IntCounterVec,
95 pub total_pruner_rows_deleted: IntCounterVec,
96
97 pub latest_collected_checkpoint: IntGaugeVec,
99 pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
100 pub collected_checkpoint_timestamp_lag: HistogramVec,
101
102 pub latest_partially_committed_checkpoint: IntGaugeVec,
107 pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
108 pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
109
110 pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
115 pub watermarked_checkpoint_timestamp_lag: HistogramVec,
116
117 pub collector_gather_latency: HistogramVec,
118 pub collector_batch_size: HistogramVec,
119 pub total_collector_skipped_checkpoints: IntCounterVec,
120 pub collector_reader_lo: IntGaugeVec,
121 pub committer_commit_latency: HistogramVec,
122 pub committer_tx_rows: HistogramVec,
123 pub watermark_gather_latency: HistogramVec,
124 pub watermark_commit_latency: HistogramVec,
125 pub watermark_pruner_read_latency: HistogramVec,
126 pub watermark_pruner_write_latency: HistogramVec,
127 pub pruner_delete_latency: HistogramVec,
128
129 pub watermark_epoch: IntGaugeVec,
130 pub watermark_checkpoint: IntGaugeVec,
131 pub watermark_transaction: IntGaugeVec,
132 pub watermark_timestamp_ms: IntGaugeVec,
133 pub watermark_reader_lo: IntGaugeVec,
134 pub watermark_pruner_hi: IntGaugeVec,
135
136 pub watermark_epoch_in_db: IntGaugeVec,
137 pub watermark_checkpoint_in_db: IntGaugeVec,
138 pub watermark_transaction_in_db: IntGaugeVec,
139 pub watermark_timestamp_in_db_ms: IntGaugeVec,
140 pub watermark_reader_lo_in_db: IntGaugeVec,
141 pub watermark_pruner_hi_in_db: IntGaugeVec,
142}
143
144pub(crate) struct CheckpointLagMetricReporter {
146 checkpoint_time_lag_histogram: Histogram,
148 latest_checkpoint_time_lag_gauge: IntGauge,
151 latest_checkpoint_sequence_number_gauge: IntGauge,
153 latest_reported_checkpoint: AtomicU64,
155}
156
157impl IngestionMetrics {
158 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
159 let prefix = prefix.unwrap_or("indexer");
160 let name = |n| format!("{prefix}_{n}");
161 Arc::new(Self {
162 total_ingested_checkpoints: register_int_counter_with_registry!(
163 name("total_ingested_checkpoints"),
164 "Total number of checkpoints fetched from the remote store",
165 registry,
166 )
167 .unwrap(),
168 total_ingested_transactions: register_int_counter_with_registry!(
169 name("total_ingested_transactions"),
170 "Total number of transactions fetched from the remote store",
171 registry,
172 )
173 .unwrap(),
174 total_ingested_events: register_int_counter_with_registry!(
175 name("total_ingested_events"),
176 "Total number of events fetched from the remote store",
177 registry,
178 )
179 .unwrap(),
180 total_ingested_objects: register_int_counter_with_registry!(
181 name("total_ingested_objects"),
182 "Total number of objects in checkpoints fetched from the remote store",
183 registry,
184 )
185 .unwrap(),
186 total_ingested_bytes: register_int_counter_with_registry!(
187 name("total_ingested_bytes"),
188 "Total number of bytes fetched from the remote store, this metric will not \
189 be updated when data are fetched over gRPC.",
190 registry,
191 )
192 .unwrap(),
193 total_ingested_transient_retries: register_int_counter_vec_with_registry!(
194 name("total_ingested_retries"),
195 "Total number of retries due to transient errors while fetching data from the \
196 remote store",
197 &["reason"],
198 registry,
199 )
200 .unwrap(),
201 total_ingested_not_found_retries: register_int_counter_with_registry!(
202 name("total_ingested_not_found_retries"),
203 "Total number of retries due to the not found errors while fetching data from the \
204 remote store",
205 registry,
206 )
207 .unwrap(),
208 total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
209 name("total_ingested_permanent_errors"),
210 "Total number of permanent errors encountered while fetching data from the \
211 remote store, which cause the ingestion service to shutdown",
212 &["reason"],
213 registry,
214 )
215 .unwrap(),
216 total_streamed_checkpoints: register_int_counter_with_registry!(
217 name("total_streamed_checkpoints"),
218 "Total number of checkpoints received from gRPC streaming",
219 registry,
220 )
221 .unwrap(),
222 total_stream_disconnections: register_int_counter_with_registry!(
223 name("total_stream_disconnections"),
224 "Total number of times the gRPC stream was disconnected",
225 registry,
226 )
227 .unwrap(),
228 total_streaming_connection_failures: register_int_counter_with_registry!(
229 name("total_streaming_connection_failures"),
230 "Total number of failures due to streaming service connection or peek failures",
231 registry,
232 )
233 .unwrap(),
234 latest_ingested_checkpoint: register_int_gauge_with_registry!(
235 name("latest_ingested_checkpoint"),
236 "Latest checkpoint sequence number fetched from the remote store",
237 registry,
238 )
239 .unwrap(),
240 latest_streamed_checkpoint: register_int_gauge_with_registry!(
241 name("latest_streamed_checkpoint"),
242 "Latest checkpoint sequence number received from gRPC streaming",
243 registry,
244 )
245 .unwrap(),
246 latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
247 name("latest_ingested_checkpoint_timestamp_lag_ms"),
248 "Difference between the system timestamp when the latest checkpoint was fetched and the \
249 timestamp in the checkpoint, in milliseconds",
250 registry,
251 )
252 .unwrap(),
253 ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
254 name("ingested_checkpoint_timestamp_lag"),
255 "Difference between the system timestamp when a checkpoint was fetched and the \
256 timestamp in each checkpoint, in seconds",
257 LAG_SEC_BUCKETS.to_vec(),
258 registry,
259 )
260 .unwrap(),
261 ingested_checkpoint_latency: register_histogram_with_registry!(
262 name("ingested_checkpoint_latency"),
263 "Time taken to fetch a checkpoint from the remote store, including retries",
264 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
265 registry,
266 )
267 .unwrap(),
268 })
269 }
270
271 pub(crate) fn inc_retry(
274 &self,
275 checkpoint: u64,
276 reason: &str,
277 error: Error,
278 ) -> backoff::Error<Error> {
279 warn!(checkpoint, reason, "Retrying due to error: {error}");
280
281 self.total_ingested_transient_retries
282 .with_label_values(&[reason])
283 .inc();
284
285 backoff::Error::transient(error)
286 }
287}
288
289impl IndexerMetrics {
290 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
291 let prefix = prefix.unwrap_or("indexer");
292 let name = |n| format!("{prefix}_{n}");
293 Arc::new(Self {
294 total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
295 name("total_handler_checkpoints_received"),
296 "Total number of checkpoints received by this handler",
297 &["pipeline"],
298 registry,
299 )
300 .unwrap(),
301 total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
302 name("total_handler_checkpoints_processed"),
303 "Total number of checkpoints processed (converted into rows) by this handler",
304 &["pipeline"],
305 registry,
306 )
307 .unwrap(),
308 total_handler_rows_created: register_int_counter_vec_with_registry!(
309 name("total_handler_rows_created"),
310 "Total number of rows created by this handler",
311 &["pipeline"],
312 registry,
313 )
314 .unwrap(),
315 latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
316 name("latest_processed_checkpoint"),
317 "Latest checkpoint sequence number processed by this handler",
318 &["pipeline"],
319 registry,
320 )
321 .unwrap(),
322 latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
323 name("latest_processed_checkpoint_timestamp_lag_ms"),
324 "Difference between the system timestamp when the latest checkpoint was processed and the \
325 timestamp in the checkpoint, in milliseconds",
326 &["pipeline"],
327 registry,
328 )
329 .unwrap(),
330 processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
331 name("processed_checkpoint_timestamp_lag"),
332 "Difference between the system timestamp when a checkpoint was processed and the \
333 timestamp in each checkpoint, in seconds",
334 &["pipeline"],
335 LAG_SEC_BUCKETS.to_vec(),
336 registry,
337 )
338 .unwrap(),
339 handler_checkpoint_latency: register_histogram_vec_with_registry!(
340 name("handler_checkpoint_latency"),
341 "Time taken to process a checkpoint by this handler",
342 &["pipeline"],
343 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
344 registry,
345 )
346 .unwrap(),
347 total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
348 name("total_collector_checkpoints_received"),
349 "Total number of checkpoints received by this collector",
350 &["pipeline"],
351 registry,
352 )
353 .unwrap(),
354 total_collector_rows_received: register_int_counter_vec_with_registry!(
355 name("total_collector_rows_received"),
356 "Total number of rows received by this collector",
357 &["pipeline"],
358 registry,
359 )
360 .unwrap(),
361 total_collector_batches_created: register_int_counter_vec_with_registry!(
362 name("total_collector_batches_created"),
363 "Total number of batches created by this collector",
364 &["pipeline"],
365 registry,
366 )
367 .unwrap(),
368 total_committer_batches_attempted: register_int_counter_vec_with_registry!(
369 name("total_committer_batches_attempted"),
370 "Total number of batches writes attempted by this committer",
371 &["pipeline"],
372 registry,
373 )
374 .unwrap(),
375 total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
376 name("total_committer_batches_succeeded"),
377 "Total number of successful batches writes by this committer",
378 &["pipeline"],
379 registry,
380 )
381 .unwrap(),
382 total_committer_batches_failed: register_int_counter_vec_with_registry!(
383 name("total_committer_batches_failed"),
384 "Total number of failed batches writes by this committer",
385 &["pipeline"],
386 registry,
387 )
388 .unwrap(),
389 total_committer_rows_committed: register_int_counter_vec_with_registry!(
390 name("total_committer_rows_committed"),
391 "Total number of rows sent to the database by this committer",
392 &["pipeline"],
393 registry,
394 )
395 .unwrap(),
396 total_committer_rows_affected: register_int_counter_vec_with_registry!(
397 name("total_committer_rows_affected"),
398 "Total number of rows actually written to the database by this committer",
399 &["pipeline"],
400 registry,
401 )
402 .unwrap(),
403 total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
404 name("watermark_out_of_order"),
405 "Number of times this committer encountered a batch for a checkpoint before its watermark",
406 &["pipeline"],
407 registry,
408 )
409 .unwrap(),
410 total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
411 name("pruner_chunks_attempted"),
412 "Number of chunks this pruner attempted to delete",
413 &["pipeline"],
414 registry,
415 )
416 .unwrap(),
417 total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
418 name("pruner_chunks_deleted"),
419 "Number of chunks this pruner successfully deleted",
420 &["pipeline"],
421 registry,
422 )
423 .unwrap(),
424 total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
425 name("pruner_rows_deleted"),
426 "Number of rows this pruner successfully deleted",
427 &["pipeline"],
428 registry,
429 )
430 .unwrap(),
431 latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
432 name("latest_collected_checkpoint"),
433 "Latest checkpoint sequence number collected by this collector",
434 &["pipeline"],
435 registry,
436 )
437 .unwrap(),
438 latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
439 name("latest_collected_checkpoint_timestamp_lag_ms"),
440 "Difference between the system timestamp when the latest checkpoint was collected and the \
441 timestamp in the checkpoint, in milliseconds",
442 &["pipeline"],
443 registry,
444 )
445 .unwrap(),
446 collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
447 name("collected_checkpoint_timestamp_lag"),
448 "Difference between the system timestamp when a checkpoint was collected and the \
449 timestamp in each checkpoint, in seconds",
450 &["pipeline"],
451 LAG_SEC_BUCKETS.to_vec(),
452 registry,
453 )
454 .unwrap(),
455 latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
456 name("latest_partially_committed_checkpoint"),
457 "Latest checkpoint sequence number partially committed by this collector",
458 &["pipeline"],
459 registry,
460 )
461 .unwrap(),
462 latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
463 name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
464 "Difference between the system timestamp when the latest checkpoint was partially committed and the \
465 timestamp in the checkpoint, in milliseconds",
466 &["pipeline"],
467 registry,
468 )
469 .unwrap(),
470 partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
471 name("partially_committed_checkpoint_timestamp_lag"),
472 "Difference between the system timestamp when a checkpoint was partially committed and the \
473 timestamp in each checkpoint, in seconds",
474 &["pipeline"],
475 LAG_SEC_BUCKETS.to_vec(),
476 registry,
477 )
478 .unwrap(),
479 latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
480 name("latest_watermarked_checkpoint_timestamp_lag_ms"),
481 "Difference between the system timestamp when the latest checkpoint was watermarked and the \
482 timestamp in the checkpoint, in milliseconds",
483 &["pipeline"],
484 registry,
485 )
486 .unwrap(),
487 watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
488 name("watermarked_checkpoint_timestamp_lag"),
489 "Difference between the system timestamp when a checkpoint was watermarked and the \
490 timestamp in each checkpoint, in seconds",
491 &["pipeline"],
492 LAG_SEC_BUCKETS.to_vec(),
493 registry,
494 )
495 .unwrap(),
496 collector_gather_latency: register_histogram_vec_with_registry!(
497 name("collector_gather_latency"),
498 "Time taken to gather rows into a batch by this collector",
499 &["pipeline"],
500 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
501 registry,
502 )
503 .unwrap(),
504 collector_batch_size: register_histogram_vec_with_registry!(
505 name("collector_batch_size"),
506 "Number of rows in a batch written to the database by this collector",
507 &["pipeline"],
508 BATCH_SIZE_BUCKETS.to_vec(),
509 registry,
510 )
511 .unwrap(),
512 total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
513 name("total_collector_skipped_checkpoints"),
514 "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
515 &["pipeline"],
516 registry,
517 ).unwrap(),
518 collector_reader_lo: register_int_gauge_vec_with_registry!(
519 name("collector_reader_lo"),
520 "Reader low watermark as observed by the collector",
521 &["pipeline"],
522 registry,
523 )
524 .unwrap(),
525 committer_commit_latency: register_histogram_vec_with_registry!(
526 name("committer_commit_latency"),
527 "Time taken to write a batch of rows to the database by this committer",
528 &["pipeline"],
529 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
530 registry,
531 )
532 .unwrap(),
533 committer_tx_rows: register_histogram_vec_with_registry!(
534 name("committer_tx_rows"),
535 "Number of rows written to the database in a single database transaction by this committer",
536 &["pipeline"],
537 BATCH_SIZE_BUCKETS.to_vec(),
538 registry,
539 )
540 .unwrap(),
541 watermark_gather_latency: register_histogram_vec_with_registry!(
542 name("watermark_gather_latency"),
543 "Time taken to calculate the new high watermark after a write by this committer",
544 &["pipeline"],
545 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
546 registry,
547 )
548 .unwrap(),
549 watermark_commit_latency: register_histogram_vec_with_registry!(
550 name("watermark_commit_latency"),
551 "Time taken to write the new high watermark to the database by this committer",
552 &["pipeline"],
553 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
554 registry,
555 )
556 .unwrap(),
557 watermark_pruner_read_latency: register_histogram_vec_with_registry!(
558 name("watermark_pruner_read_latency"),
559 "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
560 &["pipeline"],
561 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
562 registry,
563 )
564 .unwrap(),
565 watermark_pruner_write_latency: register_histogram_vec_with_registry!(
566 name("watermark_pruner_write_latency"),
567 "Time taken to write the pruner's new upperbound to the database by this pruner",
568 &["pipeline"],
569 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
570 registry,
571 )
572 .unwrap(),
573 pruner_delete_latency: register_histogram_vec_with_registry!(
574 name("pruner_delete_latency"),
575 "Time taken to delete a chunk of data from the database by this pruner",
576 &["pipeline"],
577 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
578 registry,
579 )
580 .unwrap(),
581 watermark_epoch: register_int_gauge_vec_with_registry!(
582 name("watermark_epoch"),
583 "Current epoch high watermark for this committer",
584 &["pipeline"],
585 registry,
586 )
587 .unwrap(),
588 watermark_checkpoint: register_int_gauge_vec_with_registry!(
589 name("watermark_checkpoint"),
590 "Current checkpoint high watermark for this committer",
591 &["pipeline"],
592 registry,
593 )
594 .unwrap(),
595 watermark_transaction: register_int_gauge_vec_with_registry!(
596 name("watermark_transaction"),
597 "Current transaction high watermark for this committer",
598 &["pipeline"],
599 registry,
600 )
601 .unwrap(),
602 watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
603 name("watermark_timestamp_ms"),
604 "Current timestamp high watermark for this committer, in milliseconds",
605 &["pipeline"],
606 registry,
607 )
608 .unwrap(),
609 watermark_reader_lo: register_int_gauge_vec_with_registry!(
610 name("watermark_reader_lo"),
611 "Current reader low watermark for this pruner",
612 &["pipeline"],
613 registry,
614 )
615 .unwrap(),
616 watermark_pruner_hi: register_int_gauge_vec_with_registry!(
617 name("watermark_pruner_hi"),
618 "Current pruner high watermark for this pruner",
619 &["pipeline"],
620 registry,
621 )
622 .unwrap(),
623 watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
624 name("watermark_epoch_in_db"),
625 "Last epoch high watermark this committer wrote to the DB",
626 &["pipeline"],
627 registry,
628 )
629 .unwrap(),
630 watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
631 name("watermark_checkpoint_in_db"),
632 "Last checkpoint high watermark this committer wrote to the DB",
633 &["pipeline"],
634 registry,
635 )
636 .unwrap(),
637 watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
638 name("watermark_transaction_in_db"),
639 "Last transaction high watermark this committer wrote to the DB",
640 &["pipeline"],
641 registry,
642 )
643 .unwrap(),
644 watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
645 name("watermark_timestamp_ms_in_db"),
646 "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
647 &["pipeline"],
648 registry,
649 )
650 .unwrap(),
651 watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
652 name("watermark_reader_lo_in_db"),
653 "Last reader low watermark this pruner wrote to the DB",
654 &["pipeline"],
655 registry,
656 )
657 .unwrap(),
658 watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
659 name("watermark_pruner_hi_in_db"),
660 "Last pruner high watermark this pruner wrote to the DB",
661 &["pipeline"],
662 registry,
663 )
664 .unwrap(),
665 })
666 }
667}
668
669impl CheckpointLagMetricReporter {
670 pub fn new(
671 checkpoint_time_lag_histogram: Histogram,
672 latest_checkpoint_time_lag_gauge: IntGauge,
673 latest_checkpoint_sequence_number_gauge: IntGauge,
674 ) -> Arc<Self> {
675 Arc::new(Self {
676 checkpoint_time_lag_histogram,
677 latest_checkpoint_time_lag_gauge,
678 latest_checkpoint_sequence_number_gauge,
679 latest_reported_checkpoint: AtomicU64::new(0),
680 })
681 }
682
683 pub fn new_for_pipeline<P: Processor>(
684 checkpoint_time_lag_histogram: &HistogramVec,
685 latest_checkpoint_time_lag_gauge: &IntGaugeVec,
686 latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
687 ) -> Arc<Self> {
688 Self::new(
689 checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
690 latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
691 latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
692 )
693 }
694
695 pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
696 let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
697 self.checkpoint_time_lag_histogram
698 .observe((lag as f64) / 1000.0);
699
700 let prev = self
701 .latest_reported_checkpoint
702 .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
703 if cp_sequence_number > prev {
704 self.latest_checkpoint_sequence_number_gauge
705 .set(cp_sequence_number as i64);
706 self.latest_checkpoint_time_lag_gauge.set(lag);
707 }
708 }
709}
710
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::{IndexerMetrics, IngestionMetrics};

    /// Builds an [`IndexerMetrics`] registered in a brand-new [`Registry`]. Separate calls
    /// therefore never clash on metric names.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let scratch_registry = Registry::new();
        IndexerMetrics::new(None, &scratch_registry)
    }

    /// Builds an [`IngestionMetrics`] registered in a brand-new [`Registry`]. Separate calls
    /// therefore never clash on metric names.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let scratch_registry = Registry::new();
        IngestionMetrics::new(None, &scratch_registry)
    }
}