sui_indexer_alt_framework/pipeline/sequential/committer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::cmp::Ordering;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use scoped_futures::ScopedFutureExt;
9use sui_futures::service::Service;
10use tokio::sync::mpsc;
11use tokio::time::MissedTickBehavior;
12use tokio::time::interval;
13use tracing::debug;
14use tracing::info;
15use tracing::warn;
16
17use crate::metrics::CheckpointLagMetricReporter;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::IndexedCheckpoint;
20use crate::pipeline::WARN_PENDING_WATERMARKS;
21use crate::pipeline::logging::WatermarkLogger;
22use crate::pipeline::sequential::Handler;
23use crate::pipeline::sequential::SequentialConfig;
24use crate::store::Connection;
25use crate::store::SequentialStore;
26
/// The committer task gathers rows into batches and writes them to the database.
///
/// Data arrives out of order, grouped by checkpoint, on `rx`. The task orders them and waits to
/// write them until either a configurable polling interval has passed (controlled by
/// `config.collect_interval()`), or `H::BATCH_SIZE` rows have been accumulated and we have
/// received the next expected checkpoint.
///
/// Writes are performed on checkpoint boundaries (more than one checkpoint can be present in a
/// single write), in a single transaction that includes all row updates and an update to the
/// watermark table.
///
/// The committer can be configured to lag behind the ingestion service by a fixed number of
/// checkpoints (configured by `checkpoint_lag`). A value of `0` means no lag.
///
/// Upon successful write, the task sends its new watermark back to the ingestion service, to
/// unblock its regulator.
///
/// `next_checkpoint` is the first checkpoint this task expects to commit. `min_eager_rows` is
/// the number of pending rows that triggers a write ahead of the polling interval, and
/// `max_batch_checkpoints` bounds how many checkpoints are combined into a single write
/// transaction.
pub(super) fn committer<H: Handler>(
    handler: Arc<H>,
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    min_eager_rows: usize,
    max_batch_checkpoints: usize,
) -> Service {
    Service::new().spawn_aborting(async move {
        // The `poll` interval controls the maximum time to wait between commits, regardless of the
        // amount of data available.
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

        // Buffer to gather the next batch to write. A checkpoint's data is only added to the batch
        // when it is known to come from the next checkpoint after `watermark` (the current tip of
        // the batch), and data from previous checkpoints will be discarded to avoid double writes.
        //
        // The batch may be non-empty at the top of a tick of the committer's loop if the previous
        // attempt at a write failed. `attempt` is incremented every time a batch write fails, and
        // is reset when it succeeds.
        let mut attempt = 0;
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;

        // The watermark update to write alongside the current batch. It covers the highest
        // (inclusive) checkpoint that has been added to the batch so far, and is cached in this
        // variable so that, if the write transaction fails, the retry can re-apply the same
        // watermark update. `next_checkpoint` tracks the next checkpoint the task expects to add
        // to the batch.
        let mut watermark = None;

        // The committer task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        // Data for checkpoints that haven't been written yet. Note that `pending_rows` includes
        // rows in `batch`.
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = poll.tick() => {
                    // Shut down once the input channel is exhausted, there is nothing in-flight
                    // in the batch, and nothing in `pending` can make further progress.
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Process closed channel and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Push data into the next batch as long as it's from contiguous checkpoints,
                    // outside of the checkpoint lag and we haven't gathered information from too
                    // many checkpoints already.
                    //
                    // We don't worry about overall size because the handler may have optimized
                    // writes by combining rows, but we will limit the number of checkpoints we try
                    // and batch together as a way to impose some limit on the size of the batch
                    // (and therefore the length of the write transaction).
                    // docs::#batch  (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    while batch_checkpoints < max_batch_checkpoints {
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
                            // Next pending checkpoint is from the future.
                            Ordering::Less => break,

                            // This is the next checkpoint -- include it.
                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                handler.batch(&mut batch, indexed.values.into_iter());
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

                            // Next pending checkpoint is in the past, ignore it to avoid double
                            // writes.
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }
                    // docs::/#batch

                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    // If there is no new data to commit, we can skip the rest of the process. Note
                    // that we cannot use batch_rows for the check, since it is possible that there
                    // are empty checkpoints with no new rows added, but the watermark still needs
                    // to be updated. Conversely, if there is no watermark to be updated, we know
                    // there is no data to write out.
                    if batch_checkpoints == 0 {
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

                    // Invariant: `watermark` is `Some` whenever `batch_checkpoints > 0`, because
                    // the `Ordering::Equal` arm above sets it on every checkpoint it adds.
                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Rows and the watermark update go in one transaction, so a failure leaves
                    // both unwritten and the retry (next tick) re-attempts both together.
                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            handler.commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;

                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            // Keep `batch` and `watermark` intact so the next tick retries the
                            // same write.
                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    // docs::#send (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    // Ignore the result -- the ingestion service will close this channel
                    // once it is done, but there may still be checkpoints buffered that need
                    // processing.
                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));
                    // docs::/#send

                    // Reset batch state for the next write; `pending_rows` only sheds the rows
                    // that were just committed.
                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

                    // If we could make more progress immediately, then schedule more work without
                    // waiting.
                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                        poll.reset_immediately();
                    }
                }

                Some(indexed) = rx.recv() => {
                    // Although there isn't an explicit collector in the sequential pipeline,
                    // keeping this metric consistent with concurrent pipeline is useful
                    // to monitor the backpressure from committer to processor.
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

                    // Once data has been inserted, check if we need to schedule a write before the
                    // next polling interval. This is appropriate if there are a minimum number of
                    // rows to write, and they are already in the batch, or we can process the next
                    // checkpoint to extract them.
                    if pending_rows < min_eager_rows {
                        continue;
                    }

                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
        Ok(())
    })
}
370
// Tests whether the first checkpoint in the `pending` buffer can be processed immediately, which
// is subject to the following conditions:
//
// - It is at or before the `next_checkpoint` expected by the committer.
// - It is at least `checkpoint_lag` checkpoints before the last checkpoint in the buffer.
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    // A non-empty `BTreeMap` always has both a first and a last key, so a single pattern covers
    // the emptiness check for both.
    let (Some((&first, _)), Some((&last, _))) =
        (pending.first_key_value(), pending.last_key_value())
    else {
        return false;
    };

    // `last - first` cannot underflow because `BTreeMap` keys are ordered. Comparing the spread
    // against `checkpoint_lag` (rather than computing `first + checkpoint_lag`) avoids u64
    // overflow for very large lag configurations.
    first <= next_checkpoint && last - first >= checkpoint_lag
}
391
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;

    use super::*;

    // Test implementation of Handler. Each checkpoint indexes a single `u64` value (its own
    // sequence number), so the committed data doubles as a record of commit order.
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;
        const MAX_BATCH_CHECKPOINTS: usize = 3; // Using small max value for testing.
        const MIN_EAGER_ROWS: usize = 4; // Using small eager value for testing.

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

    // Handles for driving and observing a committer under test.
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        #[allow(unused)]
        committer: Service,
    }

    /// Emulates adding a sequential pipeline to the indexer. `next_checkpoint` is the checkpoint
    /// for the indexer to ingest from (the equivalent of a `first_checkpoint` configuration).
    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());

        let min_eager_rows = config
            .min_eager_rows
            .unwrap_or(<TestHandler as super::Handler>::MIN_EAGER_ROWS);
        let max_batch_checkpoints = config
            .max_batch_checkpoints
            .unwrap_or(<TestHandler as super::Handler>::MAX_BATCH_CHECKPOINTS);

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let handler = Arc::new(TestHandler);
        let committer = committer(
            handler,
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
            min_eager_rows,
            max_batch_checkpoints,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer,
        }
    }

    // Sends a synthetic indexed checkpoint to the committer under test.
    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

    // Builds an `IndexedCheckpoint` whose fields are all derived from the checkpoint number, so
    // assertions on watermarks are easy to compute.
    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,        // epoch
            checkpoint,        // checkpoint number
            checkpoint,        // tx_hi
            checkpoint * 1000, // timestamp
            vec![checkpoint],  // values
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints in order
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    /// Configure the MockStore with no watermark, and emulate `first_checkpoint` by passing it as
    /// the `next_checkpoint` argument of the setup: checkpoints below it must be discarded.
    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        // Send checkpoints in order
        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, Some(7));
            assert_eq!(watermark.tx_hi, 7);
        }
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints out of order
        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order despite receiving out of order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, Some(2));
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints up to MAX_BATCH_CHECKPOINTS
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify commit_hi values are sent for each batch
        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        // Verify data is written in order across batches
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 1, // Only commit checkpoints that are at least 1 behind
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoints 0 and 1 are written (since checkpoint 2 is not lagged enough)
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        // Send checkpoint 3 to exceed the checkpoint_lag for checkpoint 2
        send_checkpoint(&mut setup, 3).await;

        // Wait for next polling processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify checkpoint 2 is now written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 0, // Zero checkpoint lag to not block the eager logic
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Verify no checkpoints are written yet (not enough rows for eager commit)
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 3 to trigger the eager commit (3 + 1 >= MIN_EAGER_ROWS)
        send_checkpoint(&mut setup, 3).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify all checkpoints are written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 4, // High checkpoint lag to block eager commits
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-3
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no checkpoints are written due to checkpoint lag
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 4 to exceed checkpoint lag
        send_checkpoint(&mut setup, 4).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoint 0 is written (since it's the only one that satisfies checkpoint_lag)
        assert_eq!(setup.store.get_sequential_data(), vec![0]);
    }

    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 1_000, // Long polling to test retry logic
                ..Default::default()
            },
            checkpoint_lag: 0,
            ..Default::default()
        };

        // Create store with transaction failure configuration
        let store = MockStore::default().with_transaction_failures(1); // Will fail once before succeeding

        let mut setup = setup_test(10, config, store);

        // Send a checkpoint
        send_checkpoint(&mut setup, 10).await;

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no data is written before retries complete
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify data is written after retries complete on next polling
        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        // Verify commit_hi is updated
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );
    }
}