sui_indexer_alt_framework/pipeline/concurrent/
committer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::time::Duration;

use backoff::ExponentialBackoff;
use sui_futures::service::Service;
use sui_futures::stream::Break;
use sui_futures::stream::TrySpawnStreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::CommitterConfig;
use crate::pipeline::WatermarkPart;
use crate::pipeline::concurrent::BatchedRows;
use crate::pipeline::concurrent::Handler;
use crate::store::Store;

/// If the committer needs to retry a commit, it will wait this long initially.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// If the committer needs to retry a commit, it will wait at most this long between retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);

/// The committer task is responsible for writing batches of rows to the database. It receives
/// batches on `rx` and writes them out to the `db` concurrently (`config.write_concurrency`
/// controls the degree of fan-out).
///
/// The writing of each batch is retried repeatedly, with exponential back-off, until it
/// succeeds. Once the write succeeds, the [WatermarkPart]s for that batch are sent on `tx` to
/// the watermark task.
///
/// This task will shut down if its receiver or sender channels are closed.
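///
/// # Example
///
/// A minimal sketch of how a committer is wired up (mirroring the test setup below), assuming
/// a `Handler` implementation `MyHandler` and in-scope `store` and `metrics` handles -- all
/// hypothetical names, for illustration only:
///
/// ```ignore
/// let (batch_tx, batch_rx) = mpsc::channel(10);
/// let (watermark_tx, watermark_rx) = mpsc::channel(10);
///
/// let service = committer(
///     Arc::new(MyHandler),
///     CommitterConfig::default(),
///     batch_rx,
///     watermark_tx,
///     store,
///     metrics,
/// );
///
/// // Batches sent on `batch_tx` are written to the store; their watermark parts
/// // arrive on `watermark_rx` once each write succeeds.
/// ```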
pub(super) fn committer<H: Handler>(
    handler: Arc<H>,
    config: CommitterConfig,
    rx: mpsc::Receiver<BatchedRows<H>>,
    tx: mpsc::Sender<Vec<WatermarkPart>>,
    db: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = H::NAME, "Starting committer");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.partially_committed_checkpoint_timestamp_lag,
            &metrics.latest_partially_committed_checkpoint_timestamp_lag_ms,
            &metrics.latest_partially_committed_checkpoint,
        );

        match ReceiverStream::new(rx)
            .try_for_each_spawned(
                config.write_concurrency,
                |BatchedRows {
                     batch,
                     batch_len,
                     watermark,
                 }| {
                    let batch = Arc::new(batch);
                    let handler = handler.clone();
                    let tx = tx.clone();
                    let db = db.clone();
                    let metrics = metrics.clone();
                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();

                    // Repeatedly try to get a connection to the DB and write the batch. Use an
                    // exponential backoff in case the failure is due to contention over the DB
                    // connection pool.
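                    // With the `backoff` crate's defaults (multiplier 1.5, plus randomization),
                    // the wait grows roughly 100ms -> 150ms -> 225ms -> ..., capped at
                    // `MAX_RETRY_INTERVAL`. `max_elapsed_time: None` means the write is retried
                    // indefinitely rather than ever giving up.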
                    let backoff = ExponentialBackoff {
                        initial_interval: INITIAL_RETRY_INTERVAL,
                        current_interval: INITIAL_RETRY_INTERVAL,
                        max_interval: MAX_RETRY_INTERVAL,
                        max_elapsed_time: None,
                        ..Default::default()
                    };

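                    // Compute the lag-reporting inputs once, outside the retry closure, before
                    // `watermark` is moved into the send at the end of this task.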
                    let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max();
                    let highest_checkpoint_timestamp =
                        watermark.iter().map(|w| w.timestamp_ms()).max();

                    use backoff::Error as BE;
                    let commit = move || {
                        let batch = batch.clone();
                        let handler = handler.clone();
                        let db = db.clone();
                        let metrics = metrics.clone();
                        let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                        async move {
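                            // An empty batch is a no-op write: skip the DB round-trip
                            // entirely. Its watermark parts are still forwarded below, so
                            // watermark progress is not blocked.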
                            if batch_len == 0 {
                                return Ok(());
                            }

                            metrics
                                .total_committer_batches_attempted
                                .with_label_values(&[H::NAME])
                                .inc();

                            let guard = metrics
                                .committer_commit_latency
                                .with_label_values(&[H::NAME])
                                .start_timer();

                            let mut conn = db.connect().await.map_err(|e| {
                                warn!(
                                    pipeline = H::NAME,
                                    "Committer failed to get connection to the DB"
                                );

                                metrics
                                    .total_committer_batches_failed
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                BE::transient(Break::Err(e))
                            })?;

                            let affected = handler.commit(&batch, &mut conn).await;
                            let elapsed = guard.stop_and_record();

                            match affected {
                                Ok(affected) => {
                                    debug!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        affected,
                                        committed = batch_len,
                                        "Wrote batch",
                                    );

                                    checkpoint_lag_reporter.report_lag(
                                        // unwrap is safe: we returned early above if the batch
                                        // was empty, so the watermark is non-empty.
                                        highest_checkpoint.unwrap(),
                                        highest_checkpoint_timestamp.unwrap(),
                                    );

                                    metrics
                                        .total_committer_batches_succeeded
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    metrics
                                        .total_committer_rows_committed
                                        .with_label_values(&[H::NAME])
                                        .inc_by(batch_len as u64);

                                    metrics
                                        .total_committer_rows_affected
                                        .with_label_values(&[H::NAME])
                                        .inc_by(affected as u64);

                                    metrics
                                        .committer_tx_rows
                                        .with_label_values(&[H::NAME])
                                        .observe(affected as f64);

                                    Ok(())
                                }

                                Err(e) => {
                                    warn!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        committed = batch_len,
                                        "Error writing batch: {e}",
                                    );

                                    metrics
                                        .total_committer_batches_failed
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    Err(BE::transient(Break::Err(e)))
                                }
                            }
                        }
                    };

                    async move {
                        // Double check that the commit actually went through (this backoff
                        // should not produce any permanent errors, but if it does, we need to
                        // shut down the pipeline).
                        backoff::future::retry(backoff, commit).await?;
                        if tx.send(watermark).await.is_err() {
                            info!(pipeline = H::NAME, "Watermark closed channel");
                            return Err(Break::<anyhow::Error>::Break);
                        }

                        Ok(())
                    }
                },
            )
            .await
        {
            Ok(()) => {
                info!(pipeline = H::NAME, "Batches done, stopping committer");
                Ok(())
            }

            Err(Break::Break) => {
                info!(pipeline = H::NAME, "Channels closed, stopping committer");
                Ok(())
            }

            Err(Break::Err(e)) => {
                error!(pipeline = H::NAME, "Error from committer: {e}");
                Err(e.context(format!("Error from committer {}", H::NAME)))
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use anyhow::bail;
    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::FieldCount;
    use crate::metrics::IndexerMetrics;
    use crate::mocks::store::ConnectionFailure;
    use crate::mocks::store::FallibleMockConnection;
    use crate::mocks::store::FallibleMockStore;
    use crate::pipeline::Processor;
    use crate::pipeline::WatermarkPart;
    use crate::pipeline::concurrent::BatchStatus;
    use crate::pipeline::concurrent::BatchedRows;
    use crate::pipeline::concurrent::Handler;
    use crate::store::CommitterWatermark;

    use super::*;

    #[derive(Clone, FieldCount, Default)]
    pub struct StoredData {
        pub cp_sequence_number: u64,
        pub tx_sequence_numbers: Vec<u64>,
        /// Tracks remaining commit failures for testing retry logic.
        /// The committer spawns concurrent tasks that call H::commit,
        /// so this needs to be thread-safe (hence Arc<AtomicUsize>).
        pub commit_failure_remaining: Arc<AtomicUsize>,
        pub commit_delay_ms: u64,
    }

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                // If there's a delay, sleep for that duration
                if value.commit_delay_ms > 0 {
                    tokio::time::sleep(Duration::from_millis(value.commit_delay_ms)).await;
                }

                // While the failure counter is positive, decrement it and fail this
                // commit attempt. `checked_sub` leaves the counter at zero once the
                // failures are exhausted, instead of wrapping around.
                if let Ok(prev) = value.commit_failure_remaining.fetch_update(
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                    |n| n.checked_sub(1),
                ) {
                    bail!("Commit failed, remaining failures: {}", prev - 1);
                }

                conn.0
                    .commit_data(
                        DataPipeline::NAME,
                        value.cp_sequence_number,
                        value.tx_sequence_numbers.clone(),
                    )
                    .await?;
            }
            Ok(batch.len())
        }
    }

    struct TestSetup {
        store: FallibleMockStore,
        batch_tx: mpsc::Sender<BatchedRows<DataPipeline>>,
        watermark_rx: mpsc::Receiver<Vec<WatermarkPart>>,
        committer: Service,
    }

    /// Creates and spawns a committer task with the provided mock store, along with
    /// all necessary channels and configuration. The committer runs in the background
    /// and can be interacted with through the returned channels.
    ///
    /// # Arguments
    /// * `store` - The mock store to use for testing
    async fn setup_test(store: FallibleMockStore) -> TestSetup {
        let config = CommitterConfig::default();
        let metrics = IndexerMetrics::new(None, &Default::default());

        let (batch_tx, batch_rx) = mpsc::channel::<BatchedRows<DataPipeline>>(10);
        let (watermark_tx, watermark_rx) = mpsc::channel(10);

        let store_clone = store.clone();
        let handler = Arc::new(DataPipeline);
        let committer = committer(
            handler,
            config,
            batch_rx,
            watermark_tx,
            store_clone,
            metrics,
        );

        TestSetup {
            store,
            batch_tx,
            watermark_rx,
            committer,
        }
    }

    #[tokio::test]
    async fn test_concurrent_batch_processing() {
        let mut setup = setup_test(FallibleMockStore::default()).await;

        // Send batches
        let batch1 = BatchedRows::from_vec(
            vec![
                StoredData {
                    cp_sequence_number: 1,
                    tx_sequence_numbers: vec![1, 2, 3],
                    ..Default::default()
                },
                StoredData {
                    cp_sequence_number: 2,
                    tx_sequence_numbers: vec![4, 5, 6],
                    ..Default::default()
                },
            ],
            vec![
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 1,
                        tx_hi: 3,
                        timestamp_ms_hi_inclusive: 1000,
                    },
                    batch_rows: 1,
                    total_rows: 1, // Total rows from checkpoint 1
                },
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 2,
                        tx_hi: 6,
                        timestamp_ms_hi_inclusive: 2000,
                    },
                    batch_rows: 1,
                    total_rows: 1, // Total rows from checkpoint 2
                },
            ],
        );

        let batch2 = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 3,
                tx_sequence_numbers: vec![7, 8, 9],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 3,
                    tx_hi: 9,
                    timestamp_ms_hi_inclusive: 3000,
                },
                batch_rows: 1,
                total_rows: 1, // Total rows from checkpoint 3
            }],
        );

        setup.batch_tx.send(batch1).await.unwrap();
        setup.batch_tx.send(batch2).await.unwrap();

        // Verify watermarks, blocking until the committer has processed both batches.
        let watermark1 = setup.watermark_rx.recv().await.unwrap();
        let watermark2 = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark1.len(), 2);
        assert_eq!(watermark2.len(), 1);

        // Verify data was committed
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.len(), 3);
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
            assert_eq!(data.get(&2).unwrap().value(), &vec![4, 5, 6]);
            assert_eq!(data.get(&3).unwrap().value(), &vec![7, 8, 9]);
        }
    }

    #[tokio::test]
    async fn test_commit_with_retries_for_commit_failure() {
        let mut setup = setup_test(FallibleMockStore::default()).await;

        // Create a batch with a single item that will fail once before succeeding
        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                commit_failure_remaining: Arc::new(AtomicUsize::new(1)),
                commit_delay_ms: 1_000, // Long commit delay, to observe state between retries
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        // Send the batch
        setup.batch_tx.send(batch).await.unwrap();

        // Wait for the first attempt to fail, but not long enough for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state before the retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // Wait for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state after the retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

    #[tokio::test]
    async fn test_commit_with_retries_for_connection_failure() {
        // Create a store whose first connection attempt fails
        let store = FallibleMockStore {
            connection_failure: Arc::new(Mutex::new(ConnectionFailure {
                connection_failure_attempts: 1,
                connection_delay_ms: 1_000, // Long connection delay, to observe state between retries
                ..Default::default()
            })),
            ..Default::default()
        };
        let mut setup = setup_test(store).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        // Send the batch
        setup.batch_tx.send(batch).await.unwrap();

        // Wait for the first attempt to fail, but not long enough for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state before the retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // Wait for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state after the retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

    #[tokio::test]
    async fn test_empty_batch_handling() {
        let mut setup = setup_test(FallibleMockStore::default()).await;

        let empty_batch = BatchedRows::from_vec(
            vec![], // Empty batch
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 0,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 0,
                total_rows: 0,
            }],
        );

        // Send the empty batch
        setup.batch_tx.send(empty_batch).await.unwrap();

        // Verify watermark is still sent (empty batches should still produce watermarks)
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
        assert_eq!(watermark[0].batch_rows, 0);
        assert_eq!(watermark[0].total_rows, 0);

        // Verify no data was committed (since batch was empty)
        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "No data should be committed for empty batch"
            );
        }
    }

    #[tokio::test]
    async fn test_watermark_channel_closed() {
        let setup = setup_test(FallibleMockStore::default()).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        // Send the batch
        setup.batch_tx.send(batch).await.unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Close the watermark channel by dropping the receiver
        drop(setup.watermark_rx);

        // Wait a bit more for the committer to handle the channel closure
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was still committed despite watermark channel closure
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }

        // Close the batch channel to allow the committer to terminate
        drop(setup.batch_tx);

        // Verify the committer task has terminated due to watermark channel closure.
        // The task should exit gracefully when it can't send watermarks (returns
        // Break::Break).
        setup.committer.shutdown().await.unwrap();
    }
}
637}