sui_indexer_alt_framework/pipeline/sequential/
committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use scoped_futures::ScopedFutureExt;
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tracing::debug;
12use tracing::info;
13use tracing::warn;
14
15use crate::metrics::CheckpointLagMetricReporter;
16use crate::metrics::IndexerMetrics;
17use crate::pipeline::logging::WatermarkLogger;
18use crate::pipeline::sequential::Handler;
19use crate::pipeline::sequential::collector::BatchedRows;
20use crate::store::Connection;
21use crate::store::SequentialStore;
22
/// Delay used for the first retry of a failed batch commit; also the starting value of the
/// backoff's current interval.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Ceiling on the delay between consecutive commit retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_millis(1_000);
25
26/// Committer task — receives fully-assembled batches from the collector and commits them in
27/// order (one at a time; watermark ordering requires strict serialisation). On commit failure
28/// it retries the same batch under exponential backoff. The collector is free to build the
29/// next batch in the meantime, bounded by `pipeline_depth`.
30pub(super) fn committer<H: Handler>(
31    handler: Arc<H>,
32    store: H::Store,
33    metrics: Arc<IndexerMetrics>,
34    mut rx: mpsc::Receiver<BatchedRows<H>>,
35) -> Service {
36    Service::new().spawn_aborting(async move {
37        info!(pipeline = H::NAME, "Starting committer");
38
39        let mut logger = WatermarkLogger::new("sequential_committer");
40        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
41            &metrics.watermarked_checkpoint_timestamp_lag,
42            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
43            &metrics.watermark_checkpoint_in_db,
44        );
45
46        while let Some(batched) = rx.recv().await {
47            let BatchedRows {
48                batch,
49                watermark,
50                batch_rows,
51            } = batched;
52
53            let backoff = ExponentialBackoff {
54                initial_interval: INITIAL_RETRY_INTERVAL,
55                current_interval: INITIAL_RETRY_INTERVAL,
56                max_interval: MAX_RETRY_INTERVAL,
57                max_elapsed_time: None,
58                ..Default::default()
59            };
60
61            let commit = || async {
62                metrics
63                    .total_committer_batches_attempted
64                    .with_label_values(&[H::NAME])
65                    .inc();
66
67                let guard = metrics
68                    .committer_commit_latency
69                    .with_label_values(&[H::NAME])
70                    .start_timer();
71
72                let result = store
73                    .transaction(|conn| {
74                        async {
75                            conn.set_committer_watermark(H::NAME, watermark).await?;
76                            handler.commit(&batch, conn).await
77                        }
78                        .scope_boxed()
79                    })
80                    .await;
81
82                let elapsed = guard.stop_and_record();
83
84                match result {
85                    Ok(affected) => Ok((affected, elapsed)),
86                    Err(e) => {
87                        warn!(
88                            pipeline = H::NAME,
89                            elapsed_ms = elapsed * 1000.0,
90                            committed = batch_rows,
91                            "Error writing batch: {e}",
92                        );
93                        metrics
94                            .total_committer_batches_failed
95                            .with_label_values(&[H::NAME])
96                            .inc();
97                        Err(backoff::Error::transient(e))
98                    }
99                }
100            };
101
102            let (affected, elapsed) = backoff::future::retry(backoff, commit).await?;
103
104            debug!(
105                pipeline = H::NAME,
106                affected,
107                committed = batch_rows,
108                "Wrote batch",
109            );
110            logger.log::<H>(&watermark, elapsed);
111
112            checkpoint_lag_reporter.report_lag(
113                watermark.checkpoint_hi_inclusive,
114                watermark.timestamp_ms_hi_inclusive,
115            );
116
117            metrics
118                .total_committer_batches_succeeded
119                .with_label_values(&[H::NAME])
120                .inc();
121
122            metrics
123                .total_committer_rows_committed
124                .with_label_values(&[H::NAME])
125                .inc_by(batch_rows as u64);
126
127            metrics
128                .total_committer_rows_affected
129                .with_label_values(&[H::NAME])
130                .inc_by(affected as u64);
131
132            metrics
133                .committer_tx_rows
134                .with_label_values(&[H::NAME])
135                .observe(affected as f64);
136
137            metrics
138                .watermark_epoch_in_db
139                .with_label_values(&[H::NAME])
140                .set(watermark.epoch_hi_inclusive as i64);
141
142            metrics
143                .watermark_checkpoint_in_db
144                .with_label_values(&[H::NAME])
145                .set(watermark.checkpoint_hi_inclusive as i64);
146
147            metrics
148                .watermark_transaction_in_db
149                .with_label_values(&[H::NAME])
150                .set(watermark.tx_hi as i64);
151
152            metrics
153                .watermark_timestamp_in_db_ms
154                .with_label_values(&[H::NAME])
155                .set(watermark.timestamp_ms_hi_inclusive as i64);
156        }
157
158        info!(pipeline = H::NAME, "Stopping committer");
159        Ok(())
160    })
161}