1use std::{
5 collections::BTreeMap,
6 sync::{
7 Arc,
8 atomic::{AtomicU64, Ordering},
9 },
10};
11
12use sui_futures::service::Service;
13use tokio::{
14 sync::{SetOnce, mpsc},
15 time::{MissedTickBehavior, interval},
16};
17use tracing::{debug, info};
18
19use crate::{
20 metrics::{CheckpointLagMetricReporter, IndexerMetrics},
21 pipeline::{CommitterConfig, IndexedCheckpoint, WatermarkPart},
22};
23
24use super::{BatchStatus, BatchedRows, Handler};
25
/// A checkpoint's processed rows that are still waiting to be gathered into batches, together
/// with the watermark bookkeeping for those outstanding rows.
struct PendingCheckpoint<H: Handler> {
    /// Rows not yet moved into a batch; the collector hands this iterator to `Handler::batch`,
    /// which consumes it incrementally.
    values: std::vec::IntoIter<H::Value>,
    /// Watermark for this checkpoint. `batch_rows` starts out equal to the total number of rows
    /// and is expected to reach zero once `values` is exhausted.
    watermark: WatermarkPart,
}
34
35impl<H: Handler> PendingCheckpoint<H> {
36 fn is_empty(&self) -> bool {
38 let empty = self.values.len() == 0;
39 debug_assert!(!empty || self.watermark.batch_rows == 0);
40 empty
41 }
42}
43
44impl<H: Handler> From<IndexedCheckpoint<H>> for PendingCheckpoint<H> {
45 fn from(indexed: IndexedCheckpoint<H>) -> Self {
46 let total_rows = indexed.values.len();
47 Self {
48 watermark: WatermarkPart {
49 watermark: indexed.watermark,
50 batch_rows: total_rows,
51 total_rows,
52 },
53 values: indexed.values.into_iter(),
54 }
55 }
56}
57
58pub(super) fn collector<H: Handler + 'static>(
74 handler: Arc<H>,
75 config: CommitterConfig,
76 mut rx: mpsc::Receiver<IndexedCheckpoint<H>>,
77 tx: mpsc::Sender<BatchedRows<H>>,
78 main_reader_lo: Arc<SetOnce<AtomicU64>>,
79 metrics: Arc<IndexerMetrics>,
80) -> Service {
81 Service::new().spawn_aborting(async move {
82 let mut poll = interval(config.collect_interval());
85 poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
86
87 let checkpoint_lag_reporter = CheckpointLagMetricReporter::new_for_pipeline::<H>(
88 &metrics.collected_checkpoint_timestamp_lag,
89 &metrics.latest_collected_checkpoint_timestamp_lag_ms,
90 &metrics.latest_collected_checkpoint,
91 );
92
93 let mut pending: BTreeMap<u64, PendingCheckpoint<H>> = BTreeMap::new();
95 let mut pending_rows = 0;
96
97 info!(pipeline = H::NAME, "Starting collector");
98
99 loop {
100 tokio::select! {
101 _ = poll.tick() => {
103 let guard = metrics
104 .collector_gather_latency
105 .with_label_values(&[H::NAME])
106 .start_timer();
107
108 let mut batch = H::Batch::default();
109 let mut watermark = Vec::new();
110 let mut batch_len = 0;
111
112 loop {
113 let Some(mut entry) = pending.first_entry() else {
114 break;
115 };
116
117 if watermark.len() >= H::MAX_WATERMARK_UPDATES {
118 break;
119 }
120
121 let indexed = entry.get_mut();
122 let before = indexed.values.len();
123 let status = handler.batch(&mut batch, &mut indexed.values);
124 let taken = before - indexed.values.len();
125
126 batch_len += taken;
127 watermark.push(indexed.watermark.take(taken));
128 if indexed.is_empty() {
129 checkpoint_lag_reporter.report_lag(
130 indexed.watermark.checkpoint(),
131 indexed.watermark.timestamp_ms(),
132 );
133 entry.remove();
134 }
135
136 if status == BatchStatus::Ready {
137 break;
139 }
140 }
141 pending_rows -= batch_len;
142 let elapsed = guard.stop_and_record();
143 debug!(
144 pipeline = H::NAME,
145 elapsed_ms = elapsed * 1000.0,
146 rows = batch_len,
147 pending_rows = pending_rows,
148 "Gathered batch",
149 );
150
151 metrics
152 .total_collector_batches_created
153 .with_label_values(&[H::NAME])
154 .inc();
155
156 metrics
157 .collector_batch_size
158 .with_label_values(&[H::NAME])
159 .observe(batch_len as f64);
160
161 let batched_rows = BatchedRows {
162 batch,
163 batch_len,
164 watermark,
165 };
166
167 if tx.send(batched_rows).await.is_err() {
168 info!(pipeline = H::NAME, "Committer closed channel, stopping collector");
169 break;
170 }
171
172 if pending_rows > 0 {
173 poll.reset_immediately();
174 } else if rx.is_closed() && rx.is_empty() {
175 info!(
176 pipeline = H::NAME,
177 "Processor closed channel, pending rows empty, stopping collector",
178 );
179 break;
180 }
181 }
182
183 Some(mut indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_ROWS => {
185 let reader_lo = main_reader_lo.wait().await.load(Ordering::Relaxed);
188 if indexed.checkpoint() < reader_lo {
189 indexed.values.clear();
190 metrics.total_collector_skipped_checkpoints
191 .with_label_values(&[H::NAME])
192 .inc();
193 }
194
195 metrics
196 .total_collector_rows_received
197 .with_label_values(&[H::NAME])
198 .inc_by(indexed.len() as u64);
199 metrics
200 .total_collector_checkpoints_received
201 .with_label_values(&[H::NAME])
202 .inc();
203 metrics
204 .collector_reader_lo
205 .with_label_values(&[H::NAME])
206 .set(reader_lo as i64);
207
208 pending_rows += indexed.len();
209 pending.insert(indexed.checkpoint(), indexed.into());
210
211 if pending_rows >= H::MIN_EAGER_ROWS {
212 poll.reset_immediately()
213 }
214 }
215 }
217 }
218
219 Ok(())
220 })
221}
222
223#[cfg(test)]
224mod tests {
225 use std::time::Duration;
226
227 use async_trait::async_trait;
228 use sui_pg_db::{Connection, Db};
229 use tokio::sync::mpsc;
230
231 use crate::{
232 metrics::tests::test_metrics,
233 pipeline::{Processor, concurrent::BatchStatus},
234 types::full_checkpoint_content::Checkpoint,
235 };
236
237 use super::*;
238
    /// Placeholder row type produced for the test handler; it carries no data.
    #[derive(Clone)]
    struct Entry;
241
    /// Minimal pipeline handler used to drive the collector in these tests.
    struct TestHandler;

    /// Batch capacity enforced by `TestHandler::batch`: a batch reports `Ready` once it holds
    /// this many rows.
    const TEST_MAX_CHUNK_ROWS: usize = 1024;
246
247 #[async_trait]
248 impl Processor for TestHandler {
249 type Value = Entry;
250 const NAME: &'static str = "test_handler";
251 const FANOUT: usize = 1;
252
253 async fn process(&self, _checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
254 Ok(vec![])
255 }
256 }
257
258 #[async_trait]
259 impl Handler for TestHandler {
260 type Store = Db;
261 type Batch = Vec<Entry>;
262
263 const MIN_EAGER_ROWS: usize = 10;
264 const MAX_PENDING_ROWS: usize = 10000;
265
266 fn batch(
267 &self,
268 batch: &mut Self::Batch,
269 values: &mut std::vec::IntoIter<Self::Value>,
270 ) -> BatchStatus {
271 let remaining_capacity = TEST_MAX_CHUNK_ROWS.saturating_sub(batch.len());
273 let to_take = remaining_capacity.min(values.len());
274 batch.extend(values.take(to_take));
275
276 if batch.len() >= TEST_MAX_CHUNK_ROWS {
277 BatchStatus::Ready
278 } else {
279 BatchStatus::Pending
280 }
281 }
282
283 async fn commit<'a>(
284 &self,
285 _batch: &Self::Batch,
286 _conn: &mut Connection<'a>,
287 ) -> anyhow::Result<usize> {
288 tokio::time::sleep(Duration::from_millis(1000)).await;
289 Ok(0)
290 }
291 }
292
293 async fn expect_timeout<H: Handler + 'static>(
295 rx: &mut mpsc::Receiver<BatchedRows<H>>,
296 duration: Duration,
297 ) {
298 match tokio::time::timeout(duration, rx.recv()).await {
299 Err(_) => (), Ok(_) => panic!("Expected timeout but received data instead"),
301 }
302 }
303
304 async fn recv_with_timeout<H: Handler + 'static>(
307 rx: &mut mpsc::Receiver<BatchedRows<H>>,
308 timeout: Duration,
309 ) -> BatchedRows<H> {
310 match tokio::time::timeout(timeout, rx.recv()).await {
311 Ok(Some(batch)) => batch,
312 Ok(None) => panic!("Collector channel was closed unexpectedly"),
313 Err(_) => panic!("Test timed out waiting for batch from collector"),
314 }
315 }
316
    /// Rows from consecutive checkpoints are packed into one batch until the handler reports it
    /// full at `TEST_MAX_CHUNK_ROWS`; leftover rows go into the following batch.
    #[tokio::test]
    async fn test_collector_batches_data() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // Together the three checkpoints below hold `TEST_MAX_CHUNK_ROWS + 1` rows: exactly one
        // more than fits in a single batch.
        let part1_length = TEST_MAX_CHUNK_ROWS / 2;
        let part2_length = TEST_MAX_CHUNK_ROWS - part1_length - 1;

        let test_data = vec![
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; part1_length]),
            IndexedCheckpoint::new(0, 2, 20, 2000, vec![Entry; part2_length]),
            IndexedCheckpoint::new(0, 3, 30, 3000, vec![Entry, Entry]),
        ];

        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }

        // The first batch is full.
        let batch1 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch1.batch_len, TEST_MAX_CHUNK_ROWS);

        // The second batch carries the single leftover row.
        let batch2 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch2.batch_len, 1);

        // With nothing pending, the next tick still sends a batch, just an empty one.
        let batch3 = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch3.batch_len, 0);
    }
