sui_indexer_alt_framework/pipeline/concurrent/
collector.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5use std::sync::Arc;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8
9use sui_futures::service::Service;
10use tokio::sync::SetOnce;
11use tokio::sync::mpsc;
12use tokio::time::MissedTickBehavior;
13use tokio::time::interval;
14use tracing::debug;
15use tracing::info;
16
17use crate::metrics::CheckpointLagMetricReporter;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::IndexedCheckpoint;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::BatchStatus;
23use crate::pipeline::concurrent::BatchedRows;
24use crate::pipeline::concurrent::Handler;
25
/// Processed values that are waiting to be written to the database. This is an internal type used
/// by the concurrent collector to hold data it is waiting to send to the committer.
struct PendingCheckpoint<H: Handler> {
    /// Iterator over values to be inserted into the database from this checkpoint. Held as an
    /// iterator (rather than a `Vec`) so the collector can drain it incrementally across
    /// multiple batches.
    values: std::vec::IntoIter<H::Value>,
    /// The watermark associated with this checkpoint and the part of it that is left to commit
    watermark: WatermarkPart,
}
34
35impl<H: Handler> PendingCheckpoint<H> {
36    /// Whether there are values left to commit from this indexed checkpoint.
37    fn is_empty(&self) -> bool {
38        let empty = self.values.len() == 0;
39        debug_assert!(!empty || self.watermark.batch_rows == 0);
40        empty
41    }
42}
43
44impl<H: Handler> From<IndexedCheckpoint<H>> for PendingCheckpoint<H> {
45    fn from(indexed: IndexedCheckpoint<H>) -> Self {
46        let total_rows = indexed.values.len();
47        Self {
48            watermark: WatermarkPart {
49                watermark: indexed.watermark,
50                batch_rows: total_rows,
51                total_rows,
52            },
53            values: indexed.values.into_iter(),
54        }
55    }
56}
57
/// The collector task is responsible for gathering rows into batches which it then sends to a
/// committer task to write to the database. The task publishes batches in the following
/// circumstances:
///
/// - If `H::BATCH_SIZE` rows are pending, it will immediately schedule a batch to be gathered.
///
/// - If after sending one batch there is more data to be sent, it will immediately schedule the
///   next batch to be gathered (Each batch will contain at most `H::CHUNK_SIZE` rows).
///
/// - Otherwise, it will check for any data to write out at a regular interval (controlled by
///   `config.collect_interval()`).
///
/// The `main_reader_lo` tracks the lowest checkpoint that can be committed by this pipeline.
/// Checkpoints strictly below it have their rows dropped on receipt, but their (now empty)
/// watermark parts are still forwarded so watermarks keep advancing. Each outgoing batch carries
/// at most `max_watermark_updates` watermark parts, and at most `max_pending_rows` rows are
/// buffered before the task stops pulling from `rx` (backpressure on the processor).
///
/// This task will shutdown if any of its channels are closed.
pub(super) fn collector<H: Handler + 'static>(
    handler: Arc<H>,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::Sender<BatchedRows<H>>,
    main_reader_lo: Arc<SetOnce<AtomicU64>>,
    metrics: Arc<IndexerMetrics>,
    min_eager_rows: usize,
    max_pending_rows: usize,
    max_watermark_updates: usize,
) -> Service {
    Service::new().spawn_aborting(async move {
        // The `poll` interval controls the maximum time to wait between collecting batches,
        // regardless of number of rows pending.
        let mut poll = interval(config.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.collected_checkpoint_timestamp_lag,
            &metrics.latest_collected_checkpoint_timestamp_lag_ms,
            &metrics.latest_collected_checkpoint,
        );

        // Data for checkpoints that are ready to be sent but haven't been written yet.
        // Keyed by checkpoint sequence number so batches are gathered in checkpoint order.
        let mut pending: BTreeMap<u64, PendingCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting collector");

        // Wait for main_reader_lo to be initialized before processing any checkpoints.
        let reader_lo_atomic = main_reader_lo.wait().await;

        loop {
            // === IDLE: block until timer fires or enough data accumulates ===
            tokio::select! {
                // `biased` prefers draining new checkpoints over the flush timer when both
                // are ready.
                biased;

                // docs::#collector (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                // The guard stops intake once `max_pending_rows` is reached, applying
                // backpressure to the processor.
                Some(mut indexed) = rx.recv(), if pending_rows < max_pending_rows => {
                    let reader_lo = reader_lo_atomic.load(Ordering::Relaxed);

                    metrics
                        .collector_reader_lo
                        .with_label_values(&[H::NAME])
                        .set(reader_lo as i64);

                    let mut recv_cps = 0usize;
                    let mut recv_rows = 0usize;
                    loop {
                        // Checkpoints strictly below the reader lo are filtered: drop their
                        // rows, but still insert the (now empty) checkpoint below so its
                        // watermark part is reported to the committer.
                        if indexed.checkpoint() < reader_lo {
                            indexed.values.clear();
                            metrics
                                .total_collector_skipped_checkpoints
                                .with_label_values(&[H::NAME])
                                .inc();
                        }

                        recv_cps += 1;
                        recv_rows += indexed.len();
                        pending_rows += indexed.len();
                        pending.insert(indexed.checkpoint(), indexed.into());

                        // Stop the opportunistic drain once the backpressure limit is hit.
                        if pending_rows >= max_pending_rows {
                            break;
                        }

                        // Grab any further checkpoints that are already buffered without
                        // yielding back to the select.
                        match rx.try_recv() {
                            Ok(next) => indexed = next,
                            Err(_) => break,
                        }
                    }

                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(recv_rows as u64);
                    metrics
                        .total_collector_checkpoints_received
                        .with_label_values(&[H::NAME])
                        .inc_by(recv_cps as u64);

                    // Not enough rows to justify an eager flush; keep accumulating.
                    if pending_rows < min_eager_rows {
                        continue;
                    }
                }
                // docs::/#collector

                // Timer: always flush (even if empty, for watermark progress)
                _ = poll.tick() => {}
            }

            // === FLUSHING: send batches until pending is drained ===
            //
            // Always executes at least once — on timer ticks this sends an empty
            // heartbeat batch so watermarks make progress.
            loop {
                let guard = metrics
                    .collector_gather_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                let mut batch = H::Batch::default();
                let mut watermark = Vec::new();
                let mut batch_len = 0;

                // Gather from pending checkpoints (lowest checkpoint first) until the
                // handler reports the batch is full, the watermark cap is reached, or
                // there is nothing left.
                loop {
                    let Some(mut entry) = pending.first_entry() else {
                        break;
                    };

                    // Bound the number of watermark parts a single batch carries.
                    if watermark.len() >= max_watermark_updates {
                        break;
                    }

                    let indexed = entry.get_mut();
                    // The handler decides how many values to move into the batch; infer
                    // the count from how far the iterator advanced.
                    let before = indexed.values.len();
                    let status = handler.batch(&mut batch, &mut indexed.values);
                    let taken = before - indexed.values.len();

                    batch_len += taken;
                    watermark.push(indexed.watermark.take(taken));
                    if indexed.is_empty() {
                        checkpoint_lag_reporter.report_lag(
                            indexed.watermark.checkpoint(),
                            indexed.watermark.timestamp_ms(),
                        );
                        entry.remove();
                    }

                    if status == BatchStatus::Ready {
                        break;
                    }
                }

                let elapsed = guard.stop_and_record();
                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    rows = batch_len,
                    "Gathered batch",
                );

                metrics
                    .total_collector_batches_created
                    .with_label_values(&[H::NAME])
                    .inc();

                metrics
                    .collector_batch_size
                    .with_label_values(&[H::NAME])
                    .observe(batch_len as f64);

                // Rows handed to the committer no longer count against backpressure.
                pending_rows -= batch_len;

                let batched_rows = BatchedRows {
                    batch,
                    batch_len,
                    watermark,
                };
                if tx.send(batched_rows).await.is_err() {
                    info!(
                        pipeline = H::NAME,
                        "Committer closed channel, stopping collector"
                    );
                    return Ok(());
                }

                if pending.is_empty() {
                    break;
                }
            }

            // Graceful shutdown: upstream closed, nothing buffered anywhere.
            if rx.is_closed() && rx.is_empty() && pending_rows == 0 {
                info!(
                    pipeline = H::NAME,
                    "Processor closed channel, pending rows empty, stopping collector",
                );
                break;
            }
        }

        Ok(())
    })
}
257
258#[cfg(test)]
259mod tests {
260    use std::time::Duration;
261
262    use async_trait::async_trait;
263    use sui_pg_db::Connection;
264    use sui_pg_db::Db;
265    use tokio::sync::mpsc;
266
267    use crate::metrics::tests::test_metrics;
268    use crate::pipeline::Processor;
269    use crate::pipeline::concurrent::BatchStatus;
270    use crate::types::full_checkpoint_content::Checkpoint;
271
272    use super::*;
273
    // A trivial row type; the tests only care about row counts, not contents.
    #[derive(Clone)]
    struct Entry;

    // Handler under test; see the `Processor` and `Handler` impls below.
    struct TestHandler;

    // Max chunk rows for testing - simulates postgres bind parameter limit
    const TEST_MAX_CHUNK_ROWS: usize = 1024;
281
282    #[async_trait]
283    impl Processor for TestHandler {
284        type Value = Entry;
285        const NAME: &'static str = "test_handler";
286
287        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
288            Ok(vec![])
289        }
290    }
291
292    #[async_trait]
293    impl Handler for TestHandler {
294        type Store = Db;
295        type Batch = Vec<Entry>;
296
297        const MIN_EAGER_ROWS: usize = 10;
298        const MAX_PENDING_ROWS: usize = 10000;
299
300        fn batch(
301            &self,
302            batch: &mut Self::Batch,
303            values: &mut std::vec::IntoIter<Self::Value>,
304        ) -> BatchStatus {
305            // Simulate batch size limit
306            let remaining_capacity = TEST_MAX_CHUNK_ROWS.saturating_sub(batch.len());
307            let to_take = remaining_capacity.min(values.len());
308            batch.extend(values.take(to_take));
309
310            if batch.len() >= TEST_MAX_CHUNK_ROWS {
311                BatchStatus::Ready
312            } else {
313                BatchStatus::Pending
314            }
315        }
316
317        async fn commit<'a>(
318            &self,
319            _batch: &Self::Batch,
320            _conn: &mut Connection<'a>,
321        ) -> anyhow::Result<usize> {
322            tokio::time::sleep(Duration::from_millis(1000)).await;
323            Ok(0)
324        }
325    }
326
327    /// Wait for a timeout on the channel, expecting this operation to timeout.
328    async fn expect_timeout<H: Handler + 'static>(
329        rx: &mut mpsc::Receiver<BatchedRows<H>>,
330        duration: Duration,
331    ) {
332        match tokio::time::timeout(duration, rx.recv()).await {
333            Err(_) => (), // Expected timeout - test passes
334            Ok(_) => panic!("Expected timeout but received data instead"),
335        }
336    }
337
338    /// Receive from the channel with a given timeout, panicking if the timeout is reached or the
339    /// channel is closed.
340    async fn recv_with_timeout<H: Handler + 'static>(
341        rx: &mut mpsc::Receiver<BatchedRows<H>>,
342        timeout: Duration,
343    ) -> BatchedRows<H> {
344        match tokio::time::timeout(timeout, rx.recv()).await {
345            Ok(Some(batch)) => batch,
346            Ok(None) => panic!("Collector channel was closed unexpectedly"),
347            Err(_) => panic!("Test timed out waiting for batch from collector"),
348        }
349    }
350
351    #[tokio::test]
352    async fn test_collector_batches_data() {
353        let (processor_tx, processor_rx) = mpsc::channel(10);
354        let (collector_tx, mut collector_rx) = mpsc::channel(10);
355        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));
356
357        let handler = Arc::new(TestHandler);
358        let _collector = collector::<TestHandler>(
359            handler,
360            CommitterConfig::default(),
361            processor_rx,
362            collector_tx,
363            main_reader_lo.clone(),
364            test_metrics(),
365            TestHandler::MIN_EAGER_ROWS,
366            TestHandler::MAX_PENDING_ROWS,
367            TestHandler::MAX_WATERMARK_UPDATES,
368        );
369
370        let part1_length = TEST_MAX_CHUNK_ROWS / 2;
371        let part2_length = TEST_MAX_CHUNK_ROWS - part1_length - 1;
372
373        // Send test data
374        let test_data = vec![
375            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; part1_length]),
376            IndexedCheckpoint::new(0, 2, 20, 2000, vec![Entry; part2_length]),
377            IndexedCheckpoint::new(0, 3, 30, 3000, vec![Entry, Entry]),
378        ];
379
380        for data in test_data {
381            processor_tx.send(data).await.unwrap();
382        }
383
384        let batch1 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
385        assert_eq!(batch1.batch_len, TEST_MAX_CHUNK_ROWS);
386
387        let batch2 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
388        assert_eq!(batch2.batch_len, 1);
389    }
390
391    #[tokio::test]
392    async fn test_collector_shutdown() {
393        let (processor_tx, processor_rx) = mpsc::channel(10);
394        let (collector_tx, mut collector_rx) = mpsc::channel(10);
395        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));
396
397        let handler = Arc::new(TestHandler);
398        let mut collector = collector::<TestHandler>(
399            handler,
400            CommitterConfig::default(),
401            processor_rx,
402            collector_tx,
403            main_reader_lo,
404            test_metrics(),
405            TestHandler::MIN_EAGER_ROWS,
406            TestHandler::MAX_PENDING_ROWS,
407            TestHandler::MAX_WATERMARK_UPDATES,
408        );
409
410        processor_tx
411            .send(IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry, Entry]))
412            .await
413            .unwrap();
414
415        tokio::time::sleep(Duration::from_millis(200)).await;
416
417        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
418        assert_eq!(batch.batch_len, 2);
419
420        // Drop processor sender to simulate shutdown
421        drop(processor_tx);
422
423        // After a short delay, collector should shut down
424        tokio::time::timeout(Duration::from_millis(500), collector.join())
425            .await
426            .expect("collector shutdown timeout")
427            .expect("collector shutdown failed");
428    }
429
    /// Once `MAX_PENDING_ROWS` rows are buffered, the collector stops pulling from the
    /// processor channel, so upstream sends eventually block.
    #[tokio::test]
    async fn test_collector_respects_max_pending() {
        let processor_channel_size = 5; // unit is checkpoint
        let collector_channel_size = 2; // unit is batch, aka rows / MAX_CHUNK_ROWS
        let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
        // `_collector_rx` is held alive but never read, so at most `collector_channel_size`
        // batches can leave the collector before its sends block too.
        let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let metrics = test_metrics();

        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // Send more data than MAX_PENDING_ROWS plus collector channel buffer
        let data = IndexedCheckpoint::new(
            0,
            1,
            10,
            1000,
            vec![
                Entry;
                // Decreasing this number by even 1 would make the test fail.
                TestHandler::MAX_PENDING_ROWS
                    + TEST_MAX_CHUNK_ROWS * collector_channel_size
            ],
        );
        processor_tx.send(data).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Now fill up the processor channel with minimum data to trigger send blocking
        for _ in 0..processor_channel_size {
            let more_data = IndexedCheckpoint::new(0, 2, 11, 1000, vec![Entry]);
            processor_tx.send(more_data).await.unwrap();
        }

        // Now sending even more data should block because of MAX_PENDING_ROWS limit.
        let even_more_data = IndexedCheckpoint::new(0, 3, 12, 1000, vec![Entry]);

        let send_result = processor_tx.try_send(even_more_data);
        assert!(matches!(
            send_result,
            Err(mpsc::error::TrySendError::Full(_))
        ));
    }
