1use std::collections::BTreeMap;
5use std::sync::Arc;
6use std::sync::atomic::AtomicU64;
7use std::sync::atomic::Ordering;
8
9use sui_futures::service::Service;
10use tokio::sync::SetOnce;
11use tokio::sync::mpsc;
12use tokio::time::MissedTickBehavior;
13use tokio::time::interval;
14use tracing::debug;
15use tracing::info;
16
17use crate::metrics::CheckpointLagMetricReporter;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::IndexedCheckpoint;
21use crate::pipeline::WatermarkPart;
22use crate::pipeline::concurrent::BatchStatus;
23use crate::pipeline::concurrent::BatchedRows;
24use crate::pipeline::concurrent::Handler;
25
/// A checkpoint's processed rows, queued in the collector until they have all
/// been drained into batches, along with the watermark bookkeeping for it.
struct PendingCheckpoint<H: Handler> {
    // Rows still to be batched; consumed incrementally by `Handler::batch`.
    values: std::vec::IntoIter<H::Value>,
    // Tracks how many of this checkpoint's rows have not yet been batched.
    watermark: WatermarkPart,
}
34
35impl<H: Handler> PendingCheckpoint<H> {
36 fn is_empty(&self) -> bool {
38 let empty = self.values.len() == 0;
39 debug_assert!(!empty || self.watermark.batch_rows == 0);
40 empty
41 }
42}
43
44impl<H: Handler> From<IndexedCheckpoint<H>> for PendingCheckpoint<H> {
45 fn from(indexed: IndexedCheckpoint<H>) -> Self {
46 let total_rows = indexed.values.len();
47 Self {
48 watermark: WatermarkPart {
49 watermark: indexed.watermark,
50 batch_rows: total_rows,
51 total_rows,
52 },
53 values: indexed.values.into_iter(),
54 }
55 }
56}
57
/// Spawns the collector task for a concurrent pipeline.
///
/// The collector receives processed checkpoints on `rx`, buffers their rows in
/// checkpoint order, and repeatedly calls `handler.batch` to gather them into
/// `BatchedRows` that are sent to the committer over `tx`.
///
/// Batches are gathered either when the `config.collect_interval()` timer
/// ticks, or eagerly as soon as at least `min_eager_rows` rows are pending.
/// Once `max_pending_rows` rows are buffered, the collector stops pulling from
/// `rx` until batching drains the backlog. Each batch carries at most
/// `max_watermark_updates` watermark parts.
///
/// The task does not process anything until `main_reader_lo` has been
/// initialized. Checkpoints whose sequence number is strictly below the
/// current reader low watermark have their values cleared (and are counted in
/// the skipped-checkpoints metric), but still contribute a watermark part so
/// watermark tracking stays contiguous.
///
/// The task exits cleanly when the committer closes `tx`, or when `rx` is
/// closed and empty and all pending rows have been flushed.
pub(super) fn collector<H: Handler + 'static>(
    handler: Arc<H>,
    config: CommitterConfig,
    mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
    tx: mpsc::Sender<BatchedRows<H>>,
    main_reader_lo: Arc<SetOnce<AtomicU64>>,
    metrics: Arc<IndexerMetrics>,
    min_eager_rows: usize,
    max_pending_rows: usize,
    max_watermark_updates: usize,
) -> Service {
    Service::new().spawn_aborting(async move {
        // Delay (rather than burst) if timer ticks are missed while batching.
        let mut poll = interval(config.collect_interval());
        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
            &metrics.collected_checkpoint_timestamp_lag,
            &metrics.latest_collected_checkpoint_timestamp_lag_ms,
            &metrics.latest_collected_checkpoint,
        );

        // Buffered checkpoints keyed by sequence number, so batches are
        // gathered in checkpoint order, plus a running total of their rows.
        let mut pending: BTreeMap<u64, PendingCheckpoint<H>> = BTreeMap::new();
        let mut pending_rows = 0;

        info!(pipeline = H::NAME, "Starting collector");

        // Block until the main reader low watermark is available.
        let reader_lo_atomic = main_reader_lo.wait().await;

        loop {
            tokio::select! {
                // Prefer draining the processor channel over the poll timer.
                biased;

                // Only accept more input while below the pending-row cap.
                Some(mut indexed) = rx.recv(), if pending_rows < max_pending_rows => {
                    let reader_lo = reader_lo_atomic.load(Ordering::Relaxed);

                    metrics
                        .collector_reader_lo
                        .with_label_values(&[H::NAME])
                        .set(reader_lo as i64);

                    let mut recv_cps = 0usize;
                    let mut recv_rows = 0usize;
                    // Greedily drain whatever is immediately available from
                    // `rx`, up to the pending-row cap.
                    loop {
                        // Drop the contents of checkpoints strictly below the
                        // reader low watermark; the (now empty) checkpoint is
                        // still inserted so its watermark part is emitted.
                        if indexed.checkpoint() < reader_lo {
                            indexed.values.clear();
                            metrics
                                .total_collector_skipped_checkpoints
                                .with_label_values(&[H::NAME])
                                .inc();
                        }

                        recv_cps += 1;
                        recv_rows += indexed.len();
                        pending_rows += indexed.len();
                        pending.insert(indexed.checkpoint(), indexed.into());

                        if pending_rows >= max_pending_rows {
                            break;
                        }

                        match rx.try_recv() {
                            Ok(next) => indexed = next,
                            Err(_) => break,
                        }
                    }

                    metrics
                        .total_collector_rows_received
                        .with_label_values(&[H::NAME])
                        .inc_by(recv_rows as u64);
                    metrics
                        .total_collector_checkpoints_received
                        .with_label_values(&[H::NAME])
                        .inc_by(recv_cps as u64);

                    // Below the eager threshold: keep accumulating instead of
                    // falling through to gather a batch.
                    if pending_rows < min_eager_rows {
                        continue;
                    }
                }
                // Timer tick: gather whatever is pending (possibly nothing).
                _ = poll.tick() => {}
            }

            // Gather and send batches until the pending buffer is empty.
            loop {
                let guard = metrics
                    .collector_gather_latency
                    .with_label_values(&[H::NAME])
                    .start_timer();

                let mut batch = H::Batch::default();
                let mut watermark = Vec::new();
                let mut batch_len = 0;

                // Fill the batch from pending checkpoints in sequence order,
                // until the handler reports it is full, the watermark-part cap
                // is hit, or there is nothing left.
                loop {
                    let Some(mut entry) = pending.first_entry() else {
                        break;
                    };

                    if watermark.len() >= max_watermark_updates {
                        break;
                    }

                    let indexed = entry.get_mut();
                    // The handler consumes rows from the iterator; infer how
                    // many it took from the change in remaining length.
                    let before = indexed.values.len();
                    let status = handler.batch(&mut batch, &mut indexed.values);
                    let taken = before - indexed.values.len();

                    batch_len += taken;
                    watermark.push(indexed.watermark.take(taken));
                    // Fully drained checkpoints are reported and evicted.
                    if indexed.is_empty() {
                        checkpoint_lag_reporter.report_lag(
                            indexed.watermark.checkpoint(),
                            indexed.watermark.timestamp_ms(),
                        );
                        entry.remove();
                    }

                    if status == BatchStatus::Ready {
                        break;
                    }
                }

                let elapsed = guard.stop_and_record();
                debug!(
                    pipeline = H::NAME,
                    elapsed_ms = elapsed * 1000.0,
                    rows = batch_len,
                    "Gathered batch",
                );

                metrics
                    .total_collector_batches_created
                    .with_label_values(&[H::NAME])
                    .inc();

                metrics
                    .collector_batch_size
                    .with_label_values(&[H::NAME])
                    .observe(batch_len as f64);

                pending_rows -= batch_len;

                // Note: an empty batch is still sent (e.g. on a timer tick
                // with nothing pending).
                let batched_rows = BatchedRows {
                    batch,
                    batch_len,
                    watermark,
                };
                if tx.send(batched_rows).await.is_err() {
                    info!(
                        pipeline = H::NAME,
                        "Committer closed channel, stopping collector"
                    );
                    return Ok(());
                }

                if pending.is_empty() {
                    break;
                }
            }

            // Graceful shutdown: the upstream processor is done and everything
            // buffered has been flushed.
            if rx.is_closed() && rx.is_empty() && pending_rows == 0 {
                info!(
                    pipeline = H::NAME,
                    "Processor closed channel, pending rows empty, stopping collector",
                );
                break;
            }
        }

        Ok(())
    })
}
257
258#[cfg(test)]
259mod tests {
260 use std::time::Duration;
261
262 use async_trait::async_trait;
263 use sui_pg_db::Connection;
264 use sui_pg_db::Db;
265 use tokio::sync::mpsc;
266
267 use crate::metrics::tests::test_metrics;
268 use crate::pipeline::Processor;
269 use crate::pipeline::concurrent::BatchStatus;
270 use crate::types::full_checkpoint_content::Checkpoint;
271
272 use super::*;
273
    /// Zero-sized row type used by the test pipeline.
    #[derive(Clone)]
    struct Entry;

    /// Minimal `Handler` implementation used to drive the collector in tests.
    struct TestHandler;

    /// Maximum number of rows `TestHandler::batch` packs into a single chunk.
    const TEST_MAX_CHUNK_ROWS: usize = 1024;
281
282 #[async_trait]
283 impl Processor for TestHandler {
284 type Value = Entry;
285 const NAME: &'static str = "test_handler";
286
287 async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
288 Ok(vec![])
289 }
290 }
291
292 #[async_trait]
293 impl Handler for TestHandler {
294 type Store = Db;
295 type Batch = Vec<Entry>;
296
297 const MIN_EAGER_ROWS: usize = 10;
298 const MAX_PENDING_ROWS: usize = 10000;
299
300 fn batch(
301 &self,
302 batch: &mut Self::Batch,
303 values: &mut std::vec::IntoIter<Self::Value>,
304 ) -> BatchStatus {
305 let remaining_capacity = TEST_MAX_CHUNK_ROWS.saturating_sub(batch.len());
307 let to_take = remaining_capacity.min(values.len());
308 batch.extend(values.take(to_take));
309
310 if batch.len() >= TEST_MAX_CHUNK_ROWS {
311 BatchStatus::Ready
312 } else {
313 BatchStatus::Pending
314 }
315 }
316
317 async fn commit<'a>(
318 &self,
319 _batch: &Self::Batch,
320 _conn: &mut Connection<'a>,
321 ) -> anyhow::Result<usize> {
322 tokio::time::sleep(Duration::from_millis(1000)).await;
323 Ok(0)
324 }
325 }
326
327 async fn expect_timeout<H: Handler + 'static>(
329 rx: &mut mpsc::Receiver<BatchedRows<H>>,
330 duration: Duration,
331 ) {
332 match tokio::time::timeout(duration, rx.recv()).await {
333 Err(_) => (), Ok(_) => panic!("Expected timeout but received data instead"),
335 }
336 }
337
338 async fn recv_with_timeout<H: Handler + 'static>(
341 rx: &mut mpsc::Receiver<BatchedRows<H>>,
342 timeout: Duration,
343 ) -> BatchedRows<H> {
344 match tokio::time::timeout(timeout, rx.recv()).await {
345 Ok(Some(batch)) => batch,
346 Ok(None) => panic!("Collector channel was closed unexpectedly"),
347 Err(_) => panic!("Test timed out waiting for batch from collector"),
348 }
349 }
350
    #[tokio::test]
    async fn test_collector_batches_data() {
        // Rows from consecutive checkpoints should be packed into chunks of
        // TEST_MAX_CHUNK_ROWS, with any remainder flowing into the next batch.
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // The first two checkpoints together are one row short of a full
        // chunk; the third supplies two more rows, so exactly one spills over.
        let part1_length = TEST_MAX_CHUNK_ROWS / 2;
        let part2_length = TEST_MAX_CHUNK_ROWS - part1_length - 1;

        let test_data = vec![
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; part1_length]),
            IndexedCheckpoint::new(0, 2, 20, 2000, vec![Entry; part2_length]),
            IndexedCheckpoint::new(0, 3, 30, 3000, vec![Entry, Entry]),
        ];

        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }

        // First batch is filled to the chunk limit...
        let batch1 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch1.batch_len, TEST_MAX_CHUNK_ROWS);

        // ...and the single leftover row arrives in the following batch.
        let batch2 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch2.batch_len, 1);
    }
