sui_indexer_alt_framework/pipeline/concurrent/
committer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};

use backoff::ExponentialBackoff;
use sui_futures::{
    service::Service,
    stream::{Break, TrySpawnStreamExt},
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, warn};

use crate::{
    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
    pipeline::{CommitterConfig, WatermarkPart},
    store::Store,
};

use super::{BatchedRows, Handler};
/// If the committer needs to retry a commit, it will wait this long initially.
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// If the committer needs to retry a commit, it will wait at most this long between retries.
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);
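
// Assuming the `backoff` crate's documented defaults for the fields not set below (a 1.5x
// multiplier and a 0.5 randomization factor), the schedule these constants produce is roughly
// 100ms, 150ms, 225ms, ..., capped at 1s, with up to +/-50% jitter on each interval. Because
// `max_elapsed_time` is set to `None` below, the retry loop never gives up on its own.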

/// The committer task is responsible for writing batches of rows to the database. It receives
/// batches on `rx` and writes them out to the `db` concurrently (`config.write_concurrency`
/// controls the degree of fan-out).
///
/// The writing of each batch will be repeatedly retried on an exponential back-off until it
/// succeeds. Once the write succeeds, the [WatermarkPart]s for that batch are sent on `tx` to the
/// watermark task.
///
/// This task will shut down if its receiver or sender channels are closed.
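///
/// A minimal wiring sketch (`MyHandler` is a hypothetical [Handler] implementation, and the
/// channels and `store` are placeholders; this is an illustration, not a compiled doctest):
///
/// ```ignore
/// let (batch_tx, batch_rx) = mpsc::channel(10);
/// let (watermark_tx, watermark_rx) = mpsc::channel(10);
/// let service = committer(
///     Arc::new(MyHandler),
///     CommitterConfig::default(),
///     batch_rx,
///     watermark_tx,
///     store.clone(),
///     metrics.clone(),
/// );
/// ```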
pub(super) fn committer<H: Handler + 'static>(
    handler: Arc<H>,
    config: CommitterConfig,
    rx: mpsc::Receiver<BatchedRows<H>>,
    tx: mpsc::Sender<Vec<WatermarkPart>>,
    db: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        info!(pipeline = H::NAME, "Starting committer");
        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.partially_committed_checkpoint_timestamp_lag,
            &metrics.latest_partially_committed_checkpoint_timestamp_lag_ms,
            &metrics.latest_partially_committed_checkpoint,
        );

        match ReceiverStream::new(rx)
            .try_for_each_spawned(
                config.write_concurrency,
                |BatchedRows {
                     batch,
                     batch_len,
                     watermark,
                 }| {
                    let batch = Arc::new(batch);
                    let handler = handler.clone();
                    let tx = tx.clone();
                    let db = db.clone();
                    let metrics = metrics.clone();
                    let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();

                    // Repeatedly try to get a connection to the DB and write the batch. Use an
                    // exponential backoff in case the failure is due to contention over the DB
                    // connection pool.
                    let backoff = ExponentialBackoff {
                        initial_interval: INITIAL_RETRY_INTERVAL,
                        current_interval: INITIAL_RETRY_INTERVAL,
                        max_interval: MAX_RETRY_INTERVAL,
                        max_elapsed_time: None,
                        ..Default::default()
                    };

                    let highest_checkpoint = watermark.iter().map(|w| w.checkpoint()).max();
                    let highest_checkpoint_timestamp =
                        watermark.iter().map(|w| w.timestamp_ms()).max();

                    use backoff::Error as BE;
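                    // Note: `backoff::future::retry` invokes this closure once per attempt, so
                    // each attempt operates on fresh clones of the `Arc`s captured above.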
                    let commit = move || {
                        let batch = batch.clone();
                        let handler = handler.clone();
                        let db = db.clone();
                        let metrics = metrics.clone();
                        let checkpoint_lag_reporter = checkpoint_lag_reporter.clone();
                        async move {
                            if batch_len == 0 {
                                return Ok(());
                            }

                            metrics
                                .total_committer_batches_attempted
                                .with_label_values(&[H::NAME])
                                .inc();

                            let guard = metrics
                                .committer_commit_latency
                                .with_label_values(&[H::NAME])
                                .start_timer();

                            let mut conn = db.connect().await.map_err(|e| {
                                warn!(
                                    pipeline = H::NAME,
                                    "Committer failed to get connection for DB"
                                );

                                metrics
                                    .total_committer_batches_failed
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                BE::transient(Break::Err(e))
                            })?;

                            let affected = handler.commit(&batch, &mut conn).await;
                            let elapsed = guard.stop_and_record();

                            match affected {
                                Ok(affected) => {
                                    debug!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        affected,
                                        committed = batch_len,
                                        "Wrote batch",
                                    );

                                    checkpoint_lag_reporter.report_lag(
                                        // unwrap is safe: the early return above guarantees the
                                        // batch, and hence its watermark parts, are non-empty.
                                        highest_checkpoint.unwrap(),
                                        highest_checkpoint_timestamp.unwrap(),
                                    );

                                    metrics
                                        .total_committer_batches_succeeded
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    metrics
                                        .total_committer_rows_committed
                                        .with_label_values(&[H::NAME])
                                        .inc_by(batch_len as u64);

                                    metrics
                                        .total_committer_rows_affected
                                        .with_label_values(&[H::NAME])
                                        .inc_by(affected as u64);

                                    metrics
                                        .committer_tx_rows
                                        .with_label_values(&[H::NAME])
                                        .observe(affected as f64);

                                    Ok(())
                                }

                                Err(e) => {
                                    warn!(
                                        pipeline = H::NAME,
                                        elapsed_ms = elapsed * 1000.0,
                                        committed = batch_len,
                                        "Error writing batch: {e}",
                                    );

                                    metrics
                                        .total_committer_batches_failed
                                        .with_label_values(&[H::NAME])
                                        .inc();

                                    Err(BE::transient(Break::Err(e)))
                                }
                            }
                        }
                    };

                    async move {
                        // Double-check that the commit actually went through (this backoff should
                        // not produce any permanent errors, but if it does, we need to shut down
                        // the pipeline).
                        backoff::future::retry(backoff, commit).await?;
                        if tx.send(watermark).await.is_err() {
                            info!(pipeline = H::NAME, "Watermark channel closed");
                            return Err(Break::<anyhow::Error>::Break);
                        }

                        Ok(())
                    }
                },
            )
            .await
        {
            Ok(()) => {
                info!(pipeline = H::NAME, "Batches done, stopping committer");
                Ok(())
            }

            Err(Break::Break) => {
                info!(pipeline = H::NAME, "Channels closed, stopping committer");
                Ok(())
            }

            Err(Break::Err(e)) => {
                error!(pipeline = H::NAME, "Error from committer: {e}");
                Err(e.context(format!("Error from committer {}", H::NAME)))
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicUsize, Ordering},
    };

    use anyhow::bail;
    use async_trait::async_trait;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::{
        FieldCount,
        metrics::IndexerMetrics,
        mocks::store::*,
        pipeline::{
            Processor, WatermarkPart,
            concurrent::{BatchStatus, BatchedRows, Handler},
        },
        store::CommitterWatermark,
    };

    use super::*;

    #[derive(Clone, FieldCount, Default)]
    pub struct StoredData {
        pub cp_sequence_number: u64,
        pub tx_sequence_numbers: Vec<u64>,
        /// Tracks remaining commit failures for testing retry logic.
        /// The committer spawns concurrent tasks that call H::commit,
        /// so this needs to be thread-safe (hence Arc<AtomicUsize>).
        pub commit_failure_remaining: Arc<AtomicUsize>,
        pub commit_delay_ms: u64,
    }

    pub struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "data";

        type Value = StoredData;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for DataPipeline {
        type Store = MockStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            for value in batch {
                // If there's a delay, sleep for that duration
                if value.commit_delay_ms > 0 {
                    tokio::time::sleep(Duration::from_millis(value.commit_delay_ms)).await;
                }

                // If there are remaining failures, decrement the counter and fail the commit.
                // `checked_sub` stops the counter at zero, so the commit succeeds once all the
                // failures have been consumed, no matter how many times it is retried.
                if let Ok(prev) = value.commit_failure_remaining.fetch_update(
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                    |remaining| remaining.checked_sub(1),
                ) {
                    bail!("Commit failed, remaining failures: {}", prev - 1);
                }

                conn.0
                    .commit_data(
                        DataPipeline::NAME,
                        value.cp_sequence_number,
                        value.tx_sequence_numbers.clone(),
                    )
                    .await?;
            }
            Ok(batch.len())
        }
    }

    struct TestSetup {
        store: MockStore,
        batch_tx: mpsc::Sender<BatchedRows<DataPipeline>>,
        watermark_rx: mpsc::Receiver<Vec<WatermarkPart>>,
        committer: Service,
    }

    /// Creates and spawns a committer task with the provided mock store, along with
    /// all necessary channels and configuration. The committer runs in the background
    /// and can be interacted with through the returned channels.
    ///
    /// # Arguments
    /// * `store` - The mock store to use for testing
    async fn setup_test(store: MockStore) -> TestSetup {
        let config = CommitterConfig::default();
        let metrics = IndexerMetrics::new(None, &Default::default());

        let (batch_tx, batch_rx) = mpsc::channel::<BatchedRows<DataPipeline>>(10);
        let (watermark_tx, watermark_rx) = mpsc::channel(10);

        let store_clone = store.clone();
        let handler = Arc::new(DataPipeline);
        let committer = committer(
            handler,
            config,
            batch_rx,
            watermark_tx,
            store_clone,
            metrics,
        );

        TestSetup {
            store,
            batch_tx,
            watermark_rx,
            committer,
        }
    }

    #[tokio::test]
    async fn test_concurrent_batch_processing() {
        let mut setup = setup_test(MockStore::default()).await;

        // Send batches
        let batch1 = BatchedRows::from_vec(
            vec![
                StoredData {
                    cp_sequence_number: 1,
                    tx_sequence_numbers: vec![1, 2, 3],
                    ..Default::default()
                },
                StoredData {
                    cp_sequence_number: 2,
                    tx_sequence_numbers: vec![4, 5, 6],
                    ..Default::default()
                },
            ],
            vec![
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 1,
                        tx_hi: 3,
                        timestamp_ms_hi_inclusive: 1000,
                    },
                    batch_rows: 1,
                    total_rows: 1, // Total rows from checkpoint 1
                },
                WatermarkPart {
                    watermark: CommitterWatermark {
                        epoch_hi_inclusive: 0,
                        checkpoint_hi_inclusive: 2,
                        tx_hi: 6,
                        timestamp_ms_hi_inclusive: 2000,
                    },
                    batch_rows: 1,
                    total_rows: 1, // Total rows from checkpoint 2
                },
            ],
        );

        let batch2 = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 3,
                tx_sequence_numbers: vec![7, 8, 9],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 3,
                    tx_hi: 9,
                    timestamp_ms_hi_inclusive: 3000,
                },
                batch_rows: 1,
                total_rows: 1, // Total rows from checkpoint 3
            }],
        );

        setup.batch_tx.send(batch1).await.unwrap();
        setup.batch_tx.send(batch2).await.unwrap();

        // Verify watermarks. Receiving blocks until the committer has processed each batch.
        let watermark1 = setup.watermark_rx.recv().await.unwrap();
        let watermark2 = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark1.len(), 2);
        assert_eq!(watermark2.len(), 1);

        // Verify data was committed
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.len(), 3);
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
            assert_eq!(data.get(&2).unwrap().value(), &vec![4, 5, 6]);
            assert_eq!(data.get(&3).unwrap().value(), &vec![7, 8, 9]);
        }
    }

    #[tokio::test]
    async fn test_commit_with_retries_for_commit_failure() {
        let mut setup = setup_test(MockStore::default()).await;

        // Create a batch with a single item that will fail once before succeeding
        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                commit_failure_remaining: Arc::new(AtomicUsize::new(1)),
                commit_delay_ms: 1_000, // Long commit delay so we can observe state between retries
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        // Send the batch
        setup.batch_tx.send(batch).await.unwrap();

        // Wait for the first attempt to fail, but not long enough for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state before retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // Wait for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state after retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();

            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

    #[tokio::test]
    async fn test_commit_with_retries_for_connection_failure() {
        // Create a store that fails its first connection attempt before succeeding
        let store = MockStore {
            connection_failure: Arc::new(Mutex::new(ConnectionFailure {
                connection_failure_attempts: 1,
                connection_delay_ms: 1_000, // Long connection delay so we can observe state between retries
                ..Default::default()
            })),
            ..Default::default()
        };
        let mut setup = setup_test(store).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        // Send the batch
        setup.batch_tx.send(batch).await.unwrap();

        // Wait for the first attempt to fail, but not long enough for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state before retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "Data should not be committed before retry succeeds"
            );
        }
        assert!(
            setup.watermark_rx.try_recv().is_err(),
            "No watermark should be received before retry succeeds"
        );

        // Wait for the retry to succeed
        tokio::time::sleep(Duration::from_millis(1_500)).await;

        // Verify state after retry succeeds
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
    }

    #[tokio::test]
    async fn test_empty_batch_handling() {
        let mut setup = setup_test(MockStore::default()).await;

        let empty_batch = BatchedRows::from_vec(
            vec![], // Empty batch
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 0,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 0,
                total_rows: 0,
            }],
        );

        // Send the empty batch
        setup.batch_tx.send(empty_batch).await.unwrap();

        // Verify watermark is still sent (empty batches should still produce watermarks)
        let watermark = setup.watermark_rx.recv().await.unwrap();
        assert_eq!(watermark.len(), 1);
        assert_eq!(watermark[0].batch_rows, 0);
        assert_eq!(watermark[0].total_rows, 0);

        // Verify no data was committed (since batch was empty)
        {
            let data = setup.store.data.get(DataPipeline::NAME);
            assert!(
                data.is_none(),
                "No data should be committed for empty batch"
            );
        }
    }

    #[tokio::test]
    async fn test_watermark_channel_closed() {
        let setup = setup_test(MockStore::default()).await;

        let batch = BatchedRows::from_vec(
            vec![StoredData {
                cp_sequence_number: 1,
                tx_sequence_numbers: vec![1, 2, 3],
                ..Default::default()
            }],
            vec![WatermarkPart {
                watermark: CommitterWatermark {
                    epoch_hi_inclusive: 0,
                    checkpoint_hi_inclusive: 1,
                    tx_hi: 3,
                    timestamp_ms_hi_inclusive: 1000,
                },
                batch_rows: 1,
                total_rows: 1,
            }],
        );

        // Send the batch
        setup.batch_tx.send(batch).await.unwrap();

        // Wait for processing.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Close the watermark channel by dropping the receiver
        drop(setup.watermark_rx);

        // Wait a bit more for the committer to handle the channel closure
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was still committed despite watermark channel closure
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
        }

        // Close the batch channel to allow the committer to terminate
        drop(setup.batch_tx);

        // Verify the committer task has terminated due to the watermark channel closure.
        // The task should exit gracefully when it can't send watermarks (it returns
        // Break::Break, which the committer treats as a clean shutdown).
        setup.committer.shutdown().await.unwrap();
    }
}