485
    /// Rows below `MIN_EAGER_ROWS` accumulate across checkpoints; once the combined total
    /// crosses the threshold, a batch is flushed without waiting for the collect interval.
    #[tokio::test]
    async fn test_collector_accumulates_across_checkpoints_until_eager_threshold() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // Set a very long collect interval (60 seconds) to ensure timing doesn't trigger batching
        let config = CommitterConfig {
            collect_interval_ms: 60_000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        let start_time = std::time::Instant::now();

        // The collector starts with an immediate poll tick, creating an empty batch
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);

        // Send data that's just below MIN_EAGER_ROWS threshold.
        let below_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
        processor_tx.send(below_threshold).await.unwrap();

        // Try to receive with timeout - should timeout since we're below threshold
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        // Now send one more entry to cross the MIN_EAGER_ROWS threshold
        let threshold_trigger = IndexedCheckpoint::new(
            0,
            2,
            20,
            2000,
            vec![Entry; 1], // Just 1 more entry to reach 10 total
        );
        processor_tx.send(threshold_trigger).await.unwrap();

        // Should immediately get a batch without waiting for the long interval
        let eager_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(eager_batch.batch_len, TestHandler::MIN_EAGER_ROWS);

        // Verify batch was created quickly (much less than 60 seconds)
        let elapsed = start_time.elapsed();
        assert!(elapsed < Duration::from_secs(10));
    }
