1use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6use std::time::Duration;
7
8use prometheus::Histogram;
9use prometheus::HistogramVec;
10use prometheus::IntCounter;
11use prometheus::IntCounterVec;
12use prometheus::IntGauge;
13use prometheus::IntGaugeVec;
14use prometheus::Registry;
15use prometheus::register_histogram_vec_with_registry;
16use prometheus::register_histogram_with_registry;
17use prometheus::register_int_counter_vec_with_registry;
18use prometheus::register_int_counter_with_registry;
19use prometheus::register_int_gauge_vec_with_registry;
20use prometheus::register_int_gauge_with_registry;
21use tracing::warn;
22
23use crate::ingestion::error::Error;
24use crate::pipeline::Processor;
25
/// Histogram bucket boundaries, in seconds, for the time taken to fetch a single checkpoint
/// from the remote store (including retries). Spans 1ms to 100s in a 1-2-5 progression.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms .. 5ms
    0.01, 0.02, 0.05, // 10ms .. 50ms
    0.1, 0.2, 0.5, // 100ms .. 500ms
    1.0, 2.0, 5.0, // 1s .. 5s
    10.0, 20.0, 50.0, // 10s .. 50s
    100.0, // 100s
];
30
/// Histogram bucket boundaries, in seconds, for checkpoint timestamp lag (system time minus the
/// timestamp recorded in the checkpoint).
///
/// Resolution is 50ms up to one second, then progressively coarser out to 1000s.
const LAG_SEC_BUCKETS: &[f64] = &[
    // 100ms .. 1s in 50ms steps.
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5,
    0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0,
    // Beyond one second.
    2.0, 3.0, 4.0, 5.0,
    10.0, 20.0, 50.0, 100.0, 1000.0,
];
37
/// Histogram bucket boundaries, in seconds, for in-process work such as a handler processing a
/// checkpoint or a collector gathering a batch. Spans 1ms to 100s in a 1-2-5 progression.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, // 1ms .. 5ms
    0.01, 0.02, 0.05, // 10ms .. 50ms
    0.1, 0.2, 0.5, // 100ms .. 500ms
    1.0, 2.0, 5.0, // 1s .. 5s
    10.0, 20.0, 50.0, // 10s .. 50s
    100.0, // 100s
];
43
/// Histogram bucket boundaries, in seconds, for database round-trips: batch commits, watermark
/// reads/writes and pruner deletes. Spans 10ms to 10000s in a 1-2-5 progression.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, // 10ms .. 50ms
    0.1, 0.2, 0.5, // 100ms .. 500ms
    1.0, 2.0, 5.0, // 1s .. 5s
    10.0, 20.0, 50.0, // 10s .. 50s
    100.0, 200.0, 500.0, // 100s .. 500s
    1000.0, 2000.0, 5000.0, // 1000s .. 5000s
    10000.0, // 10000s
];
49
/// Histogram bucket boundaries, in rows, for batch sizes (rows per collector batch and rows per
/// database transaction). Spans 1 to 20000 rows in a 1-2-5 progression.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0,
    10.0, 20.0, 50.0,
    100.0, 200.0, 500.0,
    1000.0, 2000.0, 5000.0,
    10000.0, 20000.0,
];
54
55#[derive(Clone)]
57pub struct IngestionMetrics {
58 pub total_ingested_checkpoints: IntCounter,
60 pub total_ingested_transactions: IntCounter,
61 pub total_ingested_events: IntCounter,
62 pub total_ingested_objects: IntCounter,
63 pub total_ingested_bytes: IntCounter,
64 pub total_ingested_transient_retries: IntCounterVec,
65 pub total_ingested_not_found_retries: IntCounter,
66 pub total_ingested_permanent_errors: IntCounterVec,
67 pub total_streamed_checkpoints: IntCounter,
68 pub total_skipped_streamed_checkpoints: IntCounter,
69 pub total_out_of_order_streamed_checkpoints: IntCounter,
70 pub total_stream_disconnections: IntCounter,
71 pub total_streaming_connection_failures: IntCounter,
72
73 pub latest_ingested_checkpoint: IntGauge,
75 pub latest_streamed_checkpoint: IntGauge,
76 pub latest_skipped_streamed_checkpoint: IntGauge,
77 pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
78 pub ingested_checkpoint_timestamp_lag: Histogram,
79
80 pub ingested_checkpoint_latency: Histogram,
81
82 pub ingestion_concurrency_limit: IntGauge,
83 pub ingestion_concurrency_inflight: IntGauge,
84}
85
86#[derive(Clone)]
87pub struct IndexerMetrics {
88 pub total_handler_checkpoints_received: IntCounterVec,
90 pub total_handler_processor_retries: IntCounterVec,
91 pub total_handler_checkpoints_processed: IntCounterVec,
92 pub total_handler_rows_created: IntCounterVec,
93
94 pub latest_processed_checkpoint: IntGaugeVec,
95 pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
96 pub processed_checkpoint_timestamp_lag: HistogramVec,
97
98 pub handler_checkpoint_latency: HistogramVec,
99
100 pub processor_concurrency_limit: IntGaugeVec,
101 pub processor_concurrency_inflight: IntGaugeVec,
102
103 pub total_collector_checkpoints_received: IntCounterVec,
105 pub total_collector_rows_received: IntCounterVec,
106 pub total_collector_batches_created: IntCounterVec,
107 pub total_committer_batches_attempted: IntCounterVec,
108 pub total_committer_batches_succeeded: IntCounterVec,
109 pub total_committer_batches_failed: IntCounterVec,
110 pub total_committer_rows_committed: IntCounterVec,
111 pub total_committer_rows_affected: IntCounterVec,
112 pub total_watermarks_out_of_order: IntCounterVec,
113 pub total_pruner_chunks_attempted: IntCounterVec,
114 pub total_pruner_chunks_deleted: IntCounterVec,
115 pub total_pruner_rows_deleted: IntCounterVec,
116
117 pub latest_collected_checkpoint: IntGaugeVec,
119 pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
120 pub collected_checkpoint_timestamp_lag: HistogramVec,
121
122 pub latest_partially_committed_checkpoint: IntGaugeVec,
127 pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
128 pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
129
130 pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
135 pub watermarked_checkpoint_timestamp_lag: HistogramVec,
136
137 pub collector_gather_latency: HistogramVec,
138 pub collector_batch_size: HistogramVec,
139 pub total_collector_skipped_checkpoints: IntCounterVec,
140 pub collector_reader_lo: IntGaugeVec,
141 pub committer_commit_latency: HistogramVec,
142 pub committer_tx_rows: HistogramVec,
143 pub watermark_gather_latency: HistogramVec,
144 pub watermark_commit_latency: HistogramVec,
145 pub watermark_pruner_read_latency: HistogramVec,
146 pub watermark_pruner_write_latency: HistogramVec,
147 pub pruner_delete_latency: HistogramVec,
148
149 pub watermark_epoch: IntGaugeVec,
150 pub watermark_checkpoint: IntGaugeVec,
151 pub watermark_transaction: IntGaugeVec,
152 pub watermark_timestamp_ms: IntGaugeVec,
153 pub watermark_reader_lo: IntGaugeVec,
154 pub watermark_pruner_hi: IntGaugeVec,
155
156 pub watermark_epoch_in_db: IntGaugeVec,
157 pub watermark_checkpoint_in_db: IntGaugeVec,
158 pub watermark_transaction_in_db: IntGaugeVec,
159 pub watermark_timestamp_in_db_ms: IntGaugeVec,
160 pub watermark_reader_lo_in_db: IntGaugeVec,
161 pub watermark_pruner_hi_in_db: IntGaugeVec,
162}
163
164pub(crate) struct CheckpointLagMetricReporter {
166 checkpoint_time_lag_histogram: Histogram,
168 latest_checkpoint_time_lag_gauge: IntGauge,
171 latest_checkpoint_sequence_number_gauge: IntGauge,
173 latest_reported_checkpoint: AtomicU64,
175}
176
177impl IngestionMetrics {
178 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
179 let prefix = prefix.unwrap_or("indexer");
180 let name = |n| format!("{prefix}_{n}");
181 Arc::new(Self {
182 total_ingested_checkpoints: register_int_counter_with_registry!(
183 name("total_ingested_checkpoints"),
184 "Total number of checkpoints fetched from the remote store",
185 registry,
186 )
187 .unwrap(),
188 total_ingested_transactions: register_int_counter_with_registry!(
189 name("total_ingested_transactions"),
190 "Total number of transactions fetched from the remote store",
191 registry,
192 )
193 .unwrap(),
194 total_ingested_events: register_int_counter_with_registry!(
195 name("total_ingested_events"),
196 "Total number of events fetched from the remote store",
197 registry,
198 )
199 .unwrap(),
200 total_ingested_objects: register_int_counter_with_registry!(
201 name("total_ingested_objects"),
202 "Total number of objects in checkpoints fetched from the remote store",
203 registry,
204 )
205 .unwrap(),
206 total_ingested_bytes: register_int_counter_with_registry!(
207 name("total_ingested_bytes"),
208 "Total number of bytes fetched from the remote store, this metric will not \
209 be updated when data are fetched over gRPC.",
210 registry,
211 )
212 .unwrap(),
213 total_ingested_transient_retries: register_int_counter_vec_with_registry!(
214 name("total_ingested_retries"),
215 "Total number of retries due to transient errors while fetching data from the \
216 remote store",
217 &["reason"],
218 registry,
219 )
220 .unwrap(),
221 total_ingested_not_found_retries: register_int_counter_with_registry!(
222 name("total_ingested_not_found_retries"),
223 "Total number of retries due to the not found errors while fetching data from the \
224 remote store",
225 registry,
226 )
227 .unwrap(),
228 total_ingested_permanent_errors: register_int_counter_vec_with_registry!(
229 name("total_ingested_permanent_errors"),
230 "Total number of permanent errors encountered while fetching data from the \
231 remote store, which cause the ingestion service to shutdown",
232 &["reason"],
233 registry,
234 )
235 .unwrap(),
236 total_streamed_checkpoints: register_int_counter_with_registry!(
237 name("total_streamed_checkpoints"),
238 "Total number of checkpoints received from gRPC streaming",
239 registry,
240 )
241 .unwrap(),
242 total_skipped_streamed_checkpoints: register_int_counter_with_registry!(
243 name("total_skipped_streamed_checkpoints"),
244 "Total number of streamed checkpoints skipped because they were already processed",
245 registry,
246 )
247 .unwrap(),
248 total_out_of_order_streamed_checkpoints: register_int_counter_with_registry!(
249 name("total_out_of_order_streamed_checkpoints"),
250 "Total number of streamed checkpoints received out of order",
251 registry,
252 )
253 .unwrap(),
254 total_stream_disconnections: register_int_counter_with_registry!(
255 name("total_stream_disconnections"),
256 "Total number of times the gRPC stream was disconnected",
257 registry,
258 )
259 .unwrap(),
260 total_streaming_connection_failures: register_int_counter_with_registry!(
261 name("total_streaming_connection_failures"),
262 "Total number of failures due to streaming service connection or peek failures",
263 registry,
264 )
265 .unwrap(),
266 latest_ingested_checkpoint: register_int_gauge_with_registry!(
267 name("latest_ingested_checkpoint"),
268 "Latest checkpoint sequence number fetched from the remote store",
269 registry,
270 )
271 .unwrap(),
272 latest_streamed_checkpoint: register_int_gauge_with_registry!(
273 name("latest_streamed_checkpoint"),
274 "Latest checkpoint sequence number received from gRPC streaming",
275 registry,
276 )
277 .unwrap(),
278 latest_skipped_streamed_checkpoint: register_int_gauge_with_registry!(
279 name("latest_skipped_streamed_checkpoint"),
280 "Latest streamed checkpoint sequence number skipped because it was already processed",
281 registry,
282 )
283 .unwrap(),
284 latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
285 name("latest_ingested_checkpoint_timestamp_lag_ms"),
286 "Difference between the system timestamp when the latest checkpoint was fetched and the \
287 timestamp in the checkpoint, in milliseconds",
288 registry,
289 )
290 .unwrap(),
291 ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
292 name("ingested_checkpoint_timestamp_lag"),
293 "Difference between the system timestamp when a checkpoint was fetched and the \
294 timestamp in each checkpoint, in seconds",
295 LAG_SEC_BUCKETS.to_vec(),
296 registry,
297 )
298 .unwrap(),
299 ingested_checkpoint_latency: register_histogram_with_registry!(
300 name("ingested_checkpoint_latency"),
301 "Time taken to fetch a checkpoint from the remote store, including retries",
302 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
303 registry,
304 )
305 .unwrap(),
306 ingestion_concurrency_limit: register_int_gauge_with_registry!(
307 name("ingestion_concurrency_limit"),
308 "Current adaptive concurrency limit for checkpoint ingestion",
309 registry,
310 )
311 .unwrap(),
312 ingestion_concurrency_inflight: register_int_gauge_with_registry!(
313 name("ingestion_concurrency_inflight"),
314 "Current number of in-flight checkpoint ingestion tasks",
315 registry,
316 )
317 .unwrap(),
318 })
319 }
320
321 pub(crate) fn inc_retry(
324 &self,
325 checkpoint: u64,
326 reason: &str,
327 error: Error,
328 ) -> backoff::Error<Error> {
329 warn!(checkpoint, reason, "Retrying due to error: {error}");
330
331 self.total_ingested_transient_retries
332 .with_label_values(&[reason])
333 .inc();
334
335 backoff::Error::transient(error)
336 }
337}
338
339impl IndexerMetrics {
340 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
341 let prefix = prefix.unwrap_or("indexer");
342 let name = |n| format!("{prefix}_{n}");
343 Arc::new(Self {
344 total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
345 name("total_handler_checkpoints_received"),
346 "Total number of checkpoints received by this handler",
347 &["pipeline"],
348 registry,
349 )
350 .unwrap(),
351 total_handler_processor_retries: register_int_counter_vec_with_registry!(
352 name("total_handler_processor_retries"),
353 "Total number of handler retries after transient processing failures",
354 &["pipeline"],
355 registry,
356 )
357 .unwrap(),
358 total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
359 name("total_handler_checkpoints_processed"),
360 "Total number of checkpoints processed (converted into rows) by this handler",
361 &["pipeline"],
362 registry,
363 )
364 .unwrap(),
365 total_handler_rows_created: register_int_counter_vec_with_registry!(
366 name("total_handler_rows_created"),
367 "Total number of rows created by this handler",
368 &["pipeline"],
369 registry,
370 )
371 .unwrap(),
372 latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
373 name("latest_processed_checkpoint"),
374 "Latest checkpoint sequence number processed by this handler",
375 &["pipeline"],
376 registry,
377 )
378 .unwrap(),
379 latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
380 name("latest_processed_checkpoint_timestamp_lag_ms"),
381 "Difference between the system timestamp when the latest checkpoint was processed and the \
382 timestamp in the checkpoint, in milliseconds",
383 &["pipeline"],
384 registry,
385 )
386 .unwrap(),
387 processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
388 name("processed_checkpoint_timestamp_lag"),
389 "Difference between the system timestamp when a checkpoint was processed and the \
390 timestamp in each checkpoint, in seconds",
391 &["pipeline"],
392 LAG_SEC_BUCKETS.to_vec(),
393 registry,
394 )
395 .unwrap(),
396 handler_checkpoint_latency: register_histogram_vec_with_registry!(
397 name("handler_checkpoint_latency"),
398 "Time taken to process a checkpoint by this handler",
399 &["pipeline"],
400 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
401 registry,
402 )
403 .unwrap(),
404 processor_concurrency_limit: register_int_gauge_vec_with_registry!(
405 name("processor_concurrency_limit"),
406 "Current adaptive concurrency limit for this processor",
407 &["pipeline"],
408 registry,
409 )
410 .unwrap(),
411 processor_concurrency_inflight: register_int_gauge_vec_with_registry!(
412 name("processor_concurrency_inflight"),
413 "Current number of in-flight processor tasks for this pipeline",
414 &["pipeline"],
415 registry,
416 )
417 .unwrap(),
418 total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
419 name("total_collector_checkpoints_received"),
420 "Total number of checkpoints received by this collector",
421 &["pipeline"],
422 registry,
423 )
424 .unwrap(),
425 total_collector_rows_received: register_int_counter_vec_with_registry!(
426 name("total_collector_rows_received"),
427 "Total number of rows received by this collector",
428 &["pipeline"],
429 registry,
430 )
431 .unwrap(),
432 total_collector_batches_created: register_int_counter_vec_with_registry!(
433 name("total_collector_batches_created"),
434 "Total number of batches created by this collector",
435 &["pipeline"],
436 registry,
437 )
438 .unwrap(),
439 total_committer_batches_attempted: register_int_counter_vec_with_registry!(
440 name("total_committer_batches_attempted"),
441 "Total number of batches writes attempted by this committer",
442 &["pipeline"],
443 registry,
444 )
445 .unwrap(),
446 total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
447 name("total_committer_batches_succeeded"),
448 "Total number of successful batches writes by this committer",
449 &["pipeline"],
450 registry,
451 )
452 .unwrap(),
453 total_committer_batches_failed: register_int_counter_vec_with_registry!(
454 name("total_committer_batches_failed"),
455 "Total number of failed batches writes by this committer",
456 &["pipeline"],
457 registry,
458 )
459 .unwrap(),
460 total_committer_rows_committed: register_int_counter_vec_with_registry!(
461 name("total_committer_rows_committed"),
462 "Total number of rows sent to the database by this committer",
463 &["pipeline"],
464 registry,
465 )
466 .unwrap(),
467 total_committer_rows_affected: register_int_counter_vec_with_registry!(
468 name("total_committer_rows_affected"),
469 "Total number of rows actually written to the database by this committer",
470 &["pipeline"],
471 registry,
472 )
473 .unwrap(),
474 total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
475 name("watermark_out_of_order"),
476 "Number of times this committer encountered a batch for a checkpoint before its watermark",
477 &["pipeline"],
478 registry,
479 )
480 .unwrap(),
481 total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
482 name("pruner_chunks_attempted"),
483 "Number of chunks this pruner attempted to delete",
484 &["pipeline"],
485 registry,
486 )
487 .unwrap(),
488 total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
489 name("pruner_chunks_deleted"),
490 "Number of chunks this pruner successfully deleted",
491 &["pipeline"],
492 registry,
493 )
494 .unwrap(),
495 total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
496 name("pruner_rows_deleted"),
497 "Number of rows this pruner successfully deleted",
498 &["pipeline"],
499 registry,
500 )
501 .unwrap(),
502 latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
503 name("latest_collected_checkpoint"),
504 "Latest checkpoint sequence number collected by this collector",
505 &["pipeline"],
506 registry,
507 )
508 .unwrap(),
509 latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
510 name("latest_collected_checkpoint_timestamp_lag_ms"),
511 "Difference between the system timestamp when the latest checkpoint was collected and the \
512 timestamp in the checkpoint, in milliseconds",
513 &["pipeline"],
514 registry,
515 )
516 .unwrap(),
517 collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
518 name("collected_checkpoint_timestamp_lag"),
519 "Difference between the system timestamp when a checkpoint was collected and the \
520 timestamp in each checkpoint, in seconds",
521 &["pipeline"],
522 LAG_SEC_BUCKETS.to_vec(),
523 registry,
524 )
525 .unwrap(),
526 latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
527 name("latest_partially_committed_checkpoint"),
528 "Latest checkpoint sequence number partially committed by this collector",
529 &["pipeline"],
530 registry,
531 )
532 .unwrap(),
533 latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
534 name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
535 "Difference between the system timestamp when the latest checkpoint was partially committed and the \
536 timestamp in the checkpoint, in milliseconds",
537 &["pipeline"],
538 registry,
539 )
540 .unwrap(),
541 partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
542 name("partially_committed_checkpoint_timestamp_lag"),
543 "Difference between the system timestamp when a checkpoint was partially committed and the \
544 timestamp in each checkpoint, in seconds",
545 &["pipeline"],
546 LAG_SEC_BUCKETS.to_vec(),
547 registry,
548 )
549 .unwrap(),
550 latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
551 name("latest_watermarked_checkpoint_timestamp_lag_ms"),
552 "Difference between the system timestamp when the latest checkpoint was watermarked and the \
553 timestamp in the checkpoint, in milliseconds",
554 &["pipeline"],
555 registry,
556 )
557 .unwrap(),
558 watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
559 name("watermarked_checkpoint_timestamp_lag"),
560 "Difference between the system timestamp when a checkpoint was watermarked and the \
561 timestamp in each checkpoint, in seconds",
562 &["pipeline"],
563 LAG_SEC_BUCKETS.to_vec(),
564 registry,
565 )
566 .unwrap(),
567 collector_gather_latency: register_histogram_vec_with_registry!(
568 name("collector_gather_latency"),
569 "Time taken to gather rows into a batch by this collector",
570 &["pipeline"],
571 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
572 registry,
573 )
574 .unwrap(),
575 collector_batch_size: register_histogram_vec_with_registry!(
576 name("collector_batch_size"),
577 "Number of rows in a batch written to the database by this collector",
578 &["pipeline"],
579 BATCH_SIZE_BUCKETS.to_vec(),
580 registry,
581 )
582 .unwrap(),
583 total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
584 name("total_collector_skipped_checkpoints"),
585 "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
586 &["pipeline"],
587 registry,
588 ).unwrap(),
589 collector_reader_lo: register_int_gauge_vec_with_registry!(
590 name("collector_reader_lo"),
591 "Reader low watermark as observed by the collector",
592 &["pipeline"],
593 registry,
594 )
595 .unwrap(),
596 committer_commit_latency: register_histogram_vec_with_registry!(
597 name("committer_commit_latency"),
598 "Time taken to write a batch of rows to the database by this committer",
599 &["pipeline"],
600 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
601 registry,
602 )
603 .unwrap(),
604 committer_tx_rows: register_histogram_vec_with_registry!(
605 name("committer_tx_rows"),
606 "Number of rows written to the database in a single database transaction by this committer",
607 &["pipeline"],
608 BATCH_SIZE_BUCKETS.to_vec(),
609 registry,
610 )
611 .unwrap(),
612 watermark_gather_latency: register_histogram_vec_with_registry!(
613 name("watermark_gather_latency"),
614 "Time taken to calculate the new high watermark after a write by this committer",
615 &["pipeline"],
616 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
617 registry,
618 )
619 .unwrap(),
620 watermark_commit_latency: register_histogram_vec_with_registry!(
621 name("watermark_commit_latency"),
622 "Time taken to write the new high watermark to the database by this committer",
623 &["pipeline"],
624 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
625 registry,
626 )
627 .unwrap(),
628 watermark_pruner_read_latency: register_histogram_vec_with_registry!(
629 name("watermark_pruner_read_latency"),
630 "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
631 &["pipeline"],
632 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
633 registry,
634 )
635 .unwrap(),
636 watermark_pruner_write_latency: register_histogram_vec_with_registry!(
637 name("watermark_pruner_write_latency"),
638 "Time taken to write the pruner's new upperbound to the database by this pruner",
639 &["pipeline"],
640 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
641 registry,
642 )
643 .unwrap(),
644 pruner_delete_latency: register_histogram_vec_with_registry!(
645 name("pruner_delete_latency"),
646 "Time taken to delete a chunk of data from the database by this pruner",
647 &["pipeline"],
648 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
649 registry,
650 )
651 .unwrap(),
652 watermark_epoch: register_int_gauge_vec_with_registry!(
653 name("watermark_epoch"),
654 "Current epoch high watermark for this committer",
655 &["pipeline"],
656 registry,
657 )
658 .unwrap(),
659 watermark_checkpoint: register_int_gauge_vec_with_registry!(
660 name("watermark_checkpoint"),
661 "Current checkpoint high watermark for this committer",
662 &["pipeline"],
663 registry,
664 )
665 .unwrap(),
666 watermark_transaction: register_int_gauge_vec_with_registry!(
667 name("watermark_transaction"),
668 "Current transaction high watermark for this committer",
669 &["pipeline"],
670 registry,
671 )
672 .unwrap(),
673 watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
674 name("watermark_timestamp_ms"),
675 "Current timestamp high watermark for this committer, in milliseconds",
676 &["pipeline"],
677 registry,
678 )
679 .unwrap(),
680 watermark_reader_lo: register_int_gauge_vec_with_registry!(
681 name("watermark_reader_lo"),
682 "Current reader low watermark for this pruner",
683 &["pipeline"],
684 registry,
685 )
686 .unwrap(),
687 watermark_pruner_hi: register_int_gauge_vec_with_registry!(
688 name("watermark_pruner_hi"),
689 "Current pruner high watermark for this pruner",
690 &["pipeline"],
691 registry,
692 )
693 .unwrap(),
694 watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
695 name("watermark_epoch_in_db"),
696 "Last epoch high watermark this committer wrote to the DB",
697 &["pipeline"],
698 registry,
699 )
700 .unwrap(),
701 watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
702 name("watermark_checkpoint_in_db"),
703 "Last checkpoint high watermark this committer wrote to the DB",
704 &["pipeline"],
705 registry,
706 )
707 .unwrap(),
708 watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
709 name("watermark_transaction_in_db"),
710 "Last transaction high watermark this committer wrote to the DB",
711 &["pipeline"],
712 registry,
713 )
714 .unwrap(),
715 watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
716 name("watermark_timestamp_ms_in_db"),
717 "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
718 &["pipeline"],
719 registry,
720 )
721 .unwrap(),
722 watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
723 name("watermark_reader_lo_in_db"),
724 "Last reader low watermark this pruner wrote to the DB",
725 &["pipeline"],
726 registry,
727 )
728 .unwrap(),
729 watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
730 name("watermark_pruner_hi_in_db"),
731 "Last pruner high watermark this pruner wrote to the DB",
732 &["pipeline"],
733 registry,
734 )
735 .unwrap(),
736 })
737 }
738
739 pub(crate) fn inc_processor_retry<P: Processor>(
740 &self,
741 checkpoint: u64,
742 error: &anyhow::Error,
743 delay: Duration,
744 ) {
745 warn!(
746 pipeline = P::NAME,
747 checkpoint,
748 retry_delay_ms = delay.as_millis(),
749 "Retrying processor after error: {error:?}",
750 );
751
752 self.total_handler_processor_retries
753 .with_label_values(&[P::NAME])
754 .inc();
755 }
756}
757
758impl CheckpointLagMetricReporter {
759 pub fn new(
760 checkpoint_time_lag_histogram: Histogram,
761 latest_checkpoint_time_lag_gauge: IntGauge,
762 latest_checkpoint_sequence_number_gauge: IntGauge,
763 ) -> Arc<Self> {
764 Arc::new(Self {
765 checkpoint_time_lag_histogram,
766 latest_checkpoint_time_lag_gauge,
767 latest_checkpoint_sequence_number_gauge,
768 latest_reported_checkpoint: AtomicU64::new(0),
769 })
770 }
771
772 pub fn new_for_pipeline<P: Processor>(
773 checkpoint_time_lag_histogram: &HistogramVec,
774 latest_checkpoint_time_lag_gauge: &IntGaugeVec,
775 latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
776 ) -> Arc<Self> {
777 Self::new(
778 checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
779 latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
780 latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
781 )
782 }
783
784 pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
785 let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
786 self.checkpoint_time_lag_histogram
787 .observe((lag as f64) / 1000.0);
788
789 let prev = self
790 .latest_reported_checkpoint
791 .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
792 if cp_sequence_number > prev {
793 self.latest_checkpoint_sequence_number_gauge
794 .set(cp_sequence_number as i64);
795 self.latest_checkpoint_time_lag_gauge.set(lag);
796 }
797 }
798}
799
#[cfg(test)]
pub(crate) mod tests {
    // `Arc`, `Registry` and the metric types all come in through the parent module.
    use super::*;

    /// Builds an [`IndexerMetrics`] (default `"indexer"` prefix) backed by its own throwaway
    /// [`Registry`]. Separate calls therefore never register into a shared registry.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Builds an [`IngestionMetrics`] (default `"indexer"` prefix) backed by its own throwaway
    /// [`Registry`]. Separate calls therefore never register into a shared registry.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}