390
    #[tokio::test]
    async fn test_collector_shutdown() {
        // When the processor end of the channel is dropped and all pending
        // rows have been flushed, the collector task should terminate.
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let handler = Arc::new(TestHandler);
        let mut collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo,
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        processor_tx
            .send(IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry, Entry]))
            .await
            .unwrap();

        // Give the collector time to ingest the checkpoint.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch.batch_len, 2);

        // Closing the processor channel should let the collector wind down.
        drop(processor_tx);

        tokio::time::timeout(Duration::from_millis(500), collector.join())
            .await
            .expect("collector shutdown timeout")
            .expect("collector shutdown failed");
    }
429
430 #[tokio::test]
431 async fn test_collector_respects_max_pending() {
432 let processor_channel_size = 5; let collector_channel_size = 2; let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
435 let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);
436 let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));
437
438 let metrics = test_metrics();
439
440 let handler = Arc::new(TestHandler);
441 let _collector = collector::<TestHandler>(
442 handler,
443 CommitterConfig::default(),
444 processor_rx,
445 collector_tx,
446 main_reader_lo.clone(),
447 metrics.clone(),
448 TestHandler::MIN_EAGER_ROWS,
449 TestHandler::MAX_PENDING_ROWS,
450 TestHandler::MAX_WATERMARK_UPDATES,
451 );
452
453 let data = IndexedCheckpoint::new(
455 0,
456 1,
457 10,
458 1000,
459 vec![
460 Entry;
461 TestHandler::MAX_PENDING_ROWS
463 + TEST_MAX_CHUNK_ROWS * collector_channel_size
464 ],
465 );
466 processor_tx.send(data).await.unwrap();
467
468 tokio::time::sleep(Duration::from_millis(200)).await;
469
470 for _ in 0..processor_channel_size {
472 let more_data = IndexedCheckpoint::new(0, 2, 11, 1000, vec![Entry]);
473 processor_tx.send(more_data).await.unwrap();
474 }
475
476 let even_more_data = IndexedCheckpoint::new(0, 3, 12, 1000, vec![Entry]);
478
479 let send_result = processor_tx.try_send(even_more_data);
480 assert!(matches!(
481 send_result,
482 Err(mpsc::error::TrySendError::Full(_))
483 ));
484 }
485
486 #[tokio::test]
487 async fn test_collector_accumulates_across_checkpoints_until_eager_threshold() {
488 let (processor_tx, processor_rx) = mpsc::channel(10);
489 let (collector_tx, mut collector_rx) = mpsc::channel(10);
490 let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));
491
492 let config = CommitterConfig {
494 collect_interval_ms: 60_000,
495 ..CommitterConfig::default()
496 };
497 let handler = Arc::new(TestHandler);
498 let _collector = collector::<TestHandler>(
499 handler,
500 config,
501 processor_rx,
502 collector_tx,
503 main_reader_lo.clone(),
504 test_metrics(),
505 TestHandler::MIN_EAGER_ROWS,
506 TestHandler::MAX_PENDING_ROWS,
507 TestHandler::MAX_WATERMARK_UPDATES,
508 );
509
510 let start_time = std::time::Instant::now();
511
512 let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
514 assert_eq!(initial_batch.batch_len, 0);
515
516 let below_threshold =
518 IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
519 processor_tx.send(below_threshold).await.unwrap();
520
521 expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;
523
524 let threshold_trigger = IndexedCheckpoint::new(
526 0,
527 2,
528 20,
529 2000,
530 vec![Entry; 1], );
532 processor_tx.send(threshold_trigger).await.unwrap();
533
534 let eager_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
536 assert_eq!(eager_batch.batch_len, TestHandler::MIN_EAGER_ROWS);
537
538 let elapsed = start_time.elapsed();
540 assert!(elapsed < Duration::from_secs(10));
541 }
542
    #[tokio::test]
    async fn test_immediate_batch_on_min_eager_rows() {
        // A single checkpoint carrying exactly MIN_EAGER_ROWS rows should
        // trigger a batch immediately, without waiting for the poll timer.
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // A one-minute collect interval ensures the timer cannot be what
        // triggers the batch under test.
        let config = CommitterConfig {
            collect_interval_ms: 60_000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // Consume the empty batch from the interval's immediate first tick,
        // then confirm nothing else arrives on its own.
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        let start_time = std::time::Instant::now();

        let exact_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS]);
        processor_tx.send(exact_threshold).await.unwrap();

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS);

        // The batch must have arrived well before the 60s poll interval.
        let elapsed = start_time.elapsed();
        assert!(elapsed < Duration::from_secs(10));
    }