542
    /// Sending exactly `MIN_EAGER_ROWS` in a single checkpoint triggers an immediate batch,
    /// without waiting for the (long) collect interval.
    #[tokio::test]
    async fn test_immediate_batch_on_min_eager_rows() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // Set a very long collect interval (60 seconds) to ensure timing doesn't trigger batching
        let config = CommitterConfig {
            collect_interval_ms: 60_000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // The collector starts with an immediate poll tick, creating an empty batch
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);
        // The collector will then just wait for the next poll as there is no new data yet.
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        let start_time = std::time::Instant::now();

        // Send exactly MIN_EAGER_ROWS in one checkpoint
        let exact_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS]);
        processor_tx.send(exact_threshold).await.unwrap();

        // Should trigger immediately since pending_rows >= MIN_EAGER_ROWS.
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS);

        // Verify batch was created quickly (much less than 60 seconds)
        let elapsed = start_time.elapsed();
        assert!(elapsed < Duration::from_secs(10));
    }
588
    /// Below the eager threshold, nothing is flushed until the collect interval elapses;
    /// the timer then flushes whatever has accumulated.
    #[tokio::test]
    async fn test_collector_waits_for_timer_when_below_eager_threshold() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // Set a reasonable collect interval for this test (3 seconds).
        let config = CommitterConfig {
            collect_interval_ms: 3000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // Consume initial empty batch
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);

        // Send MIN_EAGER_ROWS - 1 entries (below threshold)
        let below_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
        processor_tx.send(below_threshold).await.unwrap();

        // Try to receive with timeout - should timeout since we're below threshold
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        // Should eventually get batch when timer triggers
        let timer_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(4)).await;
        assert_eq!(timer_batch.batch_len, TestHandler::MIN_EAGER_ROWS - 1);
    }
