1use std::sync::{Arc, atomic::AtomicU64};
5
6use prometheus::{
7 Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
8 register_histogram_vec_with_registry, register_histogram_with_registry,
9 register_int_counter_vec_with_registry, register_int_counter_with_registry,
10 register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
11};
12use tracing::warn;
13
14use crate::{ingestion::error::Error, pipeline::Processor};
15
/// Histogram buckets, in seconds, for the time taken to fetch one checkpoint (retries included).
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // milliseconds
    0.01, 0.02, 0.05, // tens of milliseconds
    0.1, 0.2, 0.5, // hundreds of milliseconds
    1.0, 2.0, 5.0, // seconds
    10.0, 20.0, 50.0, 100.0, // tens of seconds and beyond
];

/// Histogram buckets, in seconds, for the lag between a checkpoint's own timestamp and the system
/// time at which a stage observed it.
///
/// Resolution is 50ms between 100ms and 1s, after which the buckets coarsen quickly.
const LAG_SEC_BUCKETS: &[f64] = &[
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, // first half-second
    0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0, // second half-second
    2.0, 3.0, 4.0, 5.0, // a few seconds
    10.0, 20.0, 50.0, 100.0, 1000.0, // falling far behind
];

/// Histogram buckets, in seconds, for in-process work such as handler processing, gathering rows
/// into batches, and computing watermarks.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // milliseconds
    0.01, 0.02, 0.05, // tens of milliseconds
    0.1, 0.2, 0.5, // hundreds of milliseconds
    1.0, 2.0, 5.0, // seconds
    10.0, 20.0, 50.0, 100.0, // tens of seconds and beyond
];

/// Histogram buckets, in seconds, for database operations (commits, watermark reads/writes,
/// pruning deletes).
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, // tens of milliseconds
    0.1, 0.2, 0.5, // hundreds of milliseconds
    1.0, 2.0, 5.0, // seconds
    10.0, 20.0, 50.0, // tens of seconds
    100.0, 200.0, 500.0, // hundreds of seconds
    1000.0, 2000.0, 5000.0, 10000.0, // thousands of seconds
];

