sui_indexer_alt_framework/pipeline/concurrent/
committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use backoff::ExponentialBackoff;
8use sui_futures::service::Service;
9use sui_futures::stream::Break;
10use sui_futures::stream::TrySpawnStreamExt;
11use tokio::sync::mpsc;
12use tokio_stream::wrappers::ReceiverStream;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
18use crate::metrics::CheckpointLagMetricReporter;
19use crate::metrics::IndexerMetrics;
20use crate::pipeline::CommitterConfig;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::BatchedRows;
23use crate::pipeline::concurrent::Handler;
24use crate::store::Store;
25
/// If the committer needs to retry a commit, it will wait this long initially.
/// This seeds both `initial_interval` and `current_interval` of the exponential backoff.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// If the committer needs to retry a commit, it will wait at most this long between retries.
/// Caps the exponential backoff's `max_interval`.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);
31
32/// The committer task is responsible for writing batches of rows to the database. It receives
33/// batches on `rx` and writes them out to the `db` concurrently (`config.write_concurrency`
34/// controls the degree of fan-out).
35///
36/// The writing of each batch will be repeatedly retried on an exponential back-off until it
37/// succeeds. Once the write succeeds, the [WatermarkPart]s for that batch are sent on `tx` to the
38/// watermark task.
39///
40/// This task will shutdown if its receiver or sender channels are closed.
41pub(super) fn committer<H: Handler + 'static>(
42    handler: Arc<H>,
43    config: CommitterConfig,
44    rx: mpsc::Receiver<BatchedRows<H>>,
45    tx: mpsc::Sender<Vec<WatermarkPart>>,
46    db: H::Store,
47    metrics: Arc<IndexerMetrics>,
48) -> Service {
49    Service::new().spawn_aborting(async move {
50        info!(pipeline = H::NAME, "Starting committer");
51        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
52            &metrics.partially_committed_checkpoint_timestamp_lag,
53            &metrics.latest_partially_committed_checkpoint_timestamp_lag_ms,
54            &metrics.latest_partially_committed_checkpoint,
55        );
56
57        match ReceiverStream::new(rx)
58            .try_for_each_spawned(
59                config.write_concurrency,
60                |BatchedRows {
61                     batch,
62                     batch_len,
63                     watermark,
64                 }| {
65                    let batch = Arc::new(batch);
66                    let handler = handler.clone();
67                    let tx = tx.clone();
68                    let db = db.clone();
69                    let metrics = metrics.clone();
70                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
71
72                    // Repeatedly try to get a connection to the DB and write the batch. Use an
73                    // exponential backoff in case the failure is due to contention over the DB
74                    // connection pool.
75                    let backoff = ExponentialBackoff {
76                        initial_interval: INITIAL_RETRY_INTERVAL,
77                        current_interval: INITIAL_RETRY_INTERVAL,
78                        max_interval: MAX_RETRY_INTERVAL,
79                        max_elapsed_time: None,
80                        ..Default::default()
81                    };
82
83                    let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max();
84                    let highest_checkpoint_timestamp =
85                        watermark.iter().map(|w| w.timestamp_ms()).max();
86
87                    use backoff::Error as BE;
88                    let commit = move || {
89                        let batch = batch.clone();
90                        let handler = handler.clone();
91                        let db = db.clone();
92                        let metrics = metrics.clone();
93                        let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
94                        async move {
95                            if batch_len == 0 {
96                                return Ok(());
97                            }
98
99                            metrics
100                                .total_committer_batches_attempted
101                                .with_label_values(&[H::NAME])
102                                .inc();
103
104                            let guard = metrics
105                                .committer_commit_latency
106                                .with_label_values(&[H::NAME])
107                                .start_timer();
108
109                            let mut conn = db.connect().await.map_err(|e| {
110                                warn!(
111                                    pipeline = H::NAME,
112                                    "Committed failed to get connection for DB"
113                                );
114
115                                metrics
116                                    .total_committer_batches_failed
117                                    .with_label_values(&[H::NAME])
118                                    .inc();
119
120                                BE::transient(Break::Err(e))
121                            })?;
122
123                            let affected = handler.commit(&batch, &mut conn).await;
124                            let elapsed = guard.stop_and_record();
125
126                            match affected {
127                                Ok(affected) => {
128                                    debug!(
129                                        pipeline = H::NAME,
130                                        elapsed_ms = elapsed * 1000.0,
131                                        affected,
132                                        committed = batch_len,
133                                        "Wrote batch",
134                                    );
135
136                                    checkpoint_lag_reporter.report_lag(
137                                        // unwrap is safe because we would have returned if values is empty.
138                                        highest_checkpoint.unwrap(),
139                                        highest_checkpoint_timestamp.unwrap(),
140                                    );
141
142                                    metrics
143                                        .total_committer_batches_succeeded
144                                        .with_label_values(&[H::NAME])
145                                        .inc();
146
147                                    metrics
148                                        .total_committer_rows_committed
149                                        .with_label_values(&[H::NAME])
150                                        .inc_by(batch_len as u64);
151
152                                    metrics
153                                        .total_committer_rows_affected
154                                        .with_label_values(&[H::NAME])
155                                        .inc_by(affected as u64);
156
157                                    metrics
158                                        .committer_tx_rows
159                                        .with_label_values(&[H::NAME])
160                                        .observe(affected as f64);
161
162                                    Ok(())
163                                }
164
165                                Err(e) => {
166                                    warn!(
167                                        pipeline = H::NAME,
168                                        elapsed_ms = elapsed * 1000.0,
169                                        committed = batch_len,
170                                        "Error writing batch: {e}",
171                                    );
172
173                                    metrics
174                                        .total_committer_batches_failed
175                                        .with_label_values(&[H::NAME])
176                                        .inc();
177
178                                    Err(BE::transient(Break::Err(e)))
179                                }
180                            }
181                        }
182                    };
183
184                    async move {
185                        // Double check that the commit actually went through, (this backoff should
186                        // not produce any permanent errors, but if it does, we need to shutdown
187                        // the pipeline).
188                        backoff::future::retry(backoff, commit).await?;
189                        if tx.send(watermark).await.is_err() {
190                            info!(pipeline = H::NAME, "Watermark closed channel");
191                            return Err(Break::<anyhow::Error>::Break);
192                        }
193
194                        Ok(())
195                    }
196                },
197            )
198            .await
199        {
200            Ok(()) => {
201                info!(pipeline = H::NAME, "Batches done, stopping committer");
202                Ok(())
203            }
204
205            Err(Break::Break) => {
206                info!(pipeline = H::NAME, "Channels closed, stopping committer");
207                Ok(())
208            }
209
210            Err(Break::Err(e)) => {
211                error!(pipeline = H::NAME, "Error from committer: {e}");
212                Err(e.context(format!("Error from committer {}", H::NAME)))
213            }
214        }
215    })
216}
217
218#[cfg(test)]
219mod tests {
220    use std::sync::Arc;
221    use std::sync::Mutex;
222    use std::sync::atomic::AtomicUsize;
223    use std::sync::atomic::Ordering;
224
225    use anyhow::ensure;
226    use async_trait::async_trait;
227    use sui_types::full_checkpoint_content::Checkpoint;
228    use tokio::sync::mpsc;
229
230    use crate::FieldCount;
231    use crate::metrics::IndexerMetrics;
232    use crate::mocks::store::*;
233    use crate::pipeline::Processor;
234    use crate::pipeline::WatermarkPart;
235    use crate::pipeline::concurrent::BatchStatus;
236    use crate::pipeline::concurrent::BatchedRows;
237    use crate::pipeline::concurrent::Handler;
238    use crate::store::CommitterWatermark;
239
240    use super::*;
241
    /// Row type committed by the test pipeline. The extra fields configure how the mock
    /// `commit` implementation behaves, so tests can simulate slow and failing commits.
    #[derive(Clone, FieldCount, Default)]
    pub struct StoredData {
        // Checkpoint this row belongs to.
        pub cp_sequence_number: u64,
        // Transaction sequence numbers recorded for the checkpoint.
        pub tx_sequence_numbers: Vec<u64>,
        /// Tracks remaining commit failures for testing retry logic.
        /// The committer spawns concurrent tasks that call H::commit,
        /// so this needs to be thread-safe (hence Arc<AtomicUsize>).
        pub commit_failure_remaining: Arc<AtomicUsize>,
        // Artificial delay (in milliseconds) applied at the start of each commit attempt.
        pub commit_delay_ms: u64,
    }
252
    /// Minimal processor/handler used to drive the committer in these tests.
    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        // Unused by these tests: batches are constructed by hand and sent straight to the
        // committer, so processing always yields no values.
        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }
