sui_indexer_alt_framework/pipeline/sequential/
committer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};

use scoped_futures::ScopedFutureExt;
use tokio::{
    sync::mpsc,
    task::JoinHandle,
    time::{MissedTickBehavior, interval},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use crate::{
    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
    pipeline::{IndexedCheckpoint, WARN_PENDING_WATERMARKS, logging::WatermarkLogger},
    store::{Connection, TransactionalStore},
};

use super::{Handler, SequentialConfig};

/// The committer task gathers rows into batches and writes them to the database.
///
/// Data arrives out of order, grouped by checkpoint, on `rx`. The task orders them and waits to
/// write them until either a configurable polling interval has passed (controlled by
/// `config.collect_interval()`), or `H::MIN_EAGER_ROWS` rows have been accumulated and we have
/// received the next expected checkpoint.
///
/// Writes are performed on checkpoint boundaries (more than one checkpoint can be present in a
/// single write), in a single transaction that includes all row updates and an update to the
/// watermark table.
///
/// The committer can be configured to lag behind the ingestion service by a fixed number of
/// checkpoints (configured by `checkpoint_lag`). A value of `0` means no lag.
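///
/// For example (an illustrative reading of the lag check below, not additional behaviour): with
/// `checkpoint_lag = 2`, checkpoint `N` is only added to a batch once some checkpoint at or
/// beyond `N + 2` is already pending.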
///
/// Upon successful write, the task sends its new watermark back to the ingestion service, to
/// unblock its regulator.
///
/// The task can be shut down using its `cancel` token or if either of its channels is closed.
pub(super) fn committer<H>(
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
    cancel: CancellationToken,
) -> JoinHandle<()>
where
    H: Handler + Send + Sync + 'static,
    H::Store: TransactionalStore + 'static,
{
    tokio::spawn(async move {
        // The `poll` interval controls the maximum time to wait between commits, regardless of the
        // amount of data available.
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

        // Buffer to gather the next batch to write. A checkpoint's data is only added to the batch
        // when it is known to come from the next checkpoint after `watermark` (the current tip of
        // the batch), and data from previous checkpoints will be discarded to avoid double writes.
        //
        // The batch may be non-empty at the top of a tick of the committer's loop if the previous
        // attempt at a write failed. `attempt` is incremented every time a batch write fails, and
        // is reset when it succeeds.
        let mut attempt = 0;
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;
        // The task keeps track of the highest (inclusive) checkpoint it has added to the batch
        // through `next_checkpoint`, and whether that batch needs to be written out. By extension
        // it also knows the next checkpoint to expect and add to the batch. In case of db txn
        // failures, the watermark update that failed is cached in this variable so that it can be
        // retried.
        let mut watermark = None;

        // The committer task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        // Data for checkpoints that haven't been written yet. Note that `pending_rows` includes
        // rows in `batch`.
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    info!(pipeline = H::NAME, "Shutdown received");
                    break;
                }

                _ = poll.tick() => {
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Processor closed channel and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Push data into the next batch as long as it's from contiguous checkpoints,
                    // outside of the checkpoint lag and we haven't gathered information from too
                    // many checkpoints already.
                    //
                    // We don't worry about overall size because the handler may have optimized
                    // writes by combining rows, but we will limit the number of checkpoints we try
                    // and batch together as a way to impose some limit on the size of the batch
                    // (and therefore the length of the write transaction).
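                    //
                    // For example (illustrative): with `MAX_BATCH_CHECKPOINTS = 3`, at most three
                    // contiguous checkpoints are folded into a single write, even if more are
                    // already pending.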
                    // docs::#batch  (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    while batch_checkpoints < H::MAX_BATCH_CHECKPOINTS {
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
                            // Next pending checkpoint is from the future.
                            Ordering::Less => break,

                            // This is the next checkpoint -- include it.
                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                H::batch(&mut batch, indexed.values);
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

                            // Next pending checkpoint is in the past, ignore it to avoid double
                            // writes.
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }
                    // docs::/#batch

                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    // If there is no new data to commit, we can skip the rest of the process. Note
                    // that we cannot use batch_rows for the check, since it is possible that there
                    // are empty checkpoints with no new rows added, but the watermark still needs
                    // to be updated. Conversely, if there is no watermark to be updated, we know
                    // there is no data to write out.
                    if batch_checkpoints == 0 {
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            H::commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;

                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    // docs::#send (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    // Ignore the result -- the ingestion service will close this channel
                    // once it is done, but there may still be checkpoints buffered that need
                    // processing.
                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));
                    // docs::/#send

                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

                    // If we could make more progress immediately, then schedule more work without
                    // waiting.
                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                        poll.reset_immediately();
                    }
                }

                Some(indexed) = rx.recv() => {
                    // Although there isn't an explicit collector in the sequential pipeline,
                    // keeping this metric consistent with the concurrent pipeline is useful for
                    // monitoring the backpressure from the committer to the processor.
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

                    // Once data has been inserted, check if we need to schedule a write before the
                    // next polling interval. This is appropriate if there are a minimum number of
                    // rows to write, and they are already in the batch, or we can process the next
                    // checkpoint to extract them.
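                    //
                    // For example (illustrative numbers): with `MIN_EAGER_ROWS = 4`, three pending
                    // single-row checkpoints are not enough to cut the poll interval short, but a
                    // fourth row resets the poll timer immediately, provided the batch already has
                    // data or the next checkpoint is processable.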
                    if pending_rows < H::MIN_EAGER_ROWS {
                        continue;
                    }

                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
    })
}

// Tests whether the first checkpoint in the `pending` buffer can be processed immediately, which
// is subject to the following conditions:
//
// - It is at or before the `next_checkpoint` expected by the committer.
// - It is at least `checkpoint_lag` checkpoints before the last checkpoint in the buffer.
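//
// For example (illustrative values): with `next_checkpoint = 5` and `checkpoint_lag = 2`, a
// pending buffer holding checkpoints `{5, 6, 7}` satisfies both conditions (`5 <= 5` and
// `5 + 2 <= 7`), while a buffer holding only `{5, 6}` does not.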
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    let Some((&first, _)) = pending.first_key_value() else {
        return false;
    };

    let Some((&last, _)) = pending.last_key_value() else {
        return false;
    };

    first <= next_checkpoint && first + checkpoint_lag <= last
}

#[cfg(test)]
mod tests {
    use crate::{
        mocks::store::{MockConnection, MockStore},
        pipeline::{CommitterConfig, Processor},
    };

    use super::*;
    use async_trait::async_trait;
    use prometheus::Registry;
    use std::{sync::Arc, time::Duration};
    use sui_types::full_checkpoint_content::CheckpointData;
    use tokio::sync::mpsc;
    use tokio_util::sync::CancellationToken;

    // Test implementation of Handler
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        async fn process(
            &self,
            _checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;
        const MAX_BATCH_CHECKPOINTS: usize = 3; // Using small max value for testing.
        const MIN_EAGER_ROWS: usize = 4; // Using small eager value for testing.

        fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        committer_handle: JoinHandle<()>,
    }

    /// Emulates adding a sequential pipeline to the indexer. The next_checkpoint is the checkpoint
    /// for the indexer to ingest from.
    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());
        let cancel = CancellationToken::new();

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let committer_handle = committer(
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
            cancel,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer_handle,
        }
    }

    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,        // epoch
            checkpoint,        // checkpoint number
            checkpoint,        // tx_hi
            checkpoint * 1000, // timestamp
            vec![checkpoint],  // values
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints in order
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    /// Configure the MockStore with no watermark, and emulate `first_checkpoint` by passing the
    /// initial `next_checkpoint` into the setup.
    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        // Send checkpoints in order
        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 7);
            assert_eq!(watermark.tx_hi, 7);
        }

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints out of order
        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order despite receiving out of order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints up to MAX_BATCH_CHECKPOINTS
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify commit_hi values are sent for each batch
        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        // Verify data is written in order across batches
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 1, // Only commit checkpoints that are at least 1 behind
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoints 0 and 1 are written (since checkpoint 2 is not lagged enough)
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        // Send checkpoint 3 to exceed the checkpoint_lag for checkpoint 2
        send_checkpoint(&mut setup, 3).await;

        // Wait for next polling processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify checkpoint 2 is now written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 0, // Zero checkpoint lag to not block the eager logic
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Verify no checkpoints are written yet (not enough rows for eager commit)
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 3 to trigger the eager commit (3 + 1 >= MIN_EAGER_ROWS)
        send_checkpoint(&mut setup, 3).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify all checkpoints are written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 4, // High checkpoint lag to block eager commits
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-3
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no checkpoints are written due to checkpoint lag
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 4 to exceed checkpoint lag
        send_checkpoint(&mut setup, 4).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoint 0 is written (since it's the only one that satisfies checkpoint_lag)
        assert_eq!(setup.store.get_sequential_data(), vec![0]);

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }

    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 1_000, // Long polling to test retry logic
                ..Default::default()
            },
            checkpoint_lag: 0,
        };

        // Create store with transaction failure configuration
        let store = MockStore::default().with_transaction_failures(1); // Will fail once before succeeding

        let mut setup = setup_test(10, config, store);

        // Send a checkpoint
        send_checkpoint(&mut setup, 10).await;

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no data is written before retries complete
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify data is written after retries complete on next polling
        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        // Verify commit_hi is updated
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );

        // Clean up
        drop(setup.checkpoint_tx);
        let _ = setup.committer_handle.await;
    }
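
    /// A minimal illustrative unit test, added as a sketch: it exercises `can_process_pending`
    /// directly with a hypothetical pending buffer of checkpoints 5 through 7.
    #[test]
    fn test_can_process_pending_lag_check() {
        use std::collections::BTreeMap;

        let pending: BTreeMap<u64, ()> = (5..=7).map(|cp| (cp, ())).collect();

        // An empty buffer can never be processed.
        assert!(!can_process_pending(5, 0, &BTreeMap::<u64, ()>::new()));

        // With no lag, the first pending checkpoint (5) is exactly the next expected one.
        assert!(can_process_pending(5, 0, &pending));

        // The first pending checkpoint is still in the committer's future.
        assert!(!can_process_pending(4, 0, &pending));

        // A lag of 2 is satisfied because checkpoint 7 is already pending (5 + 2 <= 7), but a
        // lag of 3 is not.
        assert!(can_process_pending(5, 2, &pending));
        assert!(!can_process_pending(5, 3, &pending));
    }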
}