356
    /// Once the processor's sender is dropped and pending rows have been flushed, the collector
    /// task finishes on its own without error.
    #[tokio::test]
    async fn test_collector_shutdown() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let handler = Arc::new(TestHandler);
        let mut collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo,
            test_metrics(),
        );

        processor_tx
            .send(IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry, Entry]))
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch.batch_len, 2);

        // Closing the input channel, with nothing left pending, should let the collector exit.
        drop(processor_tx);

        tokio::time::timeout(Duration::from_millis(500), collector.join())
            .await
            .expect("collector shutdown timeout")
            .expect("collector shutdown failed");
    }
392
    /// When the collector cannot make progress because its output channel is never read, it
    /// stops pulling from the processor channel. That channel then fills up, applying
    /// back-pressure to the processor.
    #[tokio::test]
    async fn test_collector_respects_max_pending() {
        let processor_channel_size = 5;
        let collector_channel_size = 2;
        let (processor_tx, processor_rx) = mpsc::channel(processor_channel_size);
        let (collector_tx, _collector_rx) = mpsc::channel(collector_channel_size);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let metrics = test_metrics();

        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
        );

        // More rows than `MAX_PENDING_ROWS` plus what the output channel can buffer as full
        // batches. The collector therefore stalls: it either blocks sending to the full, unread
        // output channel, or stops receiving because the pending-row limit has been reached.
        let data = IndexedCheckpoint::new(
            0,
            1,
            10,
            1000,
            vec![
                Entry;
                TestHandler::MAX_PENDING_ROWS
                    + TEST_MAX_CHUNK_ROWS * collector_channel_size
            ],
        );
        processor_tx.send(data).await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // The stalled collector no longer drains the processor channel, so these fill it.
        for _ in 0..processor_channel_size {
            let more_data = IndexedCheckpoint::new(0, 2, 11, 1000, vec![Entry]);
            processor_tx.send(more_data).await.unwrap();
        }

        let even_more_data = IndexedCheckpoint::new(0, 3, 12, 1000, vec![Entry]);

        // One more message must be rejected because the processor channel is at capacity.
        let send_result = processor_tx.try_send(even_more_data);
        assert!(matches!(
            send_result,
            Err(mpsc::error::TrySendError::Full(_))
        ));
    }
