1use std::sync::Arc;
5use std::sync::atomic::AtomicU64;
6use std::time::Duration;
7
8use prometheus::Histogram;
9use prometheus::HistogramVec;
10use prometheus::IntCounter;
11use prometheus::IntCounterVec;
12use prometheus::IntGauge;
13use prometheus::IntGaugeVec;
14use prometheus::Registry;
15use prometheus::register_histogram_vec_with_registry;
16use prometheus::register_histogram_with_registry;
17use prometheus::register_int_counter_vec_with_registry;
18use prometheus::register_int_counter_with_registry;
19use prometheus::register_int_gauge_vec_with_registry;
20use prometheus::register_int_gauge_with_registry;
21use tracing::warn;
22
23use crate::ingestion::error::Error;
24use crate::pipeline::Processor;
25
/// Histogram bucket upper bounds, in seconds, for the remote-store fetch latency histograms
/// (`ingested_*_latency`): 1 ms up to 100 s in 1-2-5 steps.
const INGESTION_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
30
/// Histogram bucket upper bounds, in seconds, for checkpoint timestamp lag.
///
/// Resolution is 50 ms between 0.1 s and 1 s, then the buckets widen progressively up to
/// 1000 s.
const LAG_SEC_BUCKETS: &[f64] = &[
    0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9,
    0.95, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 1000.0,
];
37
/// Histogram bucket upper bounds, in seconds, for in-process work such as handler processing
/// and batch/watermark gathering: 1 ms up to 100 s in 1-2-5 steps.
const PROCESSING_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0,
];
43
/// Histogram bucket upper bounds, in seconds, for database round-trips (batch commits, watermark
/// reads/writes, pruning deletes): 10 ms up to 10,000 s in 1-2-5 steps.
const DB_UPDATE_LATENCY_SEC_BUCKETS: &[f64] = &[
    0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0,
    2000.0, 5000.0, 10000.0,
];
49
/// Histogram bucket upper bounds for row counts (rows per collected batch, rows per database
/// transaction): 1 up to 20,000 rows in 1-2-5 steps.
const BATCH_SIZE_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0,
];
54
55#[derive(Clone)]
57pub struct IngestionMetrics {
58 pub total_ingested_checkpoints: IntCounter,
60 pub total_ingested_transactions: IntCounter,
61 pub total_ingested_events: IntCounter,
62 pub total_ingested_objects: IntCounter,
63 pub total_ingested_bytes: IntCounter,
64 pub total_ingested_transient_retries: IntCounterVec,
65 pub total_ingested_not_found_retries: IntCounter,
66 pub total_streamed_checkpoints: IntCounter,
67 pub total_skipped_streamed_checkpoints: IntCounter,
68 pub total_out_of_order_streamed_checkpoints: IntCounter,
69 pub total_stream_disconnections: IntCounter,
70 pub total_streaming_connection_failures: IntCounter,
71
72 pub latest_ingested_checkpoint: IntGauge,
74 pub latest_streamed_checkpoint: IntGauge,
75 pub latest_skipped_streamed_checkpoint: IntGauge,
76 pub latest_ingested_checkpoint_timestamp_lag_ms: IntGauge,
77 pub ingested_checkpoint_timestamp_lag: Histogram,
78
79 pub ingested_checkpoint_latency: Histogram,
80 pub ingested_chain_id_latency: Histogram,
81 pub ingested_latest_checkpoint_latency: Histogram,
82
83 pub ingestion_concurrency_limit: IntGauge,
84 pub ingestion_concurrency_inflight: IntGauge,
85}
86
87#[derive(Clone)]
88pub struct IndexerMetrics {
89 pub total_handler_checkpoints_received: IntCounterVec,
91 pub total_handler_processor_retries: IntCounterVec,
92 pub total_handler_checkpoints_processed: IntCounterVec,
93 pub total_handler_rows_created: IntCounterVec,
94
95 pub latest_processed_checkpoint: IntGaugeVec,
96 pub latest_processed_checkpoint_timestamp_lag_ms: IntGaugeVec,
97 pub processed_checkpoint_timestamp_lag: HistogramVec,
98
99 pub handler_checkpoint_latency: HistogramVec,
100
101 pub processor_concurrency_limit: IntGaugeVec,
102 pub processor_concurrency_inflight: IntGaugeVec,
103
104 pub total_collector_checkpoints_received: IntCounterVec,
106 pub total_collector_rows_received: IntCounterVec,
107 pub total_collector_batches_created: IntCounterVec,
108 pub total_committer_batches_attempted: IntCounterVec,
109 pub total_committer_batches_succeeded: IntCounterVec,
110 pub total_committer_batches_failed: IntCounterVec,
111 pub total_committer_rows_committed: IntCounterVec,
112 pub total_committer_rows_affected: IntCounterVec,
113 pub total_watermarks_out_of_order: IntCounterVec,
114 pub total_pruner_chunks_attempted: IntCounterVec,
115 pub total_pruner_chunks_deleted: IntCounterVec,
116 pub total_pruner_rows_deleted: IntCounterVec,
117
118 pub latest_collected_checkpoint: IntGaugeVec,
120 pub latest_collected_checkpoint_timestamp_lag_ms: IntGaugeVec,
121 pub collected_checkpoint_timestamp_lag: HistogramVec,
122
123 pub latest_partially_committed_checkpoint: IntGaugeVec,
128 pub latest_partially_committed_checkpoint_timestamp_lag_ms: IntGaugeVec,
129 pub partially_committed_checkpoint_timestamp_lag: HistogramVec,
130
131 pub latest_watermarked_checkpoint_timestamp_lag_ms: IntGaugeVec,
136 pub watermarked_checkpoint_timestamp_lag: HistogramVec,
137
138 pub collector_gather_latency: HistogramVec,
139 pub collector_batch_size: HistogramVec,
140 pub total_collector_skipped_checkpoints: IntCounterVec,
141 pub collector_reader_lo: IntGaugeVec,
142 pub committer_commit_latency: HistogramVec,
143 pub committer_tx_rows: HistogramVec,
144 pub watermark_gather_latency: HistogramVec,
145 pub watermark_commit_latency: HistogramVec,
146 pub watermark_pruner_read_latency: HistogramVec,
147 pub watermark_pruner_write_latency: HistogramVec,
148 pub pruner_delete_latency: HistogramVec,
149
150 pub watermark_epoch: IntGaugeVec,
151 pub watermark_checkpoint: IntGaugeVec,
152 pub watermark_transaction: IntGaugeVec,
153 pub watermark_timestamp_ms: IntGaugeVec,
154 pub watermark_reader_lo: IntGaugeVec,
155 pub watermark_pruner_hi: IntGaugeVec,
156
157 pub watermark_epoch_in_db: IntGaugeVec,
158 pub watermark_checkpoint_in_db: IntGaugeVec,
159 pub watermark_transaction_in_db: IntGaugeVec,
160 pub watermark_timestamp_in_db_ms: IntGaugeVec,
161 pub watermark_reader_lo_in_db: IntGaugeVec,
162 pub watermark_pruner_hi_in_db: IntGaugeVec,
163}
164
165pub(crate) struct CheckpointLagMetricReporter {
167 checkpoint_time_lag_histogram: Histogram,
169 latest_checkpoint_time_lag_gauge: IntGauge,
172 latest_checkpoint_sequence_number_gauge: IntGauge,
174 latest_reported_checkpoint: AtomicU64,
176}
177
178impl IngestionMetrics {
179 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
180 let prefix = prefix.unwrap_or("indexer");
181 let name = |n| format!("{prefix}_{n}");
182 Arc::new(Self {
183 total_ingested_checkpoints: register_int_counter_with_registry!(
184 name("total_ingested_checkpoints"),
185 "Total number of checkpoints fetched from the remote store",
186 registry,
187 )
188 .unwrap(),
189 total_ingested_transactions: register_int_counter_with_registry!(
190 name("total_ingested_transactions"),
191 "Total number of transactions fetched from the remote store",
192 registry,
193 )
194 .unwrap(),
195 total_ingested_events: register_int_counter_with_registry!(
196 name("total_ingested_events"),
197 "Total number of events fetched from the remote store",
198 registry,
199 )
200 .unwrap(),
201 total_ingested_objects: register_int_counter_with_registry!(
202 name("total_ingested_objects"),
203 "Total number of objects in checkpoints fetched from the remote store",
204 registry,
205 )
206 .unwrap(),
207 total_ingested_bytes: register_int_counter_with_registry!(
208 name("total_ingested_bytes"),
209 "Total number of bytes fetched from the remote store",
210 registry,
211 )
212 .unwrap(),
213 total_ingested_transient_retries: register_int_counter_vec_with_registry!(
214 name("total_ingested_retries"),
215 "Total number of retries due to transient errors while fetching data from the \
216 remote store",
217 &["reason"],
218 registry,
219 )
220 .unwrap(),
221 total_ingested_not_found_retries: register_int_counter_with_registry!(
222 name("total_ingested_not_found_retries"),
223 "Total number of retries due to the not found errors while fetching data from the \
224 remote store",
225 registry,
226 )
227 .unwrap(),
228 total_streamed_checkpoints: register_int_counter_with_registry!(
229 name("total_streamed_checkpoints"),
230 "Total number of checkpoints received from gRPC streaming",
231 registry,
232 )
233 .unwrap(),
234 total_skipped_streamed_checkpoints: register_int_counter_with_registry!(
235 name("total_skipped_streamed_checkpoints"),
236 "Total number of streamed checkpoints skipped because they were already processed",
237 registry,
238 )
239 .unwrap(),
240 total_out_of_order_streamed_checkpoints: register_int_counter_with_registry!(
241 name("total_out_of_order_streamed_checkpoints"),
242 "Total number of streamed checkpoints received out of order",
243 registry,
244 )
245 .unwrap(),
246 total_stream_disconnections: register_int_counter_with_registry!(
247 name("total_stream_disconnections"),
248 "Total number of times the gRPC stream was disconnected",
249 registry,
250 )
251 .unwrap(),
252 total_streaming_connection_failures: register_int_counter_with_registry!(
253 name("total_streaming_connection_failures"),
254 "Total number of failures due to streaming service connection or peek failures",
255 registry,
256 )
257 .unwrap(),
258 latest_ingested_checkpoint: register_int_gauge_with_registry!(
259 name("latest_ingested_checkpoint"),
260 "Latest checkpoint sequence number fetched from the remote store",
261 registry,
262 )
263 .unwrap(),
264 latest_streamed_checkpoint: register_int_gauge_with_registry!(
265 name("latest_streamed_checkpoint"),
266 "Latest checkpoint sequence number received from gRPC streaming",
267 registry,
268 )
269 .unwrap(),
270 latest_skipped_streamed_checkpoint: register_int_gauge_with_registry!(
271 name("latest_skipped_streamed_checkpoint"),
272 "Latest streamed checkpoint sequence number skipped because it was already processed",
273 registry,
274 )
275 .unwrap(),
276 latest_ingested_checkpoint_timestamp_lag_ms: register_int_gauge_with_registry!(
277 name("latest_ingested_checkpoint_timestamp_lag_ms"),
278 "Difference between the system timestamp when the latest checkpoint was fetched and the \
279 timestamp in the checkpoint, in milliseconds",
280 registry,
281 )
282 .unwrap(),
283 ingested_checkpoint_timestamp_lag: register_histogram_with_registry!(
284 name("ingested_checkpoint_timestamp_lag"),
285 "Difference between the system timestamp when a checkpoint was fetched and the \
286 timestamp in each checkpoint, in seconds",
287 LAG_SEC_BUCKETS.to_vec(),
288 registry,
289 )
290 .unwrap(),
291 ingested_checkpoint_latency: register_histogram_with_registry!(
292 name("ingested_checkpoint_latency"),
293 "Time taken to fetch a checkpoint from the remote store, including retries",
294 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
295 registry,
296 )
297 .unwrap(),
298 ingested_chain_id_latency: register_histogram_with_registry!(
299 name("ingested_chain_id_latency"),
300 "Time taken to fetch the chain identifier, including retries",
301 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
302 registry,
303 )
304 .unwrap(),
305 ingested_latest_checkpoint_latency: register_histogram_with_registry!(
306 name("ingested_latest_checkpoint_latency"),
307 "Time taken to fetch the latest checkpoint number, including retries",
308 INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
309 registry,
310 )
311 .unwrap(),
312 ingestion_concurrency_limit: register_int_gauge_with_registry!(
313 name("ingestion_concurrency_limit"),
314 "Current adaptive concurrency limit for checkpoint ingestion",
315 registry,
316 )
317 .unwrap(),
318 ingestion_concurrency_inflight: register_int_gauge_with_registry!(
319 name("ingestion_concurrency_inflight"),
320 "Current number of in-flight checkpoint ingestion tasks",
321 registry,
322 )
323 .unwrap(),
324 })
325 }
326
327 pub(crate) fn inc_retry(
330 &self,
331 checkpoint: u64,
332 reason: &str,
333 error: Error,
334 ) -> backoff::Error<Error> {
335 warn!(checkpoint, reason, "Retrying due to error: {error}");
336
337 self.total_ingested_transient_retries
338 .with_label_values(&[reason])
339 .inc();
340
341 backoff::Error::transient(error)
342 }
343}
344
345impl IndexerMetrics {
346 pub fn new(prefix: Option<&str>, registry: &Registry) -> Arc<Self> {
347 let prefix = prefix.unwrap_or("indexer");
348 let name = |n| format!("{prefix}_{n}");
349 Arc::new(Self {
350 total_handler_checkpoints_received: register_int_counter_vec_with_registry!(
351 name("total_handler_checkpoints_received"),
352 "Total number of checkpoints received by this handler",
353 &["pipeline"],
354 registry,
355 )
356 .unwrap(),
357 total_handler_processor_retries: register_int_counter_vec_with_registry!(
358 name("total_handler_processor_retries"),
359 "Total number of handler retries after transient processing failures",
360 &["pipeline"],
361 registry,
362 )
363 .unwrap(),
364 total_handler_checkpoints_processed: register_int_counter_vec_with_registry!(
365 name("total_handler_checkpoints_processed"),
366 "Total number of checkpoints processed (converted into rows) by this handler",
367 &["pipeline"],
368 registry,
369 )
370 .unwrap(),
371 total_handler_rows_created: register_int_counter_vec_with_registry!(
372 name("total_handler_rows_created"),
373 "Total number of rows created by this handler",
374 &["pipeline"],
375 registry,
376 )
377 .unwrap(),
378 latest_processed_checkpoint: register_int_gauge_vec_with_registry!(
379 name("latest_processed_checkpoint"),
380 "Latest checkpoint sequence number processed by this handler",
381 &["pipeline"],
382 registry,
383 )
384 .unwrap(),
385 latest_processed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
386 name("latest_processed_checkpoint_timestamp_lag_ms"),
387 "Difference between the system timestamp when the latest checkpoint was processed and the \
388 timestamp in the checkpoint, in milliseconds",
389 &["pipeline"],
390 registry,
391 )
392 .unwrap(),
393 processed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
394 name("processed_checkpoint_timestamp_lag"),
395 "Difference between the system timestamp when a checkpoint was processed and the \
396 timestamp in each checkpoint, in seconds",
397 &["pipeline"],
398 LAG_SEC_BUCKETS.to_vec(),
399 registry,
400 )
401 .unwrap(),
402 handler_checkpoint_latency: register_histogram_vec_with_registry!(
403 name("handler_checkpoint_latency"),
404 "Time taken to process a checkpoint by this handler",
405 &["pipeline"],
406 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
407 registry,
408 )
409 .unwrap(),
410 processor_concurrency_limit: register_int_gauge_vec_with_registry!(
411 name("processor_concurrency_limit"),
412 "Current adaptive concurrency limit for this processor",
413 &["pipeline"],
414 registry,
415 )
416 .unwrap(),
417 processor_concurrency_inflight: register_int_gauge_vec_with_registry!(
418 name("processor_concurrency_inflight"),
419 "Current number of in-flight processor tasks for this pipeline",
420 &["pipeline"],
421 registry,
422 )
423 .unwrap(),
424 total_collector_checkpoints_received: register_int_counter_vec_with_registry!(
425 name("total_collector_checkpoints_received"),
426 "Total number of checkpoints received by this collector",
427 &["pipeline"],
428 registry,
429 )
430 .unwrap(),
431 total_collector_rows_received: register_int_counter_vec_with_registry!(
432 name("total_collector_rows_received"),
433 "Total number of rows received by this collector",
434 &["pipeline"],
435 registry,
436 )
437 .unwrap(),
438 total_collector_batches_created: register_int_counter_vec_with_registry!(
439 name("total_collector_batches_created"),
440 "Total number of batches created by this collector",
441 &["pipeline"],
442 registry,
443 )
444 .unwrap(),
445 total_committer_batches_attempted: register_int_counter_vec_with_registry!(
446 name("total_committer_batches_attempted"),
447 "Total number of batches writes attempted by this committer",
448 &["pipeline"],
449 registry,
450 )
451 .unwrap(),
452 total_committer_batches_succeeded: register_int_counter_vec_with_registry!(
453 name("total_committer_batches_succeeded"),
454 "Total number of successful batches writes by this committer",
455 &["pipeline"],
456 registry,
457 )
458 .unwrap(),
459 total_committer_batches_failed: register_int_counter_vec_with_registry!(
460 name("total_committer_batches_failed"),
461 "Total number of failed batches writes by this committer",
462 &["pipeline"],
463 registry,
464 )
465 .unwrap(),
466 total_committer_rows_committed: register_int_counter_vec_with_registry!(
467 name("total_committer_rows_committed"),
468 "Total number of rows sent to the database by this committer",
469 &["pipeline"],
470 registry,
471 )
472 .unwrap(),
473 total_committer_rows_affected: register_int_counter_vec_with_registry!(
474 name("total_committer_rows_affected"),
475 "Total number of rows actually written to the database by this committer",
476 &["pipeline"],
477 registry,
478 )
479 .unwrap(),
480 total_watermarks_out_of_order: register_int_counter_vec_with_registry!(
481 name("watermark_out_of_order"),
482 "Number of times this committer encountered a batch for a checkpoint before its watermark",
483 &["pipeline"],
484 registry,
485 )
486 .unwrap(),
487 total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
488 name("pruner_chunks_attempted"),
489 "Number of chunks this pruner attempted to delete",
490 &["pipeline"],
491 registry,
492 )
493 .unwrap(),
494 total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
495 name("pruner_chunks_deleted"),
496 "Number of chunks this pruner successfully deleted",
497 &["pipeline"],
498 registry,
499 )
500 .unwrap(),
501 total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
502 name("pruner_rows_deleted"),
503 "Number of rows this pruner successfully deleted",
504 &["pipeline"],
505 registry,
506 )
507 .unwrap(),
508 latest_collected_checkpoint: register_int_gauge_vec_with_registry!(
509 name("latest_collected_checkpoint"),
510 "Latest checkpoint sequence number collected by this collector",
511 &["pipeline"],
512 registry,
513 )
514 .unwrap(),
515 latest_collected_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
516 name("latest_collected_checkpoint_timestamp_lag_ms"),
517 "Difference between the system timestamp when the latest checkpoint was collected and the \
518 timestamp in the checkpoint, in milliseconds",
519 &["pipeline"],
520 registry,
521 )
522 .unwrap(),
523 collected_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
524 name("collected_checkpoint_timestamp_lag"),
525 "Difference between the system timestamp when a checkpoint was collected and the \
526 timestamp in each checkpoint, in seconds",
527 &["pipeline"],
528 LAG_SEC_BUCKETS.to_vec(),
529 registry,
530 )
531 .unwrap(),
532 latest_partially_committed_checkpoint: register_int_gauge_vec_with_registry!(
533 name("latest_partially_committed_checkpoint"),
534 "Latest checkpoint sequence number partially committed by this collector",
535 &["pipeline"],
536 registry,
537 )
538 .unwrap(),
539 latest_partially_committed_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
540 name("latest_partially_committed_checkpoint_timestamp_lag_ms"),
541 "Difference between the system timestamp when the latest checkpoint was partially committed and the \
542 timestamp in the checkpoint, in milliseconds",
543 &["pipeline"],
544 registry,
545 )
546 .unwrap(),
547 partially_committed_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
548 name("partially_committed_checkpoint_timestamp_lag"),
549 "Difference between the system timestamp when a checkpoint was partially committed and the \
550 timestamp in each checkpoint, in seconds",
551 &["pipeline"],
552 LAG_SEC_BUCKETS.to_vec(),
553 registry,
554 )
555 .unwrap(),
556 latest_watermarked_checkpoint_timestamp_lag_ms: register_int_gauge_vec_with_registry!(
557 name("latest_watermarked_checkpoint_timestamp_lag_ms"),
558 "Difference between the system timestamp when the latest checkpoint was watermarked and the \
559 timestamp in the checkpoint, in milliseconds",
560 &["pipeline"],
561 registry,
562 )
563 .unwrap(),
564 watermarked_checkpoint_timestamp_lag: register_histogram_vec_with_registry!(
565 name("watermarked_checkpoint_timestamp_lag"),
566 "Difference between the system timestamp when a checkpoint was watermarked and the \
567 timestamp in each checkpoint, in seconds",
568 &["pipeline"],
569 LAG_SEC_BUCKETS.to_vec(),
570 registry,
571 )
572 .unwrap(),
573 collector_gather_latency: register_histogram_vec_with_registry!(
574 name("collector_gather_latency"),
575 "Time taken to gather rows into a batch by this collector",
576 &["pipeline"],
577 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
578 registry,
579 )
580 .unwrap(),
581 collector_batch_size: register_histogram_vec_with_registry!(
582 name("collector_batch_size"),
583 "Number of rows in a batch written to the database by this collector",
584 &["pipeline"],
585 BATCH_SIZE_BUCKETS.to_vec(),
586 registry,
587 )
588 .unwrap(),
589 total_collector_skipped_checkpoints: register_int_counter_vec_with_registry!(
590 name("total_collector_skipped_checkpoints"),
591 "Number of checkpoints skipped by the tasked pipeline's collector due to being below the main reader lo watermark",
592 &["pipeline"],
593 registry,
594 ).unwrap(),
595 collector_reader_lo: register_int_gauge_vec_with_registry!(
596 name("collector_reader_lo"),
597 "Reader low watermark as observed by the collector",
598 &["pipeline"],
599 registry,
600 )
601 .unwrap(),
602 committer_commit_latency: register_histogram_vec_with_registry!(
603 name("committer_commit_latency"),
604 "Time taken to write a batch of rows to the database by this committer",
605 &["pipeline"],
606 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
607 registry,
608 )
609 .unwrap(),
610 committer_tx_rows: register_histogram_vec_with_registry!(
611 name("committer_tx_rows"),
612 "Number of rows written to the database in a single database transaction by this committer",
613 &["pipeline"],
614 BATCH_SIZE_BUCKETS.to_vec(),
615 registry,
616 )
617 .unwrap(),
618 watermark_gather_latency: register_histogram_vec_with_registry!(
619 name("watermark_gather_latency"),
620 "Time taken to calculate the new high watermark after a write by this committer",
621 &["pipeline"],
622 PROCESSING_LATENCY_SEC_BUCKETS.to_vec(),
623 registry,
624 )
625 .unwrap(),
626 watermark_commit_latency: register_histogram_vec_with_registry!(
627 name("watermark_commit_latency"),
628 "Time taken to write the new high watermark to the database by this committer",
629 &["pipeline"],
630 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
631 registry,
632 )
633 .unwrap(),
634 watermark_pruner_read_latency: register_histogram_vec_with_registry!(
635 name("watermark_pruner_read_latency"),
636 "Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
637 &["pipeline"],
638 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
639 registry,
640 )
641 .unwrap(),
642 watermark_pruner_write_latency: register_histogram_vec_with_registry!(
643 name("watermark_pruner_write_latency"),
644 "Time taken to write the pruner's new upperbound to the database by this pruner",
645 &["pipeline"],
646 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
647 registry,
648 )
649 .unwrap(),
650 pruner_delete_latency: register_histogram_vec_with_registry!(
651 name("pruner_delete_latency"),
652 "Time taken to delete a chunk of data from the database by this pruner",
653 &["pipeline"],
654 DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
655 registry,
656 )
657 .unwrap(),
658 watermark_epoch: register_int_gauge_vec_with_registry!(
659 name("watermark_epoch"),
660 "Current epoch high watermark for this committer",
661 &["pipeline"],
662 registry,
663 )
664 .unwrap(),
665 watermark_checkpoint: register_int_gauge_vec_with_registry!(
666 name("watermark_checkpoint"),
667 "Current checkpoint high watermark for this committer",
668 &["pipeline"],
669 registry,
670 )
671 .unwrap(),
672 watermark_transaction: register_int_gauge_vec_with_registry!(
673 name("watermark_transaction"),
674 "Current transaction high watermark for this committer",
675 &["pipeline"],
676 registry,
677 )
678 .unwrap(),
679 watermark_timestamp_ms: register_int_gauge_vec_with_registry!(
680 name("watermark_timestamp_ms"),
681 "Current timestamp high watermark for this committer, in milliseconds",
682 &["pipeline"],
683 registry,
684 )
685 .unwrap(),
686 watermark_reader_lo: register_int_gauge_vec_with_registry!(
687 name("watermark_reader_lo"),
688 "Current reader low watermark for this pruner",
689 &["pipeline"],
690 registry,
691 )
692 .unwrap(),
693 watermark_pruner_hi: register_int_gauge_vec_with_registry!(
694 name("watermark_pruner_hi"),
695 "Current pruner high watermark for this pruner",
696 &["pipeline"],
697 registry,
698 )
699 .unwrap(),
700 watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
701 name("watermark_epoch_in_db"),
702 "Last epoch high watermark this committer wrote to the DB",
703 &["pipeline"],
704 registry,
705 )
706 .unwrap(),
707 watermark_checkpoint_in_db: register_int_gauge_vec_with_registry!(
708 name("watermark_checkpoint_in_db"),
709 "Last checkpoint high watermark this committer wrote to the DB",
710 &["pipeline"],
711 registry,
712 )
713 .unwrap(),
714 watermark_transaction_in_db: register_int_gauge_vec_with_registry!(
715 name("watermark_transaction_in_db"),
716 "Last transaction high watermark this committer wrote to the DB",
717 &["pipeline"],
718 registry,
719 )
720 .unwrap(),
721 watermark_timestamp_in_db_ms: register_int_gauge_vec_with_registry!(
722 name("watermark_timestamp_ms_in_db"),
723 "Last timestamp high watermark this committer wrote to the DB, in milliseconds",
724 &["pipeline"],
725 registry,
726 )
727 .unwrap(),
728 watermark_reader_lo_in_db: register_int_gauge_vec_with_registry!(
729 name("watermark_reader_lo_in_db"),
730 "Last reader low watermark this pruner wrote to the DB",
731 &["pipeline"],
732 registry,
733 )
734 .unwrap(),
735 watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
736 name("watermark_pruner_hi_in_db"),
737 "Last pruner high watermark this pruner wrote to the DB",
738 &["pipeline"],
739 registry,
740 )
741 .unwrap(),
742 })
743 }
744
745 pub(crate) fn inc_processor_retry<P: Processor>(
746 &self,
747 checkpoint: u64,
748 error: &anyhow::Error,
749 delay: Duration,
750 ) {
751 warn!(
752 pipeline = P::NAME,
753 checkpoint,
754 retry_delay_ms = delay.as_millis(),
755 "Retrying processor after error: {error:?}",
756 );
757
758 self.total_handler_processor_retries
759 .with_label_values(&[P::NAME])
760 .inc();
761 }
762}
763
764impl CheckpointLagMetricReporter {
765 pub fn new(
766 checkpoint_time_lag_histogram: Histogram,
767 latest_checkpoint_time_lag_gauge: IntGauge,
768 latest_checkpoint_sequence_number_gauge: IntGauge,
769 ) -> Arc<Self> {
770 Arc::new(Self {
771 checkpoint_time_lag_histogram,
772 latest_checkpoint_time_lag_gauge,
773 latest_checkpoint_sequence_number_gauge,
774 latest_reported_checkpoint: AtomicU64::new(0),
775 })
776 }
777
778 pub fn new_for_pipeline<P: Processor>(
779 checkpoint_time_lag_histogram: &HistogramVec,
780 latest_checkpoint_time_lag_gauge: &IntGaugeVec,
781 latest_checkpoint_sequence_number_gauge: &IntGaugeVec,
782 ) -> Arc<Self> {
783 Self::new(
784 checkpoint_time_lag_histogram.with_label_values(&[P::NAME]),
785 latest_checkpoint_time_lag_gauge.with_label_values(&[P::NAME]),
786 latest_checkpoint_sequence_number_gauge.with_label_values(&[P::NAME]),
787 )
788 }
789
790 pub fn report_lag(&self, cp_sequence_number: u64, checkpoint_timestamp_ms: u64) {
791 let lag = chrono::Utc::now().timestamp_millis() - checkpoint_timestamp_ms as i64;
792 self.checkpoint_time_lag_histogram
793 .observe((lag as f64) / 1000.0);
794
795 let prev = self
796 .latest_reported_checkpoint
797 .fetch_max(cp_sequence_number, std::sync::atomic::Ordering::Relaxed);
798 if cp_sequence_number > prev {
799 self.latest_checkpoint_sequence_number_gauge
800 .set(cp_sequence_number as i64);
801 self.latest_checkpoint_time_lag_gauge.set(lag);
802 }
803 }
804}
805
#[cfg(test)]
pub(crate) mod tests {
    use std::sync::Arc;

    use prometheus::Registry;

    use super::*;

    /// Builds [`IndexerMetrics`] with the default prefix, registered in a throwaway registry.
    ///
    /// Because every call uses a fresh registry, repeated calls do not clash over metric names.
    pub fn test_metrics() -> Arc<IndexerMetrics> {
        let registry = Registry::new();
        IndexerMetrics::new(None, &registry)
    }

    /// Builds [`IngestionMetrics`] with the default prefix, registered in a throwaway registry.
    ///
    /// Because every call uses a fresh registry, repeated calls do not clash over metric names.
    pub fn test_ingestion_metrics() -> Arc<IngestionMetrics> {
        let registry = Registry::new();
        IngestionMetrics::new(None, &registry)
    }
}