// sui_indexer_alt_framework/pipeline/sequential/committer.rs
use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use scoped_futures::ScopedFutureExt;
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::debug;
12use tracing::info;
13use tracing::warn;
14
15use crate::metrics::CheckpointLagMetricReporter;
16use crate::metrics::IndexerMetrics;
17use crate::pipeline::logging::WatermarkLogger;
18use crate::pipeline::sequential::Handler;
19use crate::pipeline::sequential::collector::BatchedRows;
20use crate::store::Connection;
21use crate::store::SequentialStore;
22
/// Delay before the first retry of a failed commit; the exponential backoff
/// starts from this interval.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Cap on the delay between commit retries — backoff grows up to this bound
/// and then stays there.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);
25
/// Spawns the committer task for a sequential pipeline.
///
/// The task receives pre-batched rows from the collector over `rx` and, for
/// each batch, advances the committer watermark and writes the batch's rows
/// inside a single store transaction — so the persisted watermark can never
/// disagree with the data it describes. Failed commits are retried with
/// exponential backoff and no time limit. The task runs until `rx` is closed
/// (the sender side shut down), then exits cleanly.
///
/// - `handler`: pipeline-specific commit logic (`Handler::commit`).
/// - `store`: the sequential store that provides transactions.
/// - `metrics`: Prometheus-style counters/histograms/gauges, labelled by
///   `H::NAME`.
/// - `rx`: channel of `BatchedRows` produced by the collector.
pub(super) fn committer<H: Handler>(
    handler: Arc<H>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    mut rx: mpsc::Receiver<BatchedRows<H>>,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = H::NAME, "Starting committer");

        // Throttled logger for watermark progress, plus a reporter that tracks
        // how far committed checkpoints lag behind their timestamps.
        let mut logger = WatermarkLogger::new("sequential_committer");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        // Main loop: one iteration per batch, until the channel is closed.
        while let Some(batched) = rx.recv().await {
            let BatchedRows {
                batch,
                watermark,
                // Number of rows the collector put in this batch (as sent);
                // distinct from `affected`, the count the store reports back.
                batch_rows,
            } = batched;

            // Fresh backoff per batch: delays grow from INITIAL_RETRY_INTERVAL
            // up to MAX_RETRY_INTERVAL, with no overall deadline
            // (`max_elapsed_time: None`), so a failing batch is retried forever.
            let backoff = ExponentialBackoff {
                initial_interval: INITIAL_RETRY_INTERVAL,
                current_interval: INITIAL_RETRY_INTERVAL,
                max_interval: MAX_RETRY_INTERVAL,
                max_elapsed_time: None,
                ..Default::default()
            };

            // One commit attempt. Defined as a closure so `backoff::future::retry`
            // can re-invoke it; the attempt/failure counters are incremented in
            // here so every retry is counted, not just the first try.
            let commit = || async {
                metrics
                    .total_committer_batches_attempted
                    .with_label_values(&[H::NAME])
                    .inc();

                // Times exactly the transaction below; stopped either way.
                let guard = metrics
                    .committer_commit_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                // Watermark update and batch write share ONE transaction:
                // either both land or neither does.
                let result = store
                    .transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            handler.commit(&batch, conn).await
                        }
                        .scope_boxed()
                    })
                    .await;

                let elapsed = guard.stop_and_record();

                match result {
                    Ok(affected) => Ok((affected, elapsed)),
                    Err(e) => {
                        warn!(
                            pipeline = H::NAME,
                            elapsed_ms = elapsed * 1000.0,
                            committed = batch_rows,
                            "Error writing batch: {e}",
                        );
                        metrics
                            .total_committer_batches_failed
                            .with_label_values(&[H::NAME])
                            .inc();
                        // Transient: tells `retry` to wait out the backoff and
                        // try again rather than give up.
                        Err(backoff::Error::transient(e))
                    }
                }
            };

            // Every error above is transient and there is no elapsed-time cap,
            // so this resolves only once a commit succeeds; the `?` exists to
            // satisfy the task's `Result` return type.
            let (affected, elapsed) = backoff::future::retry(backoff, commit).await?;

            debug!(
                pipeline = H::NAME,
                affected,
                committed = batch_rows,
                "Wrote batch",
            );
            logger.log::<H>(&watermark, elapsed);

            checkpoint_lag_reporter.report_lag(
                watermark.checkpoint_hi_inclusive,
                watermark.timestamp_ms_hi_inclusive,
            );

            metrics
                .total_committer_batches_succeeded
                .with_label_values(&[H::NAME])
                .inc();

            // Rows sent in the batch vs. rows the store reported affected —
            // tracked separately so a divergence is observable.
            metrics
                .total_committer_rows_committed
                .with_label_values(&[H::NAME])
                .inc_by(batch_rows as u64);

            metrics
                .total_committer_rows_affected
                .with_label_values(&[H::NAME])
                .inc_by(affected as u64);

            metrics
                .committer_tx_rows
                .with_label_values(&[H::NAME])
                .observe(affected as f64);

            // Mirror the freshly-committed watermark into the per-dimension
            // in-DB gauges.
            metrics
                .watermark_epoch_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.epoch_hi_inclusive as i64);

            // NOTE(review): this gauge was also handed to the lag reporter
            // above — confirm the double update is intentional.
            metrics
                .watermark_checkpoint_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.checkpoint_hi_inclusive as i64);

            metrics
                .watermark_transaction_in_db
                .with_label_values(&[H::NAME])
                .set(watermark.tx_hi as i64);

            metrics
                .watermark_timestamp_in_db_ms
                .with_label_values(&[H::NAME])
                .set(watermark.timestamp_ms_hi_inclusive as i64);
        }

        // `rx` closed: upstream shut down, so the committer drains and stops.
        info!(pipeline = H::NAME, "Stopping committer");
        Ok(())
    })
}