445
    /// Rows below `MIN_EAGER_ROWS` wait for the timer. A later checkpoint that brings the pending
    /// total up to the threshold triggers an immediate batch containing rows from both
    /// checkpoints.
    #[tokio::test]
    async fn test_collector_accumulates_across_checkpoints_until_eager_threshold() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let config = CommitterConfig {
            // Far longer than the test runs, so later batches cannot come from the timer.
            collect_interval_ms: 60_000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        let start_time = std::time::Instant::now();

        // The interval's first tick completes immediately, producing an empty batch.
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);

        let below_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
        processor_tx.send(below_threshold).await.unwrap();

        // One row short of the threshold: nothing should be emitted yet.
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        // A single extra row, in a separate checkpoint, reaches `MIN_EAGER_ROWS` in total.
        let threshold_trigger = IndexedCheckpoint::new(
            0,
            2,
            20,
            2000,
            vec![Entry; 1],
        );
        processor_tx.send(threshold_trigger).await.unwrap();

        let eager_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(eager_batch.batch_len, TestHandler::MIN_EAGER_ROWS);

        // Well under the 60s collect interval, so the batch was triggered eagerly.
        let elapsed = start_time.elapsed();
        assert!(elapsed < Duration::from_secs(10));
    }
499
    /// A single checkpoint with exactly `MIN_EAGER_ROWS` rows is batched immediately, without
    /// waiting for the (very long) collect interval.
    #[tokio::test]
    async fn test_immediate_batch_on_min_eager_rows() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let config = CommitterConfig {
            // Far longer than the test runs, so later batches cannot come from the timer.
            collect_interval_ms: 60_000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // Consume the empty batch from the interval's immediate first tick, then check that
        // nothing else is emitted while no data is pending.
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        let start_time = std::time::Instant::now();

        let exact_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS]);
        processor_tx.send(exact_threshold).await.unwrap();

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS);

        // Well under the 60s collect interval, so the batch was triggered eagerly.
        let elapsed = start_time.elapsed();
        assert!(elapsed < Duration::from_secs(10));
    }