/// Histogram buckets for row counts: rows per collected batch and rows per database transaction.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, // units
    10.0, 20.0, 50.0, // tens
    100.0, 200.0, 500.0, // hundreds
    1000.0, 2000.0, 5000.0, // thousands
    10000.0, 20000.0, // tens of thousands
];
44
45#[derive(Clone)]
47pub struct IngestionMetrics {
48 pub total_ingested_checkpoints: IntCounter,
50 pub total_ingested_transactions: IntCounter,
51 pub total_ingested_events: IntCounter,
52 pub total_ingested_objects: IntCounter,
53 pub total_ingested_bytes: IntCounter,
54 pub total_ingested_transient_retries: IntCounterVec,
55 pub total_ingested_not_found_retries: IntCounter,
56 pub total_ingested_permanent_errors: IntCounterVec,
57 pub total_streamed_checkpoints: IntCounter,
58 pub total_stream_disconnections: IntCounter,
59 pub total_streaming_connection_failures: IntCounter,
60
61 pub latest_ingested_checkpoint: IntGauge,
63 pub latest_streamed_checkpoint: IntGauge,
64 pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
65 pub ingested_checkpoint_timestamp_lag: Histogram,
66
67 pub ingested_checkpoint_latency: Histogram,
68}
69
70#[derive(Clone)]
71pub struct IndexerMetrics {
72 pub total_handler_checkpoints_received: IntCounterVec,
74 pub total_handler_checkpoints_processed: IntCounterVec,
75 pub total_handler_rows_created: IntCounterVec,
76
77 pub latest_processed_checkpoint: IntGaugeVec,
78 pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
79 pub processed_checkpoint_timestamp_lag: HistogramVec,
80
81 pub handler_checkpoint_latency: HistogramVec,
82
83 pub total_collector_checkpoints_received: IntCounterVec,
85 pub total_collector_rows_received: IntCounterVec,
86 pub total_collector_batches_created: IntCounterVec,
87 pub total_committer_batches_attempted: IntCounterVec,
88 pub total_committer_batches_succeeded: IntCounterVec,
89 pub total_committer_batches_failed: IntCounterVec,
90 pub total_committer_rows_committed: IntCounterVec,
91 pub total_committer_rows_affected: IntCounterVec,
92 pub total_watermarks_out_of_order: IntCounterVec,
93 pub total_pruner_chunks_attempted: IntCounterVec,
94 pub total_pruner_chunks_deleted: IntCounterVec,
95 pub total_pruner_rows_deleted: IntCounterVec,
96
97 pub latest_collected_checkpoint: IntGaugeVec,
99 pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
100 pub collected_checkpoint_timestamp_lag: HistogramVec,
101
102 pub latest_partially_committed_checkpoint: IntGaugeVec,
107 pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
108 pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
109
110 pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
115 pub watermarked_checkpoint_timestamp_lag: HistogramVec,
116
117 pub collector_gather_latency: HistogramVec,
118 pub collector_batch_size: HistogramVec,
119 pub total_collector_skipped_checkpoints: IntCounterVec,
120 pub committer_commit_latency: HistogramVec,
121 pub committer_tx_rows: HistogramVec,
122 pub watermark_gather_latency: HistogramVec,
123 pub watermark_commit_latency: HistogramVec,
124 pub watermark_pruner_read_latency: HistogramVec,
125 pub watermark_pruner_write_latency: HistogramVec,
126 pub pruner_delete_latency: HistogramVec,
127
128 pub watermark_epoch: IntGaugeVec,
129 pub watermark_checkpoint: IntGaugeVec,
130 pub watermark_transaction: IntGaugeVec,
131 pub watermark_timestamp_ms: IntGaugeVec,
132 pub watermark_reader_lo: IntGaugeVec,
133 pub watermark_pruner_hi: IntGaugeVec,
134
135 pub watermark_epoch_in_db: IntGaugeVec,
136 pub watermark_checkpoint_in_db: IntGaugeVec,
137 pub watermark_transaction_in_db: IntGaugeVec,
138 pub watermark_timestamp_in_db_ms: IntGaugeVec,
139 pub watermark_reader_lo_in_db: IntGaugeVec,
140 pub watermark_pruner_hi_in_db: IntGaugeVec,
141}
142
143pub(crate) struct CheckpointLagMetricReporter {
145 checkpoint_time_lag_histogram: Histogram,
147 latest_checkpoint_time_lag_gauge: IntGauge,
150 latest_checkpoint_sequence_number_gauge: IntGauge,
152 latest_reported_checkpoint: AtomicU64,
154}
155
156impl IngestionMetrics {
157 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
158 let prefix = prefix.unwrap_or("indexer");
159 let name = |n| format!("{prefix}_{n}");
160 Arc::new(Self {
161 total_ingested_checkpoints: register_int_counter_with_registry!(
162 name("total_ingested_checkpoints"),
163 "Total number of checkpoints fetched from the remote store",
164 registry,
165 )
166 .unwrap(),
167 total_ingested_transactions: register_int_counter_with_registry!(
168 name("total_ingested_transactions"),
169 "Total number of transactions fetched from the remote store",
170 registry,
171 )
172 .unwrap(),
173 total_ingested_events: register_int_counter_with_registry!(
174 name("total_ingested_events"),
175 "Total number of events fetched from the remote store",
176 registry,
177 )
178 .unwrap(),
179 total_ingested_objects: register_int_counter_with_registry!(
180 name("total_ingested_objects"),
181 "Total number of objects in checkpoints fetched from the remote store",
182 registry,
183 )
184 .unwrap(),
185 total_ingested_bytes: register_int_counter_with_registry!(
186 name("total_ingested_bytes"),
187 "Total number of bytes fetched from the remote store, this metric will not \
188 be updated when data are fetched over gRPC.",
189 registry,
190 )
191 .unwrap(),
192 total_ingested_transient_retries: register_int_counter_vec_with_registry!(
193 name("total_ingested_retries"),
194 "Total number of retries due to transient errors while fetching data from the \
195 remote store",
196 &["reason"],
197 registry,
198 )
199 .unwrap(),
200 total_ingested_not_found_retries: register_int_counter_with_registry!(
201 name("total_ingested_not_found_retries"),
202 "Total number of retries due to the not found errors while fetching data from the \
203 remote store",
204 registry,
205 )
206 .unwrap(),
207 total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
208 name("total_ingested_permanent_errors"),
209 "Total number of permanent errors encountered while fetching data from the \
210 remote store, which cause the ingestion service to shutdown",
211 &["reason"],
212 registry,
213 )
214 .unwrap(),
215 total_streamed_checkpoints: register_int_counter_with_registry!(
216 name("total_streamed_checkpoints"),
217 "Total number of checkpoints received from gRPC streaming",
218 registry,
219 )
220 .unwrap(),
221 total_stream_disconnections: register_int_counter_with_registry!(
222 name("total_stream_disconnections"),
223 "Total number of times the gRPC stream was disconnected",
224 registry,
225 )
226 .unwrap(),
227 total_streaming_connection_failures: register_int_counter_with_registry!(
228 name("total_streaming_connection_failures"),
229 "Total number of failures due to streaming service connection or peek failures",
230 registry,
231 )
232 .unwrap(),
233 latest_ingested_checkpoint: register_int_gauge_with_registry!(
234 name("latest_ingested_checkpoint"),
235 "Latest checkpoint sequence number fetched from the remote store",
236 registry,
237 )
238 .unwrap(),
239 latest_streamed_checkpoint: register_int_gauge_with_registry!(
240 name("latest_streamed_checkpoint"),
241 "Latest checkpoint sequence number received from gRPC streaming",
242 registry,
243 )
244 .unwrap(),
245 latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
246 name("latest_ingested_checkpoint_timestamp_lag_ms"),
247 "Difference between the system timestamp when the latest checkpoint was fetched and the \
248 timestamp in the checkpoint, in milliseconds",
249 registry,
250 )
251 .unwrap(),
252 ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
253 name("ingested_checkpoint_timestamp_lag"),
254 "Difference between the system timestamp when a checkpoint was fetched and the \
255 timestamp in each checkpoint, in seconds",
256 LAG_SEC_BUCKETS.to_vec(),
257 registry,
258 )
259 .unwrap(),
260 ingested_checkpoint_latency: register_histogram_with_registry!(
261 name("ingested_checkpoint_latency"),
262 "Time taken to fetch a checkpoint from the remote store, including retries",
263 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
264 registry,
265 )
266 .unwrap(),
267 })
268 }
269
270 pub(crate) fn inc_retry(
273 &self,
274 checkpoint: u64,
275 reason: &str,
276 error: Error,
277 ) -> backoff::Error<Error> {
278 warn!(checkpoint, reason, "Retrying due to error: {error}");
279
280 self.total_ingested_transient_retries
281 .with_label_values(&[reason])
282 .inc();
283
284 backoff::Error::transient(error)
285 }
286}
287
288impl IndexerMetrics {
289 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
290 let prefix = prefix.unwrap_or("indexer");
291 let name = |n| format!("{prefix}_{n}");
292 Arc::new(Self {
293 total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
294 name("total_handler_checkpoints_received"),
295 "Total number of checkpoints received by this handler",
296 &["pipeline"],
297 registry,
298 )
299 .unwrap(),
300 total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
301 name("total_handler_checkpoints_processed"),
302 "Total number of checkpoints processed (converted into rows) by this handler",
303 &["pipeline"],
304 registry,
305 )
306 .unwrap(),
307 total_handler_rows_created: register_int_counter_vec_with_registry!(
308 name("total_handler_rows_created"),
309 "Total number of rows created by this handler",
310 &["pipeline"],
311 registry,
312 )
313 .unwrap(),
314 latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
315 name("latest_processed_checkpoint"),
316 "Latest checkpoint sequence number processed by this handler",
317 &["pipeline"],
318 registry,
319 )
320 .unwrap(),
321 latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
322 name("latest_processed_checkpoint_timestamp_lag_ms"),
323 "Difference between the system timestamp when the latest checkpoint was processed and the \
324 timestamp in the checkpoint, in milliseconds",
325 &["pipeline"],
326 registry,
327 )
328 .unwrap(),
329 processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
330 name("processed_checkpoint_timestamp_lag"),
331 "Difference between the system timestamp when a checkpoint was processed and the \
332 timestamp in each checkpoint, in seconds",
333 &["pipeline"],
334 LAG_SEC_BUCKETS.to_vec(),
335 registry,
336 )
337 .unwrap(),
338 handler_checkpoint_latency: register_histogram_vec_with_registry!(
339 name("handler_checkpoint_latency"),
340 "Time taken to process a checkpoint by this handler",
341 &["pipeline"],
342 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
343 registry,
344 )
345 .unwrap(),
346 total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
347 name("total_collector_checkpoints_received"),
348 "Total number of checkpoints received by this collector",
349 &["pipeline"],
350 registry,
351 )
352 .unwrap(),
353 total_collector_rows_received: register_int_counter_vec_with_registry!(
354 name("total_collector_rows_received"),
355 "Total number of rows received by this collector",
356 &["pipeline"],
357 registry,
358 )
359 .unwrap(),
360 total_collector_batches_created: register_int_counter_vec_with_registry!(
361 name("total_collector_batches_created"),
362 "Total number of batches created by this collector",
363 &["pipeline"],
364 registry,
365 )
366 .unwrap(),
367 total_committer_batches_attempted: register_int_counter_vec_with_registry!(
368 name("total_committer_batches_attempted"),
369 "Total number of batches writes attempted by this committer",
370 &["pipeline"],
371 registry,
372 )
373 .unwrap(),
374 total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
375 name("total_committer_batches_succeeded"),
376 "Total number of successful batches writes by this committer",
377 &["pipeline"],
378 registry,
379 )
380 .unwrap(),
381 total_committer_batches_failed: register_int_counter_vec_with_registry!(
382 name("total_committer_batches_failed"),
383 "Total number of failed batches writes by this committer",
384 &["pipeline"],
385 registry,
386 )
387 .unwrap(),
388 total_committer_rows_committed: register_int_counter_vec_with_registry!(
389 name("total_committer_rows_committed"),
390 "Total number of rows sent to the database by this committer",
391 &["pipeline"],
392 registry,
393 )
394 .unwrap(),
395 total_committer_rows_affected: register_int_counter_vec_with_registry!(
396 name("total_committer_rows_affected"),
397 "Total number of rows actually written to the database by this committer",
398 &["pipeline"],
399 registry,
400 )
401 .unwrap(),
402 total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
403 name("watermark_out_of_order"),
404 "Number of times this committer encountered a batch for a checkpoint before its watermark",
405 &["pipeline"],
406 registry,
407 )
408 .unwrap(),
409 total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
410 name("pruner_chunks_attempted"),
411 "Number of chunks this pruner attempted to delete",
412 &["pipeline"],
413 registry,
414 )
415 .unwrap(),
416 total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
417 name("pruner_chunks_deleted"),
418 "Number of chunks this pruner successfully deleted",
419 &["pipeline"],
420 registry,
421 )
422 .unwrap(),
423 total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
424 name("pruner_rows_deleted"),
425 "Number of rows this pruner successfully deleted",
426 &["pipeline"],
427 registry,
428 )
429 .unwrap(),
430 latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
431 name("latest_collected_checkpoint"),
432 "Latest checkpoint sequence number collected by this collector",
433 &["pipeline"],
434 registry,
435 )
436 .unwrap(),
437 latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
438 name("latest_collected_checkpoint_timestamp_lag_ms"),
439 "Difference between the system timestamp when the latest checkpoint was collected and the \
440 timestamp in the checkpoint, in milliseconds",
441 &["pipeline"],
442 registry,
443 )
444 .unwrap(),
445 collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
446 name("collected_checkpoint_timestamp_lag"),
447 "Difference between the system timestamp when a checkpoint was collected and the \
448 timestamp in each checkpoint, in seconds",
449 &["pipeline"],
450 LAG_SEC_BUCKETS.to_vec(),
451 registry,
452 )
453 .unwrap(),
454 latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
455 name("latest_partially_committed_checkpoint"),
456 "Latest checkpoint sequence number partially committed by this collector",
457 &["pipeline"],
458 registry,
459 )
460 .unwrap(),
461 latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
462 name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
463 "Difference between the system timestamp when the latest checkpoint was partially committed and the \
464 timestamp in the checkpoint, in milliseconds",
465 &["pipeline"],
466 registry,
467 )
468 .unwrap(),
469 partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
470 name("partially_committed_checkpoint_timestamp_lag"),
471 "Difference between the system timestamp when a checkpoint was partially committed and the \
472 timestamp in each checkpoint, in seconds",
473 &["pipeline"],
474 LAG_SEC_BUCKETS.to_vec(),
475 registry,
476 )
477 .unwrap(),
478 latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
479 name("latest_watermarked_checkpoint_timestamp_lag_ms"),
480 "Difference between the system timestamp when the latest checkpoint was watermarked and the \
481 timestamp in the checkpoint, in milliseconds",
482 &["pipeline"],
483 registry,
484 )
485 .unwrap(),
486 watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
487 name("watermarked_checkpoint_timestamp_lag"),
488 "Difference between the system timestamp when a checkpoint was watermarked and the \
489 timestamp in each checkpoint, in seconds",
490 &["pipeline"],
491 LAG_SEC_BUCKETS.to_vec(),
492 registry,
493 )
494 .unwrap(),
495 collector_gather_latency: register_histogram_vec_with_registry!(
496 name("collector_gather_latency"),
497 "Time taken to gather rows into a batch by this collector",
498 &["pipeline"],
499 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
500 registry,
501 )
502 .unwrap(),
503 collector_batch_size: register_histogram_vec_with_registry!(
504 name("collector_batch_size"),
505 "Number of rows in a batch written to the database by this collector",
506 &["pipeline"],
507 BATCH_SIZE_BUCKETS.to_vec(),
508 registry,
509 )
510 .unwrap(),
511 total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
512 name("total_collector_skipped_checkpoints"),
513 "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
514 &["pipeline"],
515 registry,
516 ).unwrap(),
517 committer_commit_latency: register_histogram_vec_with_registry!(
518 name("committer_commit_latency"),
519 "Time taken to write a batch of rows to the database by this committer",
520 &["pipeline"],
521 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
522 registry,
523 )
524 .unwrap(),
525 committer_tx_rows: register_histogram_vec_with_registry!(
526 name("committer_tx_rows"),
527 "Number of rows written to the database in a single database transaction by this committer",
528 &["pipeline"],
529 BATCH_SIZE_BUCKETS.to_vec(),
530 registry,
531 )
532 .unwrap(),
533 watermark_gather_latency: register_histogram_vec_with_registry!(
534 name("watermark_gather_latency"),
535 "Time taken to calculate the new high watermark after a write by this committer",
536 &["pipeline"],
537 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
538 registry,
539 )
540 .unwrap(),
541 watermark_commit_latency: register_histogram_vec_with_registry!(
542 name("watermark_commit_latency"),
543 "Time taken to write the new high watermark to the database by this committer",
544 &["pipeline"],
545 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
546 registry,
547 )
548 .unwrap(),
549 watermark_pruner_read_latency: register_histogram_vec_with_registry!(
550 name("watermark_pruner_read_latency"),
551 "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
552 &["pipeline"],
553 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
554 registry,
555 )
556 .unwrap(),
557 watermark_pruner_write_latency: register_histogram_vec_with_registry!(
558 name("watermark_pruner_write_latency"),
559 "Time taken to write the pruner's new upperbound to the database by this pruner",
560 &["pipeline"],
561 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
562 registry,
563 )
564 .unwrap(),
565 pruner_delete_latency: register_histogram_vec_with_registry!(
566 name("pruner_delete_latency"),
567 "Time taken to delete a chunk of data from the database by this pruner",
568 &["pipeline"],
569 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
570 registry,
571 )
572 .unwrap(),
573 watermark_epoch: register_int_gauge_vec_with_registry!(
574 name("watermark_epoch"),
575 "Current epoch high watermark for this committer",
576 &["pipeline"],
577 registry,
578 )
579 .unwrap(),
580 watermark_checkpoint: register_int_gauge_vec_with_registry!(
581 name("watermark_checkpoint"),
582 "Current checkpoint high watermark for this committer",
583 &["pipeline"],
584 registry,
585 )
586 .unwrap(),
587 watermark_transaction: register_int_gauge_vec_with_registry!(
588 name("watermark_transaction"),
589 "Current transaction high watermark for this committer",
590 &["pipeline"],
591 registry,
592 )
593 .unwrap(),
594 watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
595 name("watermark_timestamp_ms"),
596 "Current timestamp high watermark for this committer, in milliseconds",
597 &["pipeline"],
598 registry,
599 )
600 .unwrap(),
601 watermark_reader_lo: register_int_gauge_vec_with_registry!(
602 name("watermark_reader_lo"),
603 "Current reader low watermark for this pruner",
604 &["pipeline"],
605 registry,
606 )
607 .unwrap(),
608 watermark_pruner_hi: register_int_gauge_vec_with_registry!(
609 name("watermark_pruner_hi"),
610 "Current pruner high watermark for this pruner",
611 &["pipeline"],
612 registry,
613 )
614 .unwrap(),
615 watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
616 name("watermark_epoch_in_db"),
617 "Last epoch high watermark this committer wrote to the DB",
618 &["pipeline"],
619 registry,
620 )
621 .unwrap(),
622 watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
623 name("watermark_checkpoint_in_db"),
624 "Last checkpoint high watermark this committer wrote to the DB",
625 &["pipeline"],
626 registry,
627 )
628 .unwrap(),
629 watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
630 name("watermark_transaction_in_db"),
631 "Last transaction high watermark this committer wrote to the DB",
632 &["pipeline"],
633 registry,
634 )
635 .unwrap(),
636 watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
637 name("watermark_timestamp_ms_in_db"),
638 "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
639 &["pipeline"],
640 registry,
641 )
642 .unwrap(),
643 watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
644 name("watermark_reader_lo_in_db"),
645 "Last reader low watermark this pruner wrote to the DB",
646 &["pipeline"],
647 registry,
648 )
649 .unwrap(),
650 watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
651 name("watermark_pruner_hi_in_db"),
652 "Last pruner high watermark this pruner wrote to the DB",
653 &["pipeline"],
654 registry,
655 )
656 .unwrap(),
657 })
658 }
659}
660
661impl CheckpointLagMetricReporter {
662 pub fn new(
663 checkpoint_time_lag_histogram: Histogram,
664 latest_checkpoint_time_lag_gauge: IntGauge,
665 latest_checkpoint_sequence_number_gauge: IntGauge,
666 ) -> Arc<Self> {
667 Arc::new(Self {
668 checkpoint_time_lag_histogram,
669 latest_checkpoint_time_lag_gauge,
670 latest_checkpoint_sequence_number_gauge,
671 latest_reported_checkpoint: AtomicU64::new(0),
672 })
673 }
674
675 pub fn new_for_pipeline<P: Processor>(
676 checkpoint_time_lag_histogram: &HistogramVec,
677 latest_checkpoint_time_lag_gauge: &IntGaugeVec,
678 latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
679 ) -> Arc<Self> {
680 Self::new(
681 checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
682 latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
683 latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
684 )
685 }
686
687 pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
688 let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
689 self.checkpoint_time_lag_histogram
690 .observe((lag as f64) / 1000.0);
691
692 let prev = self
693 .latest_reported_checkpoint
694 .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
695 if cp_sequence_number > prev {
696 self.latest_checkpoint_sequence_number_gauge
697 .set(cp_sequence_number as i64);
698 self.latest_checkpoint_time_lag_gauge.set(lag);
699 }
700 }
701}
702
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::{IndexerMetrics, IngestionMetrics};

    /// Builds indexer metrics (default prefix) in a registry of their own. Repeated calls
    /// therefore never collide on metric names.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Builds ingestion metrics (default prefix) in a registry of their own. Repeated calls
    /// therefore never collide on metric names.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}