588
    #[tokio::test]
    async fn test_collector_waits_for_timer_when_below_eager_threshold() {
        // Rows below MIN_EAGER_ROWS are only flushed when the poll timer
        // fires, not eagerly.
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        // A 3s interval: short enough to observe within the test, long enough
        // to first confirm nothing is emitted eagerly.
        let config = CommitterConfig {
            collect_interval_ms: 3000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // Consume the empty batch from the interval's immediate first tick.
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);

        let below_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
        processor_tx.send(below_threshold).await.unwrap();

        // Below the eager threshold: no batch within the first second...
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        // ...but the 3s timer tick flushes the sub-threshold rows.
        let timer_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(4)).await;
        assert_eq!(timer_batch.batch_len, TestHandler::MIN_EAGER_ROWS - 1);
    }
629
    #[tokio::test(start_paused = true)]
    async fn test_collector_waits_for_main_reader_lo_init() {
        // The collector must not process anything until `main_reader_lo` has
        // been initialized, even when data and time are both plentiful.
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        // Deliberately left unset so the collector blocks on `wait()`.
        let main_reader_lo = Arc::new(SetOnce::new());

        let handler = Arc::new(TestHandler);
        let collector = collector(
            handler,
            CommitterConfig {
                collect_interval_ms: 200_000,
                ..CommitterConfig::default()
            },
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // Enough rows to trigger an eager batch, were the collector running.
        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS + 1]);
        processor_tx.send(test_data).await.unwrap();

        // Even after (paused-clock) time passes, nothing is emitted while the
        // reader low watermark remains unset.
        tokio::time::advance(Duration::from_secs(100)).await;

        assert!(collector_rx.try_recv().is_err());

        // Initializing the watermark unblocks the collector...
        main_reader_lo.set(AtomicU64::new(0)).ok();

        tokio::time::advance(Duration::from_secs(1)).await;

        // ...which then emits the eager batch.
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS + 1);

        collector.shutdown().await.unwrap();
    }