629
    /// The collector must wait for `main_reader_lo` to be initialized before attempting to prepare
    /// checkpoints for commit.
    #[tokio::test(start_paused = true)]
    async fn test_collector_waits_for_main_reader_lo_init() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        // Deliberately left unset: the collector should block in `SetOnce::wait`.
        let main_reader_lo = Arc::new(SetOnce::new());

        let handler = Arc::new(TestHandler);
        let collector = collector(
            handler,
            CommitterConfig {
                // Collect interval longer than time to advance to ensure timing doesn't trigger
                // batching.
                collect_interval_ms: 200_000,
                ..CommitterConfig::default()
            },
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // Send enough data to trigger batching.
        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS + 1]);
        processor_tx.send(test_data).await.unwrap();

        // Advance time significantly - collector should still be blocked waiting for
        // main_reader_lo.
        tokio::time::advance(Duration::from_secs(100)).await;

        assert!(collector_rx.try_recv().is_err());

        // Now initialize the main reader lo to 0, unblocking the collector.
        main_reader_lo.set(AtomicU64::new(0)).ok();

        tokio::time::advance(Duration::from_secs(1)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS + 1);

        collector.shutdown().await.unwrap();
    }
678
679    /// When receiving checkpoints, if they are below the main reader lo, they should be dropped
680    /// immediately.
681    #[tokio::test]
682    async fn test_collector_drops_checkpoints_immediately_if_le_main_reader_lo() {
683        let (processor_tx, processor_rx) = mpsc::channel(10);
684        let (collector_tx, mut collector_rx) = mpsc::channel(10);
685        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(5))));
686        let metrics = test_metrics();
687
688        let collector = collector(
689            Arc::new(TestHandler),
690            CommitterConfig {
691                // Collect interval longer than time to advance to ensure timing doesn't trigger
692                // batching.
693                collect_interval_ms: 200_000,
694                ..CommitterConfig::default()
695            },
696            processor_rx,
697            collector_tx,
698            main_reader_lo.clone(),
699            metrics.clone(),
700            TestHandler::MIN_EAGER_ROWS,
701            TestHandler::MAX_PENDING_ROWS,
702            TestHandler::MAX_WATERMARK_UPDATES,
703        );
704
705        let eager_rows_plus_one = TestHandler::MIN_EAGER_ROWS + 1;
706
707        let test_data: Vec<_> = [1, 5, 2, 6, 4, 3]
708            .into_iter()
709            .map(|cp| IndexedCheckpoint::new(0, cp, 10, 1000, vec![Entry; eager_rows_plus_one]))
710            .collect();
711        for data in test_data {
712            processor_tx.send(data).await.unwrap();
713        }
714        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;
715
716        // Make sure that we are advancing watermarks.
717        assert_eq!(batch.watermark.len(), 6);
718        // And reporting the checkpoints as received.
719        assert_eq!(
720            metrics
721                .total_collector_checkpoints_received
722                .with_label_values(&[TestHandler::NAME])
723                .get(),
724            6
725        );
726        // But the collector should filter out four checkpoints: (1, 2, 3, 4)
727        assert_eq!(
728            metrics
729                .total_collector_skipped_checkpoints
730                .with_label_values(&[TestHandler::NAME])
731                .get(),
732            4
733        );
734        // And that we only have values from two checkpoints (5, 6)
735        assert_eq!(batch.batch_len, eager_rows_plus_one * 2);
736
737        collector.shutdown().await.unwrap();
738    }
739
    /// Because a checkpoint may be partially batched before the main reader lo advances past it,
    /// the collector must ensure that it fully writes out the checkpoint. Otherwise, this will
    /// essentially stall the commit_watermark task indefinitely as the latter waits for the
    /// remaining checkpoint parts.
    #[tokio::test(start_paused = true)]
    async fn test_collector_only_filters_whole_checkpoints() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let metrics = test_metrics();

        let collector = collector(
            Arc::new(TestHandler),
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // One more than a full chunk, so checkpoint 1 is split across two batches.
        let more_than_max_chunk_rows = TEST_MAX_CHUNK_ROWS + 10;

        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; more_than_max_chunk_rows]);
        processor_tx.send(test_data).await.unwrap();
        tokio::time::advance(Duration::from_secs(1)).await;
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // There are still 10 rows left to be sent in the next batch.
        assert_eq!(batch.batch_len, TEST_MAX_CHUNK_ROWS);

        // Send indexed checkpoints 2 through 5 inclusive, but also bump the main reader lo to 4.
        let test_data: Vec<_> = (2..=5)
            .map(|cp| {
                IndexedCheckpoint::new(
                    0,
                    cp,
                    10,
                    1000,
                    vec![Entry; TestHandler::MIN_EAGER_ROWS + 1],
                )
            })
            .collect();
        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }
        let atomic = main_reader_lo.get().unwrap();
        atomic.store(4, Ordering::Relaxed);
        tokio::time::advance(Duration::from_secs(10)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // The next batch should still be the remaining 10 rows from checkpoint 1, even though
        // the reader lo has moved past it — filtering only happens on receipt.
        assert_eq!(batch.batch_len, 10);
        assert_eq!(batch.watermark[0].watermark.checkpoint_hi_inclusive, 1);

        recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // Only checkpoints 2 and 3 (received after the reader lo moved to 4) are skipped.
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .with_label_values(&[TestHandler::NAME])
                .get(),
            2
        );
        assert_eq!(
            metrics
                .total_collector_checkpoints_received
                .with_label_values(&[TestHandler::NAME])
                .get(),
            5
        );

        collector.shutdown().await.unwrap();
    }
819}