265
266    #[async_trait]
267    impl Handler for DataPipeline {
268        type Store = MockStore;
269        type Batch = Vec<Self::Value>;
270
271        fn batch(
272            &self,
273            batch: &mut Self::Batch,
274            values: &mut std::vec::IntoIter<Self::Value>,
275        ) -> BatchStatus {
276            batch.extend(values);
277            BatchStatus::Pending
278        }
279
280        async fn commit<'a>(
281            &self,
282            batch: &Self::Batch,
283            conn: &mut MockConnection<'a>,
284        ) -> anyhow::Result<usize> {
285            for value in batch {
286                // If there's a delay, sleep for that duration
287                if value.commit_delay_ms > 0 {
288                    tokio::time::sleep(Duration::from_millis(value.commit_delay_ms)).await;
289                }
290
291                // If there are remaining failures, fail the commit and decrement the counter
292                {
293                    let remaining = value
294                        .commit_failure_remaining
295                        .fetch_sub(1, Ordering::Relaxed);
296                    ensure!(
297                        remaining == 0,
298                        "Commit failed, remaining failures: {}",
299                        remaining - 1
300                    );
301                }
302
303                conn.0
304                    .commit_data(
305                        DataPipeline::NAME,
306                        value.cp_sequence_number,
307                        value.tx_sequence_numbers.clone(),
308                    )
309                    .await?;
310            }
311            Ok(batch.len())
312        }
313    }
314
    /// Handles for interacting with a committer spawned by `setup_test`.
    struct TestSetup {
        // The mock store the committer writes to; inspect `store.data` to verify commits.
        store: MockStore,
        // Send batches to the committer through this channel.
        batch_tx: mpsc::Sender<BatchedRows<DataPipeline>>,
        // Watermark parts arrive here after each successful commit.
        watermark_rx: mpsc::Receiver<Vec<WatermarkPart>>,
        // Handle to the running committer task, used to await its shutdown.
        committer: Service,
    }