678
    #[tokio::test]
    // NOTE(review): despite the "le" in the name, the collector skips only
    // checkpoints *strictly below* the reader low watermark (the code checks
    // `indexed.checkpoint() < reader_lo`): with reader_lo = 5, checkpoints 5
    // and 6 are kept while 1, 2, 3, and 4 are skipped — which is exactly what
    // the assertions below verify.
    async fn test_collector_drops_checkpoints_immediately_if_le_main_reader_lo() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(5))));
        let metrics = test_metrics();

        // Long poll interval so batches are triggered by the eager threshold,
        // not the timer.
        let collector = collector(
            Arc::new(TestHandler),
            CommitterConfig {
                collect_interval_ms: 200_000,
                ..CommitterConfig::default()
            },
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        let eager_rows_plus_one = TestHandler::MIN_EAGER_ROWS + 1;

        // Checkpoints arrive out of order; only 5 and 6 are >= reader_lo.
        let test_data: Vec<_> = [1, 5, 2, 6, 4, 3]
            .into_iter()
            .map(|cp| IndexedCheckpoint::new(0, cp, 10, 1000, vec![Entry; eager_rows_plus_one]))
            .collect();
        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // Skipped checkpoints still contribute (empty) watermark parts.
        assert_eq!(batch.watermark.len(), 6);
        assert_eq!(
            metrics
                .total_collector_checkpoints_received
                .with_label_values(&[TestHandler::NAME])
                .get(),
            6
        );
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .with_label_values(&[TestHandler::NAME])
                .get(),
            4
        );
        // Only the rows from checkpoints 5 and 6 survive.
        assert_eq!(batch.batch_len, eager_rows_plus_one * 2);

        collector.shutdown().await.unwrap();
    }
