sui_indexer_alt_framework/pipeline/sequential/
committer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;

use scoped_futures::ScopedFutureExt;
use sui_futures::service::Service;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::IndexedCheckpoint;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::logging::WatermarkLogger;
use crate::pipeline::sequential::Handler;
use crate::pipeline::sequential::SequentialConfig;
use crate::store::Connection;
use crate::store::TransactionalStore;

/// The committer task gathers rows into batches and writes them to the database.
///
/// Data arrives out of order, grouped by checkpoint, on `rx`. The task orders them and waits to
/// write them until either a configurable polling interval has passed (controlled by
/// `config.committer.collect_interval()`), or `min_eager_rows` rows have been accumulated and we
/// have received the next expected checkpoint.
///
/// Writes are performed on checkpoint boundaries (more than one checkpoint can be present in a
/// single write), in a single transaction that includes all row updates and an update to the
/// watermark table.
///
/// The committer can be configured to lag behind the ingestion service by a fixed number of
/// checkpoints (configured by `checkpoint_lag`). A value of `0` means no lag.
///
/// Upon successful write, the task sends its new watermark back to the ingestion service, to
/// unblock its regulator.
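///
/// `min_eager_rows` sets the threshold of buffered rows at which the committer schedules an
/// eager write, before the polling interval has elapsed, and `max_batch_checkpoints` caps how
/// many checkpoints may be folded into a single write transaction.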
pub(super) fn committer<H>(
    handler: Arc<H>,
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    min_eager_rows: usize,
    max_batch_checkpoints: usize,
) -> Service
where
    H: Handler + Send + Sync + 'static,
    H::Store: TransactionalStore + 'static,
{
    Service::new().spawn_aborting(async move {
        // The `poll` interval controls the maximum time to wait between commits, regardless of the
        // amount of data available.
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

        // Buffer to gather the next batch to write. A checkpoint's data is only added to the batch
        // when it is known to come from the next checkpoint after `watermark` (the current tip of
        // the batch), and data from previous checkpoints will be discarded to avoid double writes.
        //
        // The batch may be non-empty at the top of a tick of the committer's loop if the previous
        // attempt at a write failed. `attempt` is incremented every time a batch write fails, and
        // is reset when it succeeds.
        let mut attempt = 0;
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;

        // The task keeps track of the highest (inclusive) checkpoint it has added to the batch
        // through `next_checkpoint`, and whether that batch needs to be written out. By extension
        // it also knows the next checkpoint to expect and add to the batch. The watermark update
        // for the batch's tip is cached in this variable so that, in case of db txn failures, the
        // failed watermark update can be retried along with the batch.
        let mut watermark = None;

        // The committer task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        // Data for checkpoints that haven't been written yet. Note that `pending_rows` includes
        // rows in `batch`.
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = poll.tick() => {
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Processor closed channel and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Push data into the next batch as long as it's from contiguous checkpoints,
                    // outside of the checkpoint lag and we haven't gathered information from too
                    // many checkpoints already.
                    //
                    // We don't worry about overall size because the handler may have optimized
                    // writes by combining rows, but we will limit the number of checkpoints we try
                    // and batch together as a way to impose some limit on the size of the batch
                    // (and therefore the length of the write transaction).
                    // docs::#batch  (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    while batch_checkpoints < max_batch_checkpoints {
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
                            // Next pending checkpoint is from the future.
                            Ordering::Less => break,

                            // This is the next checkpoint -- include it.
                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                handler.batch(&mut batch, indexed.values.into_iter());
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

                            // Next pending checkpoint is in the past, ignore it to avoid double
                            // writes.
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }
                    // docs::/#batch

                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    // If there is no new data to commit, we can skip the rest of the process. Note
                    // that we cannot use batch_rows for the check, since it is possible that there
                    // are empty checkpoints with no new rows added, but the watermark still needs
                    // to be updated. Conversely, if there is no watermark to be updated, we know
                    // there is no data to write out.
                    if batch_checkpoints == 0 {
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            handler.commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;

                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    // docs::#send (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    // Ignore the result -- the ingestion service will close this channel
                    // once it is done, but there may still be checkpoints buffered that need
                    // processing.
                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));
                    // docs::/#send

                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

                    // If we could make more progress immediately, then schedule more work without
                    // waiting.
                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                        poll.reset_immediately();
                    }
                }

                Some(indexed) = rx.recv() => {
                    // Although there isn't an explicit collector in the sequential pipeline,
                    // keeping this metric consistent with the concurrent pipeline is useful for
                    // monitoring backpressure from the committer to the processor.
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

                    // Once data has been inserted, check whether we need to schedule a write
                    // before the next polling interval. This is appropriate if there are at least
                    // `min_eager_rows` rows to write, and they are either already in the batch or
                    // can be extracted by processing the next pending checkpoint.
                    if pending_rows < min_eager_rows {
                        continue;
                    }

                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
        Ok(())
    })
}

// Tests whether the first checkpoint in the `pending` buffer can be processed immediately, which
// is subject to the following conditions:
//
// - It is at or before the `next_checkpoint` expected by the committer.
// - It is at least `checkpoint_lag` checkpoints before the last checkpoint in the buffer.
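//
// For example, with `next_checkpoint = 5`, `checkpoint_lag = 2` and pending checkpoints
// {5, 6, 7}: the first pending checkpoint is 5, so `5 <= 5` and `5 + 2 <= 7` both hold and it
// can be processed. With `checkpoint_lag = 3` it could not, because `5 + 3 > 7`.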
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    let Some((&first, _)) = pending.first_key_value() else {
        return false;
    };

    let Some((&last, _)) = pending.last_key_value() else {
        return false;
    };

    first <= next_checkpoint && first + checkpoint_lag <= last
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;

    use super::*;

    // Test implementation of Handler
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;
        const MAX_BATCH_CHECKPOINTS: usize = 3; // Using small max value for testing.
        const MIN_EAGER_ROWS: usize = 4; // Using small eager value for testing.

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        #[allow(unused)]
        committer: Service,
    }

    /// Emulates adding a sequential pipeline to the indexer. `next_checkpoint` is the checkpoint
    /// the indexer starts ingesting from.
    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());

        let min_eager_rows = config
            .min_eager_rows
            .unwrap_or(<TestHandler as super::Handler>::MIN_EAGER_ROWS);
        let max_batch_checkpoints = config
            .max_batch_checkpoints
            .unwrap_or(<TestHandler as super::Handler>::MAX_BATCH_CHECKPOINTS);

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let handler = Arc::new(TestHandler);
        let committer = committer(
            handler,
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
            min_eager_rows,
            max_batch_checkpoints,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer,
        }
    }

    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,        // epoch
            checkpoint,        // checkpoint number
            checkpoint,        // tx_hi
            checkpoint * 1000, // timestamp
            vec![checkpoint],  // values
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints in order
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    /// Configure the MockStore with no watermark, and emulate `first_checkpoint` by passing it as
    /// `next_checkpoint` into the setup.
    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        // Send checkpoints in order
        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 7);
            assert_eq!(watermark.tx_hi, 7);
        }
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints out of order
        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order despite receiving out of order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints up to MAX_BATCH_CHECKPOINTS
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify commit_hi values are sent for each batch
        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        // Verify data is written in order across batches
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 1, // Only commit checkpoints that are at least 1 behind
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoints 0 and 1 are written (since checkpoint 2 is not lagged enough)
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        // Send checkpoint 3 to exceed the checkpoint_lag for checkpoint 2
        send_checkpoint(&mut setup, 3).await;

        // Wait for next polling processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify checkpoint 2 is now written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 0, // Zero checkpoint lag to not block the eager logic
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Verify no checkpoints are written yet (not enough rows for eager commit)
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 3 to trigger the eager commit (3 + 1 >= MIN_EAGER_ROWS)
        send_checkpoint(&mut setup, 3).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify all checkpoints are written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 4, // High checkpoint lag to block eager commits
            ..Default::default()
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-3
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no checkpoints are written due to checkpoint lag
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 4 to exceed checkpoint lag
        send_checkpoint(&mut setup, 4).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoint 0 is written (since it's the only one that satisfies checkpoint_lag)
        assert_eq!(setup.store.get_sequential_data(), vec![0]);
    }

    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 1_000, // Long polling to test retry logic
                ..Default::default()
            },
            checkpoint_lag: 0,
            ..Default::default()
        };

        // Create store with transaction failure configuration
        let store = MockStore::default().with_transaction_failures(1); // Will fail once before succeeding

        let mut setup = setup_test(10, config, store);

        // Send a checkpoint
        send_checkpoint(&mut setup, 10).await;

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no data is written before retries complete
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify data is written after retries complete on next polling
        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        // Verify commit_hi is updated
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );
    }
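
    // A minimal sketch exercising the `can_process_pending` predicate directly with
    // hand-worked inputs, documenting how `next_checkpoint` and `checkpoint_lag` gate
    // whether the buffered tip can be committed.
    #[test]
    fn test_can_process_pending_predicate() {
        use std::collections::BTreeMap;

        // An empty buffer can never be processed.
        assert!(!can_process_pending(5, 0, &BTreeMap::<u64, ()>::new()));

        let pending: BTreeMap<u64, ()> = [(5, ()), (6, ()), (7, ())].into_iter().collect();

        // With no lag, the first pending checkpoint (5) is exactly the next expected one.
        assert!(can_process_pending(5, 0, &pending));

        // The first pending checkpoint is still ahead of `next_checkpoint`, so wait.
        assert!(!can_process_pending(4, 0, &pending));

        // Lag of 2: 5 + 2 <= 7, so the tip of the buffer is old enough to commit.
        assert!(can_process_pending(5, 2, &pending));

        // Lag of 3: 5 + 3 > 7, so the committer must keep waiting for more checkpoints.
        assert!(!can_process_pending(5, 3, &pending));
    }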
}