sui_indexer_alt_framework/pipeline/sequential/committer.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::sync::Arc;

use scoped_futures::ScopedFutureExt;
use sui_futures::service::Service;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;
use tokio::time::interval;
use tracing::debug;
use tracing::info;
use tracing::warn;

use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::pipeline::IndexedCheckpoint;
use crate::pipeline::WARN_PENDING_WATERMARKS;
use crate::pipeline::logging::WatermarkLogger;
use crate::pipeline::sequential::Handler;
use crate::pipeline::sequential::SequentialConfig;
use crate::store::Connection;
use crate::store::TransactionalStore;

/// The committer task gathers rows into batches and writes them to the database.
///
/// Data arrives out of order, grouped by checkpoint, on `rx`. The task orders them and waits to
/// write them until either a configurable polling interval has passed (controlled by
/// `config.committer.collect_interval()`), or `H::MIN_EAGER_ROWS` rows have been accumulated and
/// we have received the next expected checkpoint.
///
/// Writes are performed on checkpoint boundaries (more than one checkpoint can be present in a
/// single write), in a single transaction that includes all row updates and an update to the
/// watermark table.
///
/// The committer can be configured to lag behind the ingestion service by a fixed number of
/// checkpoints (configured by `checkpoint_lag`). A value of `0` means no lag.
///
/// Upon successful write, the task sends its new watermark back to the ingestion service, to
/// unblock its regulator.
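///
/// For example, with `checkpoint_lag = 5`, checkpoint `N` only becomes eligible for a write once
/// checkpoint `N + 5` has been received (see `can_process_pending` below).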
pub(super) fn committer<H>(
    handler: Arc<H>,
    config: SequentialConfig,
    mut next_checkpoint: u64,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::UnboundedSender<(&'static str, u64)>,
    store: H::Store,
    metrics: Arc<IndexerMetrics>,
) -> Service
where
    H: Handler + Send + Sync + 'static,
    H::Store: TransactionalStore + 'static,
{
    Service::new().spawn_aborting(async move {
        // The `poll` interval controls the maximum time to wait between commits, regardless of the
        // amount of data available.
        let mut poll = interval(config.committer.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag = config.checkpoint_lag;

        // Buffer to gather the next batch to write. A checkpoint's data is only added to the batch
        // when it is known to come from the next checkpoint after `watermark` (the current tip of
        // the batch), and data from previous checkpoints will be discarded to avoid double writes.
        //
        // The batch may be non-empty at the top of a tick of the committer's loop if the previous
        // attempt at a write failed. `attempt` is incremented every time a batch write fails, and
        // is reset when it succeeds.
        let mut attempt = 0;
        let mut batch = H::Batch::default();
        let mut batch_rows = 0;
        let mut batch_checkpoints = 0;

        // The task keeps track of the highest (inclusive) checkpoint it has added to the batch
        // through `next_checkpoint`, and by extension the next checkpoint it expects to receive.
        // `watermark` caches the watermark update for the current batch, so that it can be
        // retried in case of db txn failures.
        let mut watermark = None;

        // The committer task will periodically output a log message at a higher log level to
        // demonstrate that the pipeline is making progress.
        let mut logger = WatermarkLogger::new("sequential_committer");

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.watermarked_checkpoint_timestamp_lag,
            &metrics.latest_watermarked_checkpoint_timestamp_lag_ms,
            &metrics.watermark_checkpoint_in_db,
        );

        // Data for checkpoints that haven't been written yet. Note that `pending_rows` includes
        // rows in `batch`.
        let mut pending: BTreeMap<u64, IndexedCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting committer");

        loop {
            tokio::select! {
                _ = poll.tick() => {
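                    // Shut down only when there is nothing left to do: no batch in flight, the
                    // upstream channel is closed and drained, and nothing in `pending` is
                    // eligible to be added to a batch.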
                    if batch_checkpoints == 0
                        && rx.is_closed()
                        && rx.is_empty()
                        && !can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        info!(pipeline = H::NAME, "Processor closed channel, and no more data to commit");
                        break;
                    }

                    if pending.len() > WARN_PENDING_WATERMARKS {
                        warn!(
                            pipeline = H::NAME,
                            pending = pending.len(),
                            "Pipeline has a large number of pending watermarks",
                        );
                    }

                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    // Push data into the next batch as long as it's from contiguous checkpoints,
                    // outside of the checkpoint lag and we haven't gathered information from too
                    // many checkpoints already.
                    //
                    // We don't worry about overall size because the handler may have optimized
                    // writes by combining rows, but we limit the number of checkpoints we try to
                    // batch together as a way to impose some limit on the size of the batch (and
                    // therefore the length of the write transaction).
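                    //
                    // For example, with `H::MAX_BATCH_CHECKPOINTS = 3` and no lag, if checkpoints
                    // 5, 6, 7, and 8 are pending while `next_checkpoint` is 5, the loop below
                    // drains 5, 6, and 7 into the batch and leaves 8 for a later write.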
                    // docs::#batch  (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    while batch_checkpoints < H::MAX_BATCH_CHECKPOINTS {
                        if !can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                            break;
                        }

                        let Some(entry) = pending.first_entry() else {
                            break;
                        };

                        match next_checkpoint.cmp(entry.key()) {
                            // Next pending checkpoint is from the future.
                            Ordering::Less => break,

                            // This is the next checkpoint -- include it.
                            Ordering::Equal => {
                                let indexed = entry.remove();
                                batch_rows += indexed.len();
                                batch_checkpoints += 1;
                                handler.batch(&mut batch, indexed.values.into_iter());
                                watermark = Some(indexed.watermark);
                                next_checkpoint += 1;
                            }

                            // Next pending checkpoint is in the past, ignore it to avoid double
                            // writes.
                            Ordering::Greater => {
                                metrics
                                    .total_watermarks_out_of_order
                                    .with_label_values(&[H::NAME])
                                    .inc();

                                let indexed = entry.remove();
                                pending_rows -= indexed.len();
                            }
                        }
                    }
                    // docs::/#batch

                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_rows,
                        pending = pending_rows,
                        "Gathered batch",
                    );

                    // If there is no new data to commit, we can skip the rest of the process. Note
                    // that we cannot use batch_rows for the check, since it is possible that there
                    // are empty checkpoints with no new rows added, but the watermark still needs
                    // to be updated. Conversely, if there is no watermark to be updated, we know
                    // there is no data to write out.
                    if batch_checkpoints == 0 {
                        assert_eq!(batch_rows, 0);
                        continue;
                    }

                    let Some(watermark) = watermark else {
                        continue;
                    };

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_rows as f64);

                    metrics
                        .total_committer_batches_attempted
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .watermark_epoch
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    let guard = metrics
                        .committer_commit_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

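                    // The watermark update and the handler's writes are committed in a single
                    // transaction, so a failed write leaves the database untouched and the whole
                    // batch (and its cached `watermark`) is retried on a later tick.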
                    let affected = store.transaction(|conn| {
                        async {
                            conn.set_committer_watermark(H::NAME, watermark).await?;
                            handler.commit(&batch, conn).await
                        }.scope_boxed()
                    }).await;

                    let elapsed = guard.stop_and_record();

                    let affected = match affected {
                        Ok(affected) => affected,

                        Err(e) => {
                            warn!(
                                pipeline = H::NAME,
                                elapsed_ms = elapsed * 1000.0,
                                attempt,
                                committed = batch_rows,
                                pending = pending_rows,
                                "Error writing batch: {e}",
                            );

                            metrics
                                .total_committer_batches_failed
                                .with_label_values(&[H::NAME])
                                .inc();

                            attempt += 1;
                            continue;
                        }
                    };

                    debug!(
                        pipeline = H::NAME,
                        attempt,
                        affected,
                        committed = batch_rows,
                        pending = pending_rows,
                        "Wrote batch",
                    );

                    logger.log::<H>(&watermark, elapsed);

                    checkpoint_lag_reporter.report_lag(
                        watermark.checkpoint_hi_inclusive,
                        watermark.timestamp_ms_hi_inclusive
                    );

                    metrics
                        .total_committer_batches_succeeded
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .total_committer_rows_committed
                        .with_label_values(&[H::NAME])
                        .inc_by(batch_rows as u64);

                    metrics
                        .total_committer_rows_affected
                        .with_label_values(&[H::NAME])
                        .inc_by(affected as u64);

                    metrics
                        .committer_tx_rows
                        .with_label_values(&[H::NAME])
                        .observe(affected as f64);

                    metrics
                        .watermark_epoch_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.epoch_hi_inclusive as i64);

                    metrics
                        .watermark_checkpoint_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.checkpoint_hi_inclusive as i64);

                    metrics
                        .watermark_transaction_in_db
                        .with_label_values(&[H::NAME])
                        .set(watermark.tx_hi as i64);

                    metrics
                        .watermark_timestamp_in_db_ms
                        .with_label_values(&[H::NAME])
                        .set(watermark.timestamp_ms_hi_inclusive as i64);

                    // docs::#send (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                    // Ignore the result -- the ingestion service will close this channel
                    // once it is done, but there may still be checkpoints buffered that need
                    // processing.
                    let _ = tx.send((H::NAME, watermark.checkpoint_hi_inclusive + 1));
                    // docs::/#send

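                    // The batch has been written out, so clear the buffered state and reset the
                    // retry counter before gathering the next batch.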
                    let _ = std::mem::take(&mut batch);
                    pending_rows -= batch_rows;
                    batch_checkpoints = 0;
                    batch_rows = 0;
                    attempt = 0;

                    // If we could make more progress immediately, then schedule more work without
                    // waiting.
                    if can_process_pending(next_checkpoint, checkpoint_lag, &pending) {
                        poll.reset_immediately();
                    }
                }

                Some(indexed) = rx.recv() => {
                    // Although there isn't an explicit collector in the sequential pipeline,
                    // keeping this metric consistent with the concurrent pipeline is useful to
                    // monitor backpressure from the committer to the processor.
                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed);

                    // Once data has been inserted, check if we need to schedule a write before the
                    // next polling interval. This is appropriate if there is a minimum number of
                    // rows to write, and they are either already in the batch or we can process
                    // the next checkpoint to extract them.
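                    // For example, with `H::MIN_EAGER_ROWS = 4` (the value used by the tests
                    // below), four pending single-row checkpoints are enough to schedule a write
                    // ahead of the next poll, as long as the batch or `pending` can make progress.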
                    if pending_rows < H::MIN_EAGER_ROWS {
                        continue;
                    }

                    if batch_checkpoints > 0
                        || can_process_pending(next_checkpoint, checkpoint_lag, &pending)
                    {
                        poll.reset_immediately();
                    }
                }
            }
        }

        info!(pipeline = H::NAME, "Stopping committer");
        Ok(())
    })
}

// Tests whether the first checkpoint in the `pending` buffer can be processed immediately, which
// is subject to the following conditions:
//
// - It is at or before the `next_checkpoint` expected by the committer.
// - It is at least `checkpoint_lag` checkpoints before the last checkpoint in the buffer.
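//
// For example, with `next_checkpoint = 10`, `checkpoint_lag = 2`, and pending checkpoints
// {10, 11, 12}: 10 <= 10 and 10 + 2 <= 12, so checkpoint 10 can be processed. With only
// {10, 11} pending, 10 + 2 > 11, and the committer keeps waiting.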
fn can_process_pending<T>(
    next_checkpoint: u64,
    checkpoint_lag: u64,
    pending: &BTreeMap<u64, T>,
) -> bool {
    let Some((&first, _)) = pending.first_key_value() else {
        return false;
    };

    let Some((&last, _)) = pending.last_key_value() else {
        return false;
    };

    first <= next_checkpoint && first + checkpoint_lag <= last
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use prometheus::Registry;
    use sui_types::full_checkpoint_content::Checkpoint;
    use tokio::sync::mpsc;

    use crate::mocks::store::MockConnection;
    use crate::mocks::store::MockStore;
    use crate::pipeline::CommitterConfig;
    use crate::pipeline::Processor;

    use super::*;

    // Test implementation of Handler
    #[derive(Default)]
    struct TestHandler;

    #[async_trait]
    impl Processor for TestHandler {
        const NAME: &'static str = "test";
        type Value = u64;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl super::Handler for TestHandler {
        type Store = MockStore;
        type Batch = Vec<u64>;
        const MAX_BATCH_CHECKPOINTS: usize = 3; // Using small max value for testing.
        const MIN_EAGER_ROWS: usize = 4; // Using small eager value for testing.

        fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
            batch.extend(values);
        }

        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut MockConnection<'a>,
        ) -> anyhow::Result<usize> {
            if !batch.is_empty() {
                let mut sequential_data = conn.0.sequential_checkpoint_data.lock().unwrap();
                sequential_data.extend(batch.iter().cloned());
            }
            Ok(batch.len())
        }
    }

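    /// Handles for driving the committer under test: the mock store, the sender used to feed
    /// checkpoints in, the receiver for watermark updates reported back to ingestion, and the
    /// committer service handle itself.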
    struct TestSetup {
        store: MockStore,
        checkpoint_tx: mpsc::Sender<IndexedCheckpoint<TestHandler>>,
        commit_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>,
        #[allow(unused)]
        committer: Service,
    }

    /// Emulates adding a sequential pipeline to the indexer. `next_checkpoint` is the checkpoint
    /// the indexer starts ingesting from.
    fn setup_test(next_checkpoint: u64, config: SequentialConfig, store: MockStore) -> TestSetup {
        let metrics = IndexerMetrics::new(None, &Registry::default());

        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(10);
        #[allow(clippy::disallowed_methods)]
        let (commit_hi_tx, commit_hi_rx) = mpsc::unbounded_channel();

        let store_clone = store.clone();
        let handler = Arc::new(TestHandler);
        let committer = committer(
            handler,
            config,
            next_checkpoint,
            checkpoint_rx,
            commit_hi_tx,
            store_clone,
            metrics,
        );

        TestSetup {
            store,
            checkpoint_tx,
            commit_hi_rx,
            committer,
        }
    }

    async fn send_checkpoint(setup: &mut TestSetup, checkpoint: u64) {
        setup
            .checkpoint_tx
            .send(create_checkpoint(checkpoint))
            .await
            .unwrap();
    }

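    /// Builds a single-value `IndexedCheckpoint` whose epoch, checkpoint number, and `tx_hi` all
    /// equal `checkpoint`, so the tests can assert directly against checkpoint numbers.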
    fn create_checkpoint(checkpoint: u64) -> IndexedCheckpoint<TestHandler> {
        IndexedCheckpoint::new(
            checkpoint,        // epoch
            checkpoint,        // checkpoint number
            checkpoint,        // tx_hi
            checkpoint * 1000, // timestamp
            vec![checkpoint],  // values
        )
    }

    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints in order
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    /// Configure the MockStore with no watermark, and emulate `first_checkpoint` by passing it
    /// to the setup as `next_checkpoint`.
    #[tokio::test]
    async fn test_committer_processes_sequential_checkpoints_with_initial_watermark() {
        let config = SequentialConfig::default();
        let mut setup = setup_test(5, config, MockStore::default());

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        // Send checkpoints in order
        for i in 0..5 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify watermark hasn't progressed
        let watermark = setup.store.watermark(TestHandler::NAME);
        assert!(watermark.is_none());

        for i in 5..8 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify data was written in order
        assert_eq!(setup.store.get_sequential_data(), vec![5, 6, 7]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 7);
            assert_eq!(watermark.tx_hi, 7);
        }
    }

    #[tokio::test]
    async fn test_committer_processes_out_of_order_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints out of order
        for i in [1, 0, 2] {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify data was written in order despite receiving out of order
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);

        // Verify watermark was updated
        {
            let watermark = setup.store.watermark(TestHandler::NAME).unwrap();
            assert_eq!(watermark.checkpoint_hi_inclusive, 2);
            assert_eq!(watermark.tx_hi, 2);
        }

        // Verify commit_hi was sent to ingestion
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commit_up_to_max_batch_checkpoints() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 0, // Zero checkpoint lag to process new batch instantly
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send one more checkpoint than MAX_BATCH_CHECKPOINTS
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify commit_hi values are sent for each batch
        let commit_hi1 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi1.1, 3,
            "First commit_hi should be 3 (checkpoint 2 + 1, highest processed of first batch)"
        );

        let commit_hi2 = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(
            commit_hi2.1, 4,
            "Second commit_hi should be 4 (checkpoint 3 + 1, highest processed of second batch)"
        );

        // Verify data is written in order across batches
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_does_not_commit_until_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig::default(),
            checkpoint_lag: 1, // Only commit checkpoints that are at least 1 behind
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoints 0 and 1 are written (since checkpoint 2 is not lagged enough)
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 2, "commit_hi should be 2 (checkpoint 1 + 1)");

        // Send checkpoint 3 to exceed the checkpoint_lag for checkpoint 2
        send_checkpoint(&mut setup, 3).await;

        // Wait for the next poll to pick it up
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify checkpoint 2 is now written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2]);
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.1, 3, "commit_hi should be 3 (checkpoint 2 + 1)");
    }

    #[tokio::test]
    async fn test_committer_commits_eagerly() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 0, // Zero checkpoint lag to not block the eager logic
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-2
        for i in 0..3 {
            send_checkpoint(&mut setup, i).await;
        }

        // Verify no checkpoints are written yet (not enough rows for eager commit)
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 3 to trigger the eager commit (3 + 1 >= MIN_EAGER_ROWS)
        send_checkpoint(&mut setup, 3).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify all checkpoints are written
        assert_eq!(setup.store.get_sequential_data(), vec![0, 1, 2, 3]);
    }

    #[tokio::test]
    async fn test_committer_cannot_commit_eagerly_due_to_checkpoint_lag() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 4_000, // Long polling to test eager commit
                ..Default::default()
            },
            checkpoint_lag: 4, // High checkpoint lag to block eager commits
        };
        let mut setup = setup_test(0, config, MockStore::default());

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Send checkpoints 0-3
        for i in 0..4 {
            send_checkpoint(&mut setup, i).await;
        }

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no checkpoints are written due to checkpoint lag
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Send checkpoint 4 to exceed checkpoint lag
        send_checkpoint(&mut setup, 4).await;

        // Wait for processing
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify only checkpoint 0 is written (since it's the only one that satisfies checkpoint_lag)
        assert_eq!(setup.store.get_sequential_data(), vec![0]);
    }

    #[tokio::test]
    async fn test_committer_retries_on_transaction_failure() {
        let config = SequentialConfig {
            committer: CommitterConfig {
                collect_interval_ms: 1_000, // Long polling to test retry logic
                ..Default::default()
            },
            checkpoint_lag: 0,
        };

        // Create store with transaction failure configuration
        let store = MockStore::default().with_transaction_failures(1); // Will fail once before succeeding

        let mut setup = setup_test(10, config, store);

        // Send a checkpoint
        send_checkpoint(&mut setup, 10).await;

        // Wait for initial poll to be over
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Verify no data is written before retries complete
        assert_eq!(setup.store.get_sequential_data(), Vec::<u64>::new());

        // Wait for retries to complete
        tokio::time::sleep(Duration::from_millis(1_200)).await;

        // Verify data is written after the retry succeeds on the next poll
        assert_eq!(setup.store.get_sequential_data(), vec![10]);

        // Verify commit_hi is updated
        let commit_hi = setup.commit_hi_rx.recv().await.unwrap();
        assert_eq!(commit_hi.0, "test", "Pipeline name should be 'test'");
        assert_eq!(
            commit_hi.1, 11,
            "commit_hi should be 11 (checkpoint 10 + 1)"
        );
    }
}