321
322    /// Creates and spawns a committer task with the provided mock store, along with
323    /// all necessary channels and configuration. The committer runs in the background
324    /// and can be interacted with through the returned channels.
325    ///
326    /// # Arguments
327    /// * `store` - The mock store to use for testing
328    async fn setup_test(store: MockStore) -> TestSetup {
329        let config = CommitterConfig::default();
330        let metrics = IndexerMetrics::new(None, &Default::default());
331
332        let (batch_tx, batch_rx) = mpsc::channel::<BatchedRows<DataPipeline>>(10);
333        let (watermark_tx, watermark_rx) = mpsc::channel(10);
334
335        let store_clone = store.clone();
336        let handler = Arc::new(DataPipeline);
337        let committer = committer(
338            handler,
339            config,
340            batch_rx,
341            watermark_tx,
342            store_clone,
343            metrics,
344        );
345
346        TestSetup {
347            store,
348            batch_tx,
349            watermark_rx,
350            committer,
351        }
352    }
353
354    #[tokio::test]
355    async fn test_concurrent_batch_processing() {
356        let mut setup = setup_test(MockStore::default()).await;
357
358        // Send batches
359        let batch1 = BatchedRows::from_vec(
360            vec![
361                StoredData {
362                    cp_sequence_number: 1,
363                    tx_sequence_numbers: vec![1, 2, 3],
364                    ..Default::default()
365                },
366                StoredData {
367                    cp_sequence_number: 2,
368                    tx_sequence_numbers: vec![4, 5, 6],
369                    ..Default::default()
370                },
371            ],
372            vec![
373                WatermarkPart {
374                    watermark: CommitterWatermark {
375                        epoch_hi_inclusive: 0,
376                        checkpoint_hi_inclusive: 1,
377                        tx_hi: 3,
378                        timestamp_ms_hi_inclusive: 1000,
379                    },
380                    batch_rows: 1,
381                    total_rows: 1, // Total rows from checkpoint 1
382                },
383                WatermarkPart {
384                    watermark: CommitterWatermark {
385                        epoch_hi_inclusive: 0,
386                        checkpoint_hi_inclusive: 2,
387                        tx_hi: 6,
388                        timestamp_ms_hi_inclusive: 2000,
389                    },
390                    batch_rows: 1,
391                    total_rows: 1, // Total rows from checkpoint 2
392                },
393            ],
394        );
395
396        let batch2 = BatchedRows::from_vec(
397            vec![StoredData {
398                cp_sequence_number: 3,
399                tx_sequence_numbers: vec![7, 8, 9],
400                ..Default::default()
401            }],
402            vec![WatermarkPart {
403                watermark: CommitterWatermark {
404                    epoch_hi_inclusive: 0,
405                    checkpoint_hi_inclusive: 3,
406                    tx_hi: 9,
407                    timestamp_ms_hi_inclusive: 3000,
408                },
409                batch_rows: 1,
410                total_rows: 1, // Total rows from checkpoint 3
411            }],
412        );
413
414        setup.batch_tx.send(batch1).await.unwrap();
415        setup.batch_tx.send(batch2).await.unwrap();
416
417        // Verify watermarks. Blocking until committer has processed.
418        let watermark1 = setup.watermark_rx.recv().await.unwrap();
419        let watermark2 = setup.watermark_rx.recv().await.unwrap();
420        assert_eq!(watermark1.len(), 2);
421        assert_eq!(watermark2.len(), 1);
422
423        // Verify data was committed
424        {
425            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
426            assert_eq!(data.len(), 3);
427            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
428            assert_eq!(data.get(&2).unwrap().value(), &vec![4, 5, 6]);
429            assert_eq!(data.get(&3).unwrap().value(), &vec![7, 8, 9]);
430        }
431    }
432
433    #[tokio::test]
434    async fn test_commit_with_retries_for_commit_failure() {
435        let mut setup = setup_test(MockStore::default()).await;
436
437        // Create a batch with a single item that will fail once before succeeding
438        let batch = BatchedRows::from_vec(
439            vec![StoredData {
440                cp_sequence_number: 1,
441                tx_sequence_numbers: vec![1, 2, 3],
442                commit_failure_remaining: Arc::new(AtomicUsize::new(1)),
443                commit_delay_ms: 1_000, // Long commit delay for testing state between retry
444            }],
445            vec![WatermarkPart {
446                watermark: CommitterWatermark {
447                    epoch_hi_inclusive: 0,
448                    checkpoint_hi_inclusive: 1,
449                    tx_hi: 3,
450                    timestamp_ms_hi_inclusive: 1000,
451                },
452                batch_rows: 1,
453                total_rows: 1,
454            }],
455        );
456
457        // Send the batch
458        setup.batch_tx.send(batch).await.unwrap();
459
460        // Wait for the first attempt to fail and before the retry succeeds
461        tokio::time::sleep(Duration::from_millis(1_500)).await;
462
463        // Verify state before retry succeeds
464        {
465            let data = setup.store.data.get(DataPipeline::NAME);
466            assert!(
467                data.is_none(),
468                "Data should not be committed before retry succeeds"
469            );
470        }
471        assert!(
472            setup.watermark_rx.try_recv().is_err(),
473            "No watermark should be received before retry succeeds"
474        );
475
476        // Wait for the retry to succeed
477        tokio::time::sleep(Duration::from_millis(1_500)).await;
478
479        // Verify state after retry succeeds
480        {
481            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
482
483            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
484        }
485        let watermark = setup.watermark_rx.recv().await.unwrap();
486        assert_eq!(watermark.len(), 1);
487    }
488
489    #[tokio::test]
490    async fn test_commit_with_retries_for_connection_failure() {
491        // Create a batch with a single item
492        let store = MockStore {
493            connection_failure: Arc::new(Mutex::new(ConnectionFailure {
494                connection_failure_attempts: 1,
495                connection_delay_ms: 1_000, // Long connection delay for testing state between retry
496                ..Default::default()
497            })),
498            ..Default::default()
499        };
500        let mut setup = setup_test(store).await;
501
502        let batch = BatchedRows::from_vec(
503            vec![StoredData {
504                cp_sequence_number: 1,
505                tx_sequence_numbers: vec![1, 2, 3],
506                ..Default::default()
507            }],
508            vec![WatermarkPart {
509                watermark: CommitterWatermark {
510                    epoch_hi_inclusive: 0,
511                    checkpoint_hi_inclusive: 1,
512                    tx_hi: 3,
513                    timestamp_ms_hi_inclusive: 1000,
514                },
515                batch_rows: 1,
516                total_rows: 1,
517            }],
518        );
519
520        // Send the batch
521        setup.batch_tx.send(batch).await.unwrap();
522
523        // Wait for the first attempt to fail and before the retry succeeds
524        tokio::time::sleep(Duration::from_millis(1_500)).await;
525
526        // Verify state before retry succeeds
527        {
528            let data = setup.store.data.get(DataPipeline::NAME);
529            assert!(
530                data.is_none(),
531                "Data should not be committed before retry succeeds"
532            );
533        }
534        assert!(
535            setup.watermark_rx.try_recv().is_err(),
536            "No watermark should be received before retry succeeds"
537        );
538
539        // Wait for the retry to succeed
540        tokio::time::sleep(Duration::from_millis(1_500)).await;
541
542        // Verify state after retry succeeds
543        {
544            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
545            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
546        }
547        let watermark = setup.watermark_rx.recv().await.unwrap();
548        assert_eq!(watermark.len(), 1);
549    }
550
551    #[tokio::test]
552    async fn test_empty_batch_handling() {
553        let mut setup = setup_test(MockStore::default()).await;
554
555        let empty_batch = BatchedRows::from_vec(
556            vec![], // Empty batch
557            vec![WatermarkPart {
558                watermark: CommitterWatermark {
559                    epoch_hi_inclusive: 0,
560                    checkpoint_hi_inclusive: 1,
561                    tx_hi: 0,
562                    timestamp_ms_hi_inclusive: 1000,
563                },
564                batch_rows: 0,
565                total_rows: 0,
566            }],
567        );
568
569        // Send the empty batch
570        setup.batch_tx.send(empty_batch).await.unwrap();
571
572        // Verify watermark is still sent (empty batches should still produce watermarks)
573        let watermark = setup.watermark_rx.recv().await.unwrap();
574        assert_eq!(watermark.len(), 1);
575        assert_eq!(watermark[0].batch_rows, 0);
576        assert_eq!(watermark[0].total_rows, 0);
577
578        // Verify no data was committed (since batch was empty)
579        {
580            let data = setup.store.data.get(DataPipeline::NAME);
581            assert!(
582                data.is_none(),
583                "No data should be committed for empty batch"
584            );
585        }
586    }
587
    /// After a successful commit, the committer tries to send the batch's watermark parts to
    /// the watermark task. If that channel is closed, the committer should still have written
    /// the data, and should stop gracefully rather than report an error.
    #[tokio::test]
    async fn test_watermark_channel_closed() {
        let setup = setup_test(MockStore::default()).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        // Send the batch
        setup.batch_tx.send(batch).await.unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Close the watermark channel by dropping the receiver
        drop(setup.watermark_rx);

        // Wait a bit more for the committer to handle the channel closure
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was still committed despite watermark channel closure
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }

        // Close the batch channel to allow the committer to terminate
        drop(setup.batch_tx);

        // Verify the committer task has terminated due to watermark channel closure.
        // The task should exit gracefully when it can't send watermarks (it returns
        // Break::Break, which the committer maps to a clean shutdown).
        setup.committer.shutdown().await.unwrap();
    }
635}