542
    /// Fewer than `MIN_EAGER_ROWS` pending rows are not batched eagerly; they go out on the next
    /// timer tick instead.
    #[tokio::test]
    async fn test_collector_waits_for_timer_when_below_eager_threshold() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let config = CommitterConfig {
            // Short enough that the timer fires within the test's receive window.
            collect_interval_ms: 3000,
            ..CommitterConfig::default()
        };
        let handler = Arc::new(TestHandler);
        let _collector = collector::<TestHandler>(
            handler,
            config,
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // Empty batch from the interval's immediate first tick.
        let initial_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(1)).await;
        assert_eq!(initial_batch.batch_len, 0);

        let below_threshold =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS - 1]);
        processor_tx.send(below_threshold).await.unwrap();

        // Below the eager threshold, so nothing is sent right away.
        expect_timeout(&mut collector_rx, Duration::from_secs(1)).await;

        // The next timer tick flushes the pending rows.
        let timer_batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(4)).await;
        assert_eq!(timer_batch.batch_len, TestHandler::MIN_EAGER_ROWS - 1);
    }
580
    /// The collector does not process received checkpoints until `main_reader_lo` has been set.
    #[tokio::test(start_paused = true)]
    async fn test_collector_waits_for_main_reader_lo_init() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        // Deliberately left unset until later in the test.
        let main_reader_lo = Arc::new(SetOnce::new());

        let handler = Arc::new(TestHandler);
        let collector = collector(
            handler,
            CommitterConfig {
                // Much longer than the 101s of (paused) time advanced below.
                collect_interval_ms: 200_000,
                ..CommitterConfig::default()
            },
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            test_metrics(),
        );

        // Above `MIN_EAGER_ROWS`, so once accepted the checkpoint is batched without the timer.
        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; TestHandler::MIN_EAGER_ROWS + 1]);
        processor_tx.send(test_data).await.unwrap();

        tokio::time::advance(Duration::from_secs(100)).await;

        // Nothing should have been delivered while `main_reader_lo` is unset.
        assert!(collector_rx.try_recv().is_err());

        // Setting `main_reader_lo` unblocks the collector.
        main_reader_lo.set(AtomicU64::new(0)).ok();

        tokio::time::advance(Duration::from_secs(1)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        assert_eq!(batch.batch_len, TestHandler::MIN_EAGER_ROWS + 1);

        collector.shutdown().await.unwrap();
    }
