sui_indexer_alt_framework/pipeline/concurrent/collector.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::BTreeMap,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
};

use sui_futures::service::Service;
use tokio::{
    sync::{SetOnce, mpsc},
    time::{MissedTickBehavior, interval},
};
use tracing::{debug, info};

use crate::{
    metrics::{CheckpointLagMetricReporter, IndexerMetrics},
    pipeline::{CommitterConfig, IndexedCheckpoint, WatermarkPart},
};

use super::{BatchStatus, BatchedRows, Handler};

/// Processed values that are waiting to be written to the database. This is an internal type used
/// by the concurrent collector to hold data it is waiting to send to the committer.
struct PendingCheckpoint<H: Handler> {
    /// Iterator over values to be inserted into the database from this checkpoint
    values: std::vec::IntoIter<H::Value>,
    /// The watermark associated with this checkpoint and the part of it that is left to commit
    watermark: WatermarkPart,
}

impl<H: Handler> PendingCheckpoint<H> {
    /// Whether there are values left to commit from this indexed checkpoint.
    fn is_empty(&self) -> bool {
        let empty = self.values.len() == 0;
        debug_assert!(!empty || self.watermark.batch_rows == 0);
        empty
    }
}

impl<H: Handler> From<IndexedCheckpoint<H>> for PendingCheckpoint<H> {
    fn from(indexed: IndexedCheckpoint<H>) -> Self {
        let total_rows = indexed.values.len();
        Self {
            watermark: WatermarkPart {
                watermark: indexed.watermark,
                batch_rows: total_rows,
                total_rows,
            },
            values: indexed.values.into_iter(),
        }
    }
}

/// The collector task is responsible for gathering rows into batches which it then sends to a
/// committer task to write to the database. The task publishes batches in the following
/// circumstances:
///
/// - If at least `H::MIN_EAGER_ROWS` rows are pending, it will immediately schedule a batch to be
///   gathered.
///
/// - If after sending one batch there is more data to be sent, it will immediately schedule the
///   next batch to be gathered. The handler's `batch` implementation decides when a batch is full,
///   and each batch spans at most `H::MAX_WATERMARK_UPDATES` checkpoints.
///
/// - Otherwise, it will check for any data to write out at a regular interval (controlled by
///   `config.collect_interval()`).
///
/// The `main_reader_lo` tracks the lowest checkpoint that can be committed by this pipeline:
/// checkpoints strictly below it have their values dropped, but their watermarks still advance.
///
/// This task will shut down if any of its channels are closed.
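///
/// A minimal sketch of how a pipeline might wire this task up, assuming some `MyHandler` type
/// that implements `Handler`, and a `main_reader_lo` and `metrics` prepared by the surrounding
/// pipeline (the tests below contain runnable versions of this setup):
///
/// ```ignore
/// // Checkpoints processed upstream arrive on `processor_rx`; gathered batches are sent on
/// // `collector_tx` towards the committer. Channel capacities here are arbitrary.
/// let (processor_tx, processor_rx) = mpsc::channel(100);
/// let (collector_tx, collector_rx) = mpsc::channel(100);
///
/// let service = collector::<MyHandler>(
///     Arc::new(MyHandler),
///     CommitterConfig::default(),
///     processor_rx,
///     collector_tx,
///     main_reader_lo,
///     metrics,
/// );
/// ```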
pub(super) fn collector<H: Handler + 'static>(
    handler: Arc<H>,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::Sender<BatchedRows<H>>,
    main_reader_lo: Arc<SetOnce<AtomicU64>>,
    metrics: Arc<IndexerMetrics>,
) -> Service {
    Service::new().spawn_aborting(async move {
        // The `poll` interval controls the maximum time to wait between collecting batches,
        // regardless of the number of rows pending.
        let mut poll = interval(config.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.collected_checkpoint_timestamp_lag,
            &metrics.latest_collected_checkpoint_timestamp_lag_ms,
            &metrics.latest_collected_checkpoint,
        );

        // Data for checkpoints that are ready to be sent but haven't been written yet.
        let mut pending: BTreeMap<u64, PendingCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting collector");

        loop {
            tokio::select! {
                // Time to create another batch and push it to the committer.
                _ = poll.tick() => {
                    let guard = metrics
                        .collector_gather_latency
                        .with_label_values(&[H::NAME])
                        .start_timer();

                    let mut batch = H::Batch::default();
                    let mut watermark = Vec::new();
                    let mut batch_len = 0;

                    loop {
                        let Some(mut entry) = pending.first_entry() else {
                            break;
                        };

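                        // Cap the number of checkpoints (and so watermark parts) that a single
                        // batch can span.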
                        if watermark.len() >= H::MAX_WATERMARK_UPDATES {
                            break;
                        }

                        let indexed = entry.get_mut();
                        let before = indexed.values.len();
                        let status = handler.batch(&mut batch, &mut indexed.values);
                        let taken = before - indexed.values.len();

                        batch_len += taken;
                        watermark.push(indexed.watermark.take(taken));
                        if indexed.is_empty() {
                            checkpoint_lag_reporter.report_lag(
                                indexed.watermark.checkpoint(),
                                indexed.watermark.timestamp_ms(),
                            );
                            entry.remove();
                        }

                        if status == BatchStatus::Ready {
                            // Batch is full, send it
                            break;
                        }
                    }
                    pending_rows -= batch_len;
                    let elapsed = guard.stop_and_record();
                    debug!(
                        pipeline = H::NAME,
                        elapsed_ms = elapsed * 1000.0,
                        rows = batch_len,
                        pending_rows = pending_rows,
                        "Gathered batch",
                    );

                    metrics
                        .total_collector_batches_created
                        .with_label_values(&[H::NAME])
                        .inc();

                    metrics
                        .collector_batch_size
                        .with_label_values(&[H::NAME])
                        .observe(batch_len as f64);

                    let batched_rows = BatchedRows {
                        batch,
                        batch_len,
                        watermark,
                    };

                    if tx.send(batched_rows).await.is_err() {
                        info!(pipeline = H::NAME, "Committer closed channel, stopping collector");
                        break;
                    }

                    if pending_rows > 0 {
                        poll.reset_immediately();
                    } else if rx.is_closed() && rx.is_empty() {
                        info!(
                            pipeline = H::NAME,
                            "Processor closed channel, pending rows empty, stopping collector",
                        );
                        break;
                    }
                }

                // docs::#collector (see docs/content/guides/developer/advanced/custom-indexer.mdx)
                Some(mut indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_ROWS => {
                    // Clear the values of outdated checkpoints, so that we don't commit data to the
                    // store, but can still advance watermarks.
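                    // Block until `main_reader_lo` has been initialized before deciding whether
                    // this checkpoint's data still needs to be committed.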
                    let reader_lo = main_reader_lo.wait().await.load(Ordering::Relaxed);
                    if indexed.checkpoint() < reader_lo {
                        indexed.values.clear();
                        metrics.total_collector_skipped_checkpoints
                            .with_label_values(&[H::NAME])
                            .inc();
                    }

                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(indexed.len() as u64);
                    metrics
                        .total_collector_checkpoints_received
                        .with_label_values(&[H::NAME])
                        .inc();
                    metrics
                        .collector_reader_lo
                        .with_label_values(&[H::NAME])
                        .set(reader_lo as i64);

                    pending_rows += indexed.len();
                    pending.insert(indexed.checkpoint(), indexed.into());

                    if pending_rows >= H::MIN_EAGER_ROWS {
                        poll.reset_immediately();
                    }
                }
                // docs::/#collector
            }
        }

        Ok(())
    })
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_trait::async_trait;
    use sui_pg_db::{Connection, Db};
    use tokio::sync::mpsc;

    use crate::{
        metrics::tests::test_metrics,
        pipeline::{Processor, concurrent::BatchStatus},
        types::full_checkpoint_content::Checkpoint,
    };

    use super::*;

    #[derive(Clone)]
    struct Entry;

    struct TestHandler;

    // Max chunk rows for testing: simulates the Postgres bind parameter limit
    const TEST_MAX_CHUNK_ROWS: usize = 1024;

    #[async_trait]
    impl Processor for TestHandler {
        type Value = Entry;
        const NAME: &'static str = "test_handler";
        const FANOUT: usize = 1;

        async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![])
        }
    }

    #[async_trait]
    impl Handler for TestHandler {
        type Store = Db;
        type Batch = Vec<Entry>;

        const MIN_EAGER_ROWS: usize = 10;
        const MAX_PENDING_ROWS: usize = 10000;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            // Simulate batch size limit
            let remaining_capacity = TEST_MAX_CHUNK_ROWS.saturating_sub(batch.len());
            let to_take = remaining_capacity.min(values.len());
            batch.extend(values.take(to_take));

            if batch.len() >= TEST_MAX_CHUNK_ROWS {
                BatchStatus::Ready
            } else {
                BatchStatus::Pending
            }
        }

        async fn commit<'a>(
            &self,
            _batch: &Self::Batch,
            _conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            tokio::time::sleep(Duration::from_millis(1000)).await;
            Ok(0)
        }
    }
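
    /// Minimal sketch of the batching contract the collector loop relies on: the test handler
    /// above fills a batch up to `TEST_MAX_CHUNK_ROWS`, reports `Ready` once the limit is hit, and
    /// leaves any remaining values for a later batch.
    #[test]
    fn test_handler_batch_splits_at_chunk_limit() {
        let handler = TestHandler;
        let mut values = vec![Entry; TEST_MAX_CHUNK_ROWS + 5].into_iter();

        // The first call fills the batch to capacity and signals that it is ready to be sent.
        let mut first = Vec::new();
        assert!(handler.batch(&mut first, &mut values) == BatchStatus::Ready);
        assert_eq!(first.len(), TEST_MAX_CHUNK_ROWS);

        // The leftover values fit comfortably into a fresh batch, which stays pending.
        let mut second = Vec::new();
        assert!(handler.batch(&mut second, &mut values) == BatchStatus::Pending);
        assert_eq!(second.len(), 5);
    }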

    /// Wait for a timeout on the channel, expecting the operation to time out.
    async fn expect_timeout<H: Handler + 'static>(
        rx: &mut mpsc::Receiver<BatchedRows<H>>,
        duration: Duration,
    ) {
        match tokio::time::timeout(duration, rx.recv()).await {
            Err(_) => (), // Expected timeout - test passes
            Ok(_) => panic!("Expected timeout but received data instead"),
        }
    }

    /// Receive from the channel with a given timeout, panicking if the timeout is reached or the
    /// channel is closed.
    async fn recv_with_timeout<H: Handler + 'static>(
        rx: &mut mpsc::Receiver<BatchedRows<H>>,
        timeout: Duration,
    ) -> BatchedRows<H> {
        match tokio::time::timeout(timeout, rx.recv()).await {
            Ok(Some(batch)) => batch,
            Ok(None) => panic!("Collector channel was closed unexpectedly"),
            Err(_) => panic!("Test timed out waiting for batch from collector"),
        }
    }
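
    /// Minimal sketch of the `PendingCheckpoint` conversion at the top of this module: the
    /// watermark part starts out accounting for every row in the checkpoint
    /// (`batch_rows == total_rows`), and the checkpoint is not considered empty until its values
    /// have been drained.
    #[test]
    fn test_pending_checkpoint_from_indexed_checkpoint() {
        let indexed = IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; 5]);
        let pending: PendingCheckpoint<TestHandler> = indexed.into();

        assert!(!pending.is_empty());
        assert_eq!(pending.values.len(), 5);
        assert_eq!(pending.watermark.batch_rows, 5);
        assert_eq!(pending.watermark.total_rows, 5);
    }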

    #[tokio::test]
    async fn test_collector_batches_data() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        let part1_length = TEST_MAX_CHUNK_ROWS / 2;
        let part2_length = TEST_MAX_CHUNK_ROWS - part1_length - 1;

        // Send test data
        let test_data = vec![
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; part1_length]),
            IndexedCheckpoint::new(0, 2, 20, 2000, vec![Entry; part2_length]),
            IndexedCheckpoint::new(0, 3, 30, 3000, vec![Entry, Entry]),
        ];

        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }

        let batch1 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch1.batch_len, TEST_MAX_CHUNK_ROWS);

        let batch2 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch2.batch_len, 1);

        let batch3 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch3.batch_len, 0);
    }
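
    /// Companion to the test above, assuming `WatermarkPart::take` hands back a part covering
    /// exactly the rows drained into a batch: across however many batches the collector emits,
    /// the watermark parts should account for every row that was sent, no more and no less.
    #[tokio::test]
    async fn test_collector_watermark_parts_cover_all_rows() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // Two small checkpoints; enough rows in total to trigger eager batching.
        let total_rows = TestHandler::MIN_EAGER_ROWS + 3;
        processor_tx
            .send(IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS]))
            .await
            .unwrap();
        processor_tx
            .send(IndexedCheckpoint::new(0, 2, 20, 2000, vec![Entry; 3]))
            .await
            .unwrap();

        // Keep receiving (possibly empty) batches until every row has been accounted for.
        let mut seen_rows = 0;
        let mut seen_parts = 0;
        while seen_rows < total_rows {
            let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
            let parts_rows: usize = batch.watermark.iter().map(|part| part.batch_rows).sum();
            assert_eq!(batch.batch_len, parts_rows);
            seen_rows += batch.batch_len;
            seen_parts += batch.watermark.len();
        }

        assert_eq!(seen_rows, total_rows);
        // One watermark part per checkpoint, since each is drained in a single gather.
        assert_eq!(seen_parts, 2);
    }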

    #[tokio::test]
    async fn test_collector_shutdown() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let handler = Arc::new(TestHandler);
        let mut collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo,
            test_metrics(),
        );

        processor_tx
            .send(IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry, Entry]))
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch.batch_len, 2);

        // Drop processor sender to simulate shutdown
        drop(processor_tx);

        // After a short delay, collector should shut down
        tokio::time::timeout(Duration::from_millis(500), collector.join())
            .await
            .expect("collector shutdown timeout")
            .expect("collector shutdown failed");
    }

    #[tokio::test]
    async fn test_collector_respects_max_pending() {
        let processor_channel_size = 5; // unit is checkpoint
        let collector_channel_size = 2; // unit is batch, aka rows / MAX_CHUNK_ROWS
        let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
        let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let metrics = test_metrics();

        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
        );

        // Send more data than MAX_PENDING_ROWS plus collector channel buffer
        let data = IndexedCheckpoint::new(
            0,
            1,
            10,
            1000,
            vec![
                Entry;
                // Decreasing this number by even 1 would make the test fail.
                TestHandler::MAX_PENDING_ROWS
                    + TEST_MAX_CHUNK_ROWS * collector_channel_size
            ],
        );
        processor_tx.send(data).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // Now fill up the processor channel with minimum data to trigger send blocking
        for _ in 0..processor_channel_size {
            let more_data = IndexedCheckpoint::new(0, 2, 11, 1000, vec![Entry]);
            processor_tx.send(more_data).await.unwrap();
        }

        // Now sending even more data should block because of MAX_PENDING_ROWS limit.
        let even_more_data = IndexedCheckpoint::new(0, 3, 12, 1000, vec![Entry]);

        let send_result = processor_tx.try_send(even_more_data);
        assert!(matches!(
            send_result,
            Err(mpsc::error::TrySendError::Full(_))
        ));
    }

    #[tokio::test]
    async fn test_collector_accumulates_across_checkpoints_until_eager_threshold() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // Set a very long collect interval (60 seconds) to ensure timing doesn't trigger batching
        let config = CommitterConfig {
            collect_interval_ms: 60_000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        let start_time = std::time::Instant::now();

        // The collector starts with an immediate poll tick, creating an empty batch
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);

        // Send data that's just below MIN_EAGER_ROWS threshold.
        let below_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
        processor_tx.send(below_threshold).await.unwrap();

        // Try to receive with timeout - should timeout since we're below threshold
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        // Now send one more entry to cross the MIN_EAGER_ROWS threshold
        let threshold_trigger = IndexedCheckpoint::new(
            0,
            2,
            20,
            2000,
            vec![Entry; 1], // Just 1 more entry to reach 10 total
        );
        processor_tx.send(threshold_trigger).await.unwrap();

        // Should immediately get a batch without waiting for the long interval
        let eager_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(eager_batch.batch_len, TestHandler::MIN_EAGER_ROWS);

        // Verify batch was created quickly (much less than 60 seconds)
        let elapsed = start_time.elapsed();
        assert!(elapsed < Duration::from_secs(10));
    }

    #[tokio::test]
    async fn test_immediate_batch_on_min_eager_rows() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // Set a very long collect interval (60 seconds) to ensure timing doesn't trigger batching
        let config = CommitterConfig {
            collect_interval_ms: 60_000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // The collector starts with an immediate poll tick, creating an empty batch
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);
        // The collector will then just wait for the next poll as there is no new data yet.
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        let start_time = std::time::Instant::now();

        // Send exactly MIN_EAGER_ROWS in one checkpoint
        let exact_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS]);
        processor_tx.send(exact_threshold).await.unwrap();

        // Should trigger immediately since pending_rows >= MIN_EAGER_ROWS.
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS);

        // Verify batch was created quickly (much less than 60 seconds)
        let elapsed = start_time.elapsed();
        assert!(elapsed < Duration::from_secs(10));
    }

    #[tokio::test]
    async fn test_collector_waits_for_timer_when_below_eager_threshold() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // Set a reasonable collect interval for this test (3 seconds).
        let config = CommitterConfig {
            collect_interval_ms: 3000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // Consume initial empty batch
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);

        // Send MIN_EAGER_ROWS - 1 entries (below threshold)
        let below_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
        processor_tx.send(below_threshold).await.unwrap();

        // Try to receive with timeout - should timeout since we're below threshold
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        // Should eventually get batch when timer triggers
        let timer_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(4)).await;
        assert_eq!(timer_batch.batch_len, TestHandler::MIN_EAGER_ROWS - 1);
    }

    /// The collector must wait for `main_reader_lo` to be initialized before attempting to prepare
    /// checkpoints for commit.
    #[tokio::test(start_paused = true)]
    async fn test_collector_waits_for_main_reader_lo_init() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new());

        let handler = Arc::new(TestHandler);
        let collector = collector(
            handler,
            CommitterConfig {
                // Collect interval longer than time to advance to ensure timing doesn't trigger
                // batching.
                collect_interval_ms: 200_000,
                ..CommitterConfig::default()
            },
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // Send enough data to trigger batching.
        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS + 1]);
        processor_tx.send(test_data).await.unwrap();

        // Advance time significantly - collector should still be blocked waiting for
        // main_reader_lo.
        tokio::time::advance(Duration::from_secs(100)).await;

        assert!(collector_rx.try_recv().is_err());

        // Now initialize the main reader lo to 0, unblocking the collector.
        main_reader_lo.set(AtomicU64::new(0)).ok();

        tokio::time::advance(Duration::from_secs(1)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS + 1);

        collector.shutdown().await.unwrap();
    }

    /// When receiving checkpoints, if they are below the main reader lo, they should be dropped
    /// immediately.
    #[tokio::test]
    async fn test_collector_drops_checkpoints_immediately_if_below_main_reader_lo() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(5))));
        let metrics = test_metrics();

        let collector = collector(
            Arc::new(TestHandler),
            CommitterConfig {
                // Collect interval longer than time to advance to ensure timing doesn't trigger
                // batching.
                collect_interval_ms: 200_000,
                ..CommitterConfig::default()
            },
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
        );

        let eager_rows_plus_one = TestHandler::MIN_EAGER_ROWS + 1;

        let test_data: Vec<_> = [1, 5, 2, 6, 4, 3]
            .into_iter()
            .map(|cp| IndexedCheckpoint::new(0, cp, 10, 1000, vec![Entry; eager_rows_plus_one]))
            .collect();
        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // Make sure that we are advancing watermarks.
        assert_eq!(batch.watermark.len(), 6);
        // And reporting the checkpoints as received.
        assert_eq!(
            metrics
                .total_collector_checkpoints_received
                .with_label_values(&[TestHandler::NAME])
                .get(),
            6
        );
        // But the collector should filter out four checkpoints: (1, 2, 3, 4)
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .with_label_values(&[TestHandler::NAME])
                .get(),
            4
        );
        // And that we only have values from two checkpoints (5, 6)
        assert_eq!(batch.batch_len, eager_rows_plus_one * 2);

        collector.shutdown().await.unwrap();
    }

    /// Because a checkpoint may be partially batched before the main reader lo advances past it,
    /// the collector must ensure that it fully writes out the checkpoint. Otherwise, this will
    /// essentially stall the commit_watermark task indefinitely as the latter waits for the
    /// remaining checkpoint parts.
    #[tokio::test(start_paused = true)]
    async fn test_collector_only_filters_whole_checkpoints() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let metrics = test_metrics();

        let collector = collector(
            Arc::new(TestHandler),
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
        );

        let more_than_max_chunk_rows = TEST_MAX_CHUNK_ROWS + 10;

        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; more_than_max_chunk_rows]);
        processor_tx.send(test_data).await.unwrap();
        tokio::time::advance(Duration::from_secs(1)).await;
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // There are still 10 rows left to be sent in the next batch.
        assert_eq!(batch.batch_len, TEST_MAX_CHUNK_ROWS);

        // Send indexed checkpoints 2 through 5 inclusive, but also bump the main reader lo to 4.
        let test_data: Vec<_> = (2..=5)
            .map(|cp| {
                IndexedCheckpoint::new(
                    0,
                    cp,
                    10,
                    1000,
                    vec![Entry; TestHandler::MIN_EAGER_ROWS + 1],
                )
            })
            .collect();
        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }
        let atomic = main_reader_lo.get().unwrap();
        atomic.store(4, Ordering::Relaxed);
        tokio::time::advance(Duration::from_secs(10)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // The next batch should still be the remaining 10 rows from checkpoint 1.
        assert_eq!(batch.batch_len, 10);
        assert_eq!(batch.watermark[0].watermark.checkpoint_hi_inclusive, 1);

        recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .with_label_values(&[TestHandler::NAME])
                .get(),
            2
        );
        assert_eq!(
            metrics
                .total_collector_checkpoints_received
                .with_label_values(&[TestHandler::NAME])
                .get(),
            5
        );

        collector.shutdown().await.unwrap();
    }
}