739
    #[tokio::test(start_paused = true)]
    async fn test_collector_only_filters_whole_checkpoints() {
        // Filtering against the reader low watermark happens only when a
        // checkpoint is *received*: rows already sitting in the pending buffer
        // are never dropped, even if the watermark later moves past their
        // checkpoint.
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let metrics = test_metrics();

        let collector = collector(
            Arc::new(TestHandler),
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
            TestHandler::MIN_EAGER_ROWS,
            TestHandler::MAX_PENDING_ROWS,
            TestHandler::MAX_WATERMARK_UPDATES,
        );

        // Checkpoint 1 overflows one chunk by 10 rows, so 10 rows remain
        // buffered after the first full batch is gathered.
        let more_than_max_chunk_rows = TEST_MAX_CHUNK_ROWS + 10;

        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; more_than_max_chunk_rows]);
        processor_tx.send(test_data).await.unwrap();
        tokio::time::advance(Duration::from_secs(1)).await;
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        assert_eq!(batch.batch_len, TEST_MAX_CHUNK_ROWS);

        // Send checkpoints 2..=5 after moving the watermark to 4, so 2 and 3
        // should be filtered on receipt while 4 and 5 are kept.
        let test_data: Vec<_> = (2..=5)
            .map(|cp| {
                IndexedCheckpoint::new(
                    0,
                    cp,
                    10,
                    1000,
                    vec![Entry; TestHandler::MIN_EAGER_ROWS + 1],
                )
            })
            .collect();
        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }
        let atomic = main_reader_lo.get().unwrap();
        atomic.store(4, Ordering::Relaxed);
        tokio::time::advance(Duration::from_secs(10)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // The 10 leftover rows of checkpoint 1 are still emitted, even though
        // the watermark has since moved past it — partial checkpoints already
        // in the buffer are not filtered.
        assert_eq!(batch.batch_len, 10);
        assert_eq!(batch.watermark[0].watermark.checkpoint_hi_inclusive, 1);

        // Drain the batch containing checkpoints 4 and 5.
        recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // Only checkpoints 2 and 3 were skipped, of the 5 received in total.
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .with_label_values(&[TestHandler::NAME])
                .get(),
            2
        );
        assert_eq!(
            metrics
                .total_collector_checkpoints_received
                .with_label_values(&[TestHandler::NAME])
                .get(),
            5
        );

        collector.shutdown().await.unwrap();
    }
819}