626
    /// Checkpoints below `main_reader_lo` have their rows dropped as soon as they are received,
    /// but their watermark parts are still forwarded.
    ///
    /// Despite the `le` in the name, the comparison is strict: checkpoint 5, which equals
    /// `main_reader_lo`, is kept.
    #[tokio::test]
    async fn test_collector_drops_checkpoints_immediately_if_le_main_reader_lo() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(5))));
        let metrics = test_metrics();

        let collector = collector(
            Arc::new(TestHandler),
            CommitterConfig {
                // Long interval, so batches in this test are not produced by later timer ticks.
                collect_interval_ms: 200_000,
                ..CommitterConfig::default()
            },
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
        );

        let eager_rows_plus_one = TestHandler::MIN_EAGER_ROWS + 1;

        // Out-of-order checkpoints on both sides of `main_reader_lo` (5).
        let test_data: Vec<_> = [1, 5, 2, 6, 4, 3]
            .into_iter()
            .map(|cp| IndexedCheckpoint::new(0, cp, 10, 1000, vec![Entry; eager_rows_plus_one]))
            .collect();
        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // Every checkpoint contributes a watermark part, including the skipped ones.
        assert_eq!(batch.watermark.len(), 6);
        assert_eq!(
            metrics
                .total_collector_checkpoints_received
                .with_label_values(&[TestHandler::NAME])
                .get(),
            6
        );
        // Checkpoints 1, 2, 3 and 4 are strictly below `main_reader_lo` and are skipped.
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .with_label_values(&[TestHandler::NAME])
                .get(),
            4
        );
        // Only checkpoints 5 and 6 contribute rows.
        assert_eq!(batch.batch_len, eager_rows_plus_one * 2);

        collector.shutdown().await.unwrap();
    }
684
    /// Filtering against `main_reader_lo` happens only when a checkpoint is received. Rows of a
    /// checkpoint that was already accepted are still batched, even after `main_reader_lo`
    /// moves past it.
    #[tokio::test(start_paused = true)]
    async fn test_collector_only_filters_whole_checkpoints() {
        let (processor_tx, processor_rx) = mpsc::channel(10);
        let (collector_tx, mut collector_rx) = mpsc::channel(10);
        let main_reader_lo = Arc::new(SetOnce::new_with(Some(AtomicU64::new(0))));

        let metrics = test_metrics();

        let collector = collector(
            Arc::new(TestHandler),
            CommitterConfig::default(),
            processor_rx,
            collector_tx,
            main_reader_lo.clone(),
            metrics.clone(),
        );

        let more_than_max_chunk_rows = TEST_MAX_CHUNK_ROWS + 10;

        // Checkpoint 1 does not fit in one batch, so 10 of its rows are left pending.
        let test_data =
            IndexedCheckpoint::new(0, 1, 10, 1000, vec![Entry; more_than_max_chunk_rows]);
        processor_tx.send(test_data).await.unwrap();
        tokio::time::advance(Duration::from_secs(1)).await;
        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        assert_eq!(batch.batch_len, TEST_MAX_CHUNK_ROWS);

        let test_data: Vec<_> = (2..=5)
            .map(|cp| {
                IndexedCheckpoint::new(
                    0,
                    cp,
                    10,
                    1000,
                    vec![Entry; TestHandler::MIN_EAGER_ROWS + 1],
                )
            })
            .collect();
        for data in test_data {
            processor_tx.send(data).await.unwrap();
        }
        // Raise `main_reader_lo` above checkpoint 1 (and above checkpoints 2 and 3).
        let atomic = main_reader_lo.get().unwrap();
        atomic.store(4, Ordering::Relaxed);
        tokio::time::advance(Duration::from_secs(10)).await;

        let batch = recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // The remaining rows of checkpoint 1 are still delivered.
        assert_eq!(batch.batch_len, 10);
        assert_eq!(batch.watermark[0].watermark.checkpoint_hi_inclusive, 1);

        // Receive the following batch before inspecting the metrics.
        recv_with_timeout(&mut collector_rx, Duration::from_secs(2)).await;

        // Checkpoints 2 and 3 fall below the raised `main_reader_lo` and are skipped.
        assert_eq!(
            metrics
                .total_collector_skipped_checkpoints
                .with_label_values(&[TestHandler::NAME])
                .get(),
            2
        );
        assert_eq!(
            metrics
                .total_collector_checkpoints_received
                .with_label_values(&[TestHandler::NAME])
                .get(),
            5
        );

        collector.shutdown().await.unwrap();
    }
761}