1use std::sync::Arc;
5use std::time::Duration;
6
7use async_trait::async_trait;
8use serde::Deserialize;
9use serde::Serialize;
10use sui_futures::service::Service;
11use tokio::sync::SetOnce;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::Task;
16use crate::config::ConcurrencyConfig;
17use crate::ingestion::ingestion_client::CheckpointEnvelope;
18use crate::metrics::IndexerMetrics;
19use crate::pipeline::CommitterConfig;
20use crate::pipeline::IngestionConfig;
21use crate::pipeline::Processor;
22use crate::pipeline::WatermarkPart;
23use crate::pipeline::concurrent::collector::collector;
24use crate::pipeline::concurrent::commit_watermark::commit_watermark;
25use crate::pipeline::concurrent::committer::committer;
26use crate::pipeline::concurrent::main_reader_lo::track_main_reader_lo;
27use crate::pipeline::concurrent::pruner::pruner;
28use crate::pipeline::concurrent::reader_watermark::reader_watermark;
29use crate::pipeline::processor::processor;
30use crate::store::ConcurrentStore;
31use crate::store::Store;
32
33mod collector;
34mod commit_watermark;
35mod committer;
36mod main_reader_lo;
37mod pruner;
38mod reader_watermark;
39
/// Outcome of [`Handler::batch`], telling the collector whether the current
/// batch can accept more values or should be dispatched for committing as-is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// The batch still has room; the collector may keep adding values to it.
    Pending,
    /// The batch is complete and should be sent for committing.
    Ready,
}
48
/// Handlers implement the logic for a given indexing pipeline: how to batch
/// processed values together, how to write batches out to the store, and
/// (optionally) how to prune data that has fallen out of the retention window.
#[async_trait]
pub trait Handler: Processor {
    /// The store this pipeline writes to. It must support out-of-order
    /// (concurrent) writes.
    type Store: ConcurrentStore;

    /// Accumulator for values waiting to be committed.
    type Batch: Default + Send + Sync + 'static;

    /// If at least this many rows are pending, the collector dispatches a
    /// batch eagerly, without waiting for its collection interval.
    const MIN_EAGER_ROWS: usize = 50;

    /// If more than this many rows are pending, the collector applies
    /// back-pressure by pausing reads from the processor.
    const MAX_PENDING_ROWS: usize = 5000;

    /// Upper bound on how many watermark parts a single batch may accumulate.
    const MAX_WATERMARK_UPDATES: usize = 10_000;

    /// Fold values from `values` into `batch`. Returns
    /// [`BatchStatus::Pending`] if the batch can accept more values, or
    /// [`BatchStatus::Ready`] if it should be committed as-is. Values left in
    /// the iterator carry over to the next batch.
    fn batch(
        &self,
        batch: &mut Self::Batch,
        values: &mut std::vec::IntoIter<Self::Value>,
    ) -> BatchStatus;

    /// Write `batch` out using `conn`, returning the number of rows affected.
    /// Failed commits are retried (see the commit-failure tests), so this
    /// should be idempotent.
    async fn commit<'a>(
        &self,
        batch: &Self::Batch,
        conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize>;

    /// Delete this pipeline's data for checkpoints in `[_from, _to_exclusive)`,
    /// returning the number of rows affected. Defaults to a no-op for
    /// pipelines that do not support pruning.
    async fn prune<'a>(
        &self,
        _from: u64,
        _to_exclusive: u64,
        _conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> anyhow::Result<usize> {
        Ok(0)
    }
}
116
/// Configuration for a concurrent pipeline. Optional fields left as `None`
/// fall back to defaults -- either the handler's associated constants or
/// values derived from the number of available CPUs (see `pipeline`).
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ConcurrentConfig {
    /// Batching and write behaviour for the collector/committer.
    pub committer: CommitterConfig,

    /// Ingestion settings. Carried in the config but not consumed by
    /// `pipeline` itself (it is destructured away there).
    pub ingestion: IngestionConfig,

    /// Pruning configuration. `None` disables pruning for this pipeline.
    pub pruner: Option<PrunerConfig>,

    /// Processor fan-out. Defaults to adaptive concurrency between 1 and the
    /// number of CPUs.
    pub fanout: Option<ConcurrencyConfig>,

    /// Overrides [`Handler::MIN_EAGER_ROWS`] when set.
    pub min_eager_rows: Option<usize>,

    /// Overrides [`Handler::MAX_PENDING_ROWS`] when set.
    pub max_pending_rows: Option<usize>,

    /// Overrides [`Handler::MAX_WATERMARK_UPDATES`] when set.
    pub max_watermark_updates: Option<usize>,

    /// Capacity of the processor -> collector channel. Defaults to half the
    /// number of CPUs.
    pub processor_channel_size: Option<usize>,

    /// Capacity of the collector -> committer channel. Defaults to half the
    /// number of CPUs.
    pub collector_channel_size: Option<usize>,

    /// Capacity of the committer -> commit-watermark channel. Defaults to the
    /// number of CPUs.
    pub committer_channel_size: Option<usize>,
}
150
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrunerConfig {
    /// How often the pruner wakes up to look for work, in milliseconds.
    pub interval_ms: u64,

    /// How long to wait after the reader low watermark moves before pruning
    /// up to it, in milliseconds.
    pub delay_ms: u64,

    /// How many checkpoints to retain; data older than this window becomes
    /// eligible for pruning.
    pub retention: u64,

    /// Maximum number of checkpoints covered by a single pruning chunk.
    pub max_chunk_size: u64,

    /// How many pruning chunks may be processed concurrently.
    pub prune_concurrency: u64,
}
169
/// A batch of processed values, along with the parts of the watermark the
/// batch accounts for, ready to be handed to the committer.
struct BatchedRows<H: Handler> {
    /// The accumulated values to commit.
    batch: H::Batch,
    /// Number of rows folded into `batch`, tracked separately because
    /// `H::Batch` is opaque to the framework.
    batch_len: usize,
    /// The watermark parts that `batch` covers.
    watermark: Vec<WatermarkPart>,
}
183
184impl<H, V> BatchedRows<H>
185where
186 H: Handler<Batch = Vec<V>, Value = V>,
187{
188 #[cfg(test)]
189 pub fn from_vec(batch: Vec<V>, watermark: Vec<WatermarkPart>) -> Self {
190 let batch_len = batch.len();
191 Self {
192 batch,
193 batch_len,
194 watermark,
195 }
196 }
197}
198
impl PrunerConfig {
    /// The pruner's polling interval as a [`Duration`].
    pub fn interval(&self) -> Duration {
        Duration::from_millis(self.interval_ms)
    }

    /// The pruner's post-watermark delay as a [`Duration`].
    pub fn delay(&self) -> Duration {
        Duration::from_millis(self.delay_ms)
    }
}
208
impl Default for PrunerConfig {
    fn default() -> Self {
        Self {
            // Look for prunable data every 5 minutes.
            interval_ms: 300_000,
            // Wait 2 minutes after the reader watermark moves before pruning.
            delay_ms: 120_000,
            // Keep the most recent 4 million checkpoints.
            retention: 4_000_000,
            // Prune at most 2000 checkpoints per chunk.
            max_chunk_size: 2_000,
            // One pruning chunk in flight at a time.
            prune_concurrency: 1,
        }
    }
}
220
221pub(crate) fn pipeline<H: Handler>(
241 handler: H,
242 next_checkpoint: u64,
243 config: ConcurrentConfig,
244 store: H::Store,
245 task: Option<Task>,
246 checkpoint_rx: mpsc::Receiver<Arc<CheckpointEnvelope>>,
247 metrics: Arc<IndexerMetrics>,
248) -> Service {
249 info!(
250 pipeline = H::NAME,
251 "Starting pipeline with config: {config:#?}",
252 );
253
254 let ConcurrentConfig {
255 committer: committer_config,
256 ingestion: _,
257 pruner: pruner_config,
258 fanout,
259 min_eager_rows,
260 max_pending_rows,
261 max_watermark_updates,
262 processor_channel_size,
263 collector_channel_size,
264 committer_channel_size,
265 } = config;
266
267 let concurrency = fanout.unwrap_or(ConcurrencyConfig::Adaptive {
268 initial: 1,
269 min: 1,
270 max: num_cpus::get(),
271 dead_band: None,
272 });
273 let min_eager_rows = min_eager_rows.unwrap_or(H::MIN_EAGER_ROWS);
274 let max_pending_rows = max_pending_rows.unwrap_or(H::MAX_PENDING_ROWS);
275 let max_watermark_updates = max_watermark_updates.unwrap_or(H::MAX_WATERMARK_UPDATES);
276
277 let processor_channel_size = processor_channel_size.unwrap_or(num_cpus::get() / 2);
278 let (processor_tx, collector_rx) = mpsc::channel(processor_channel_size);
279
280 let collector_channel_size = collector_channel_size.unwrap_or(num_cpus::get() / 2);
281 let (collector_tx, committer_rx) = mpsc::channel(collector_channel_size);
283 let committer_channel_size = committer_channel_size.unwrap_or(num_cpus::get());
285 let (committer_tx, watermark_rx) = mpsc::channel(committer_channel_size);
286 let main_reader_lo = Arc::new(SetOnce::new());
287
288 let handler = Arc::new(handler);
289
290 let s_processor = processor(
291 handler.clone(),
292 checkpoint_rx,
293 processor_tx,
294 metrics.clone(),
295 concurrency,
296 store.clone(),
297 );
298
299 let s_collector = collector::<H>(
300 handler.clone(),
301 committer_config.clone(),
302 collector_rx,
303 collector_tx,
304 main_reader_lo.clone(),
305 metrics.clone(),
306 min_eager_rows,
307 max_pending_rows,
308 max_watermark_updates,
309 );
310
311 let s_committer = committer::<H>(
312 handler.clone(),
313 committer_config.clone(),
314 committer_rx,
315 committer_tx,
316 store.clone(),
317 metrics.clone(),
318 );
319
320 let s_commit_watermark = commit_watermark::<H>(
321 next_checkpoint,
322 committer_config,
323 watermark_rx,
324 store.clone(),
325 task.as_ref().map(|t| t.task.clone()),
326 metrics.clone(),
327 );
328
329 let s_track_reader_lo = track_main_reader_lo::<H>(
330 main_reader_lo.clone(),
331 task.as_ref().map(|t| t.reader_interval),
332 store.clone(),
333 );
334
335 let s_reader_watermark =
336 reader_watermark::<H>(pruner_config.clone(), store.clone(), metrics.clone());
337
338 let s_pruner = pruner(handler, pruner_config, store, metrics);
339
340 s_processor
341 .merge(s_collector)
342 .merge(s_committer)
343 .merge(s_commit_watermark)
344 .attach(s_track_reader_lo)
345 .attach(s_reader_watermark)
346 .attach(s_pruner)
347}
348
349#[cfg(test)]
350mod tests {
351 use std::sync::Arc;
352 use std::time::Duration;
353
354 use prometheus::Registry;
355 use sui_types::digests::CheckpointDigest;
356 use tokio::sync::mpsc;
357 use tokio::time::timeout;
358
359 use crate::FieldCount;
360 use crate::metrics::IndexerMetrics;
361 use crate::mocks::store::FallibleMockConnection;
362 use crate::mocks::store::FallibleMockStore;
363 use crate::pipeline::Processor;
364 use crate::types::full_checkpoint_content::Checkpoint;
365 use crate::types::test_checkpoint_data_builder::TestCheckpointBuilder;
366
367 use super::*;
368
    const TEST_TIMEOUT: Duration = Duration::from_secs(60);
    // Small channel so tests can exercise back-pressure deterministically.
    const TEST_SUBSCRIBER_CHANNEL_SIZE: usize = 3;

    /// Row type produced by the test pipeline; `data` encodes both the
    /// checkpoint number and the row's position within it.
    #[derive(Clone, Debug, FieldCount)]
    struct TestValue {
        checkpoint: u64,
        data: u64,
    }
377
    /// Minimal processor emitting two deterministic values per checkpoint.
    struct DataPipeline;

    #[async_trait]
    impl Processor for DataPipeline {
        const NAME: &'static str = "test_handler";
        type Value = TestValue;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let cp_num = checkpoint.summary.sequence_number;

            // Two rows per checkpoint -- `cp * 10 + 1` and `cp * 10 + 2` --
            // so tests can assert exactly what each checkpoint committed.
            Ok(vec![
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 1,
                },
                TestValue {
                    checkpoint: cp_num,
                    data: cp_num * 10 + 2,
                },
            ])
        }
    }
401
    #[async_trait]
    impl Handler for DataPipeline {
        type Store = FallibleMockStore;
        type Batch = Vec<TestValue>;

        // High enough that batches are never dispatched eagerly by row count;
        // they only go out on the collector's interval.
        const MIN_EAGER_ROWS: usize = 1000;
        // Low enough that back-pressure kicks in after a few checkpoints.
        const MAX_PENDING_ROWS: usize = 4;
        // Cap each batch at a single watermark update.
        const MAX_WATERMARK_UPDATES: usize = 1;

        /// Move every value into the batch and keep accepting more; batching
        /// is driven entirely by the collector's limits.
        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        /// Group rows by checkpoint and bulk-write them to the mock store.
        async fn commit<'a>(
            &self,
            batch: &Self::Batch,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            let mut grouped: std::collections::HashMap<u64, Vec<u64>> =
                std::collections::HashMap::new();
            for value in batch {
                grouped
                    .entry(value.checkpoint)
                    .or_default()
                    .push(value.data);
            }

            conn.0.commit_bulk_data(DataPipeline::NAME, grouped).await
        }

        /// Delegate pruning to the mock store so tests can count attempts.
        async fn prune<'a>(
            &self,
            from: u64,
            to_exclusive: u64,
            conn: &mut FallibleMockConnection<'a>,
        ) -> anyhow::Result<usize> {
            conn.0.prune_data(DataPipeline::NAME, from, to_exclusive)
        }
    }
449
    /// Shared scaffolding for pipeline tests: the mock store, a sender for
    /// injecting checkpoints, and the running pipeline service.
    struct TestSetup {
        store: FallibleMockStore,
        checkpoint_tx: mpsc::Sender<Arc<CheckpointEnvelope>>,
        // Held (not read) so the pipeline's tasks stay alive for the test.
        #[allow(unused)]
        pipeline: Service,
    }
456
    impl TestSetup {
        /// Spin up a pipeline over a fresh checkpoint channel and metrics
        /// registry, starting from `next_checkpoint`.
        async fn new(
            config: ConcurrentConfig,
            store: FallibleMockStore,
            next_checkpoint: u64,
        ) -> Self {
            let (checkpoint_tx, checkpoint_rx) = mpsc::channel(TEST_SUBSCRIBER_CHANNEL_SIZE);
            let metrics = IndexerMetrics::new(None, &Registry::default());

            let pipeline = pipeline(
                DataPipeline,
                next_checkpoint,
                config,
                store.clone(),
                None,
                checkpoint_rx,
                metrics,
            );

            Self {
                store,
                checkpoint_tx,
                pipeline,
            }
        }

        /// Build a synthetic checkpoint with deterministic contents (epoch 1,
        /// `2 * checkpoint` total transactions, a fixed timestamp ramp) and
        /// feed it into the pipeline.
        async fn send_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
            let checkpoint_envelope = Arc::new(CheckpointEnvelope {
                checkpoint: Arc::new(
                    TestCheckpointBuilder::new(checkpoint)
                        .with_epoch(1)
                        .with_network_total_transactions(checkpoint * 2)
                        .with_timestamp_ms(1000000000 + checkpoint * 1000)
                        .build_checkpoint(),
                ),
                chain_id: CheckpointDigest::new([1; 32]).into(),
            });
            self.checkpoint_tx.send(checkpoint_envelope).await?;
            Ok(())
        }

        /// Send a checkpoint, failing if it cannot be enqueued within
        /// `timeout_duration` (e.g. because of back-pressure).
        async fn send_checkpoint_with_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) -> anyhow::Result<()> {
            timeout(timeout_duration, self.send_checkpoint(checkpoint)).await?
        }

        /// Send a checkpoint and assert that it does NOT get enqueued within
        /// `timeout_duration` -- proving back-pressure is in effect.
        async fn send_checkpoint_expect_timeout(
            &self,
            checkpoint: u64,
            timeout_duration: Duration,
        ) {
            timeout(timeout_duration, self.send_checkpoint(checkpoint))
                .await
                // The timeout firing is the expected outcome here.
                .unwrap_err();
        }
    }
516
    #[tokio::test]
    async fn test_e2e_pipeline() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                // Long interval: data lands well before the first prune pass.
                interval_ms: 5_000,
                delay_ms: 100,
                // Keep only the newest 3 checkpoints.
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // First wave of checkpoints.
        for i in 0..3 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Each checkpoint commits its two expected rows.
        for i in 0..3 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, TEST_TIMEOUT)
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Second wave.
        for i in 3..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All six checkpoints should be present before pruning kicks in.
        for i in 0..6 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // Poll until the pruner removes everything outside the retention
        // window (checkpoints 0-2), or fail after a generous deadline.
        let pruning_deadline = Duration::from_secs(15);
        let start = tokio::time::Instant::now();
        loop {
            let pruned = {
                let data = setup.store.data.get(DataPipeline::NAME).unwrap();
                !data.contains_key(&0) && !data.contains_key(&1) && !data.contains_key(&2)
            };
            if pruned {
                break;
            }
            assert!(
                start.elapsed() < pruning_deadline,
                "Timed out waiting for pruning to occur"
            );
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // The retained window (3..=5) must survive pruning.
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&3));
            assert!(data.contains_key(&4));
            assert!(data.contains_key(&5));
        };
    }
594
    #[tokio::test]
    async fn test_e2e_pipeline_without_pruning() {
        let config = ConcurrentConfig {
            pruner: None,
            ..Default::default()
        };
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..10 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Wait for the commit watermark to reach the last checkpoint.
        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 9, TEST_TIMEOUT)
            .await;

        for i in 0..10 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        assert_eq!(watermark.checkpoint_hi_inclusive, Some(9));
        // Checkpoint 9 reports `network_total_transactions = 9 * 2`.
        assert_eq!(watermark.tx_hi, 18);
        // Checkpoint 9's timestamp is 1000000000 + 9 * 1000.
        assert_eq!(watermark.timestamp_ms_hi_inclusive, 1000009000);

        // With no pruner configured, every checkpoint must still be present.
        let total_checkpoints = {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            data.len()
        };
        assert_eq!(total_checkpoints, 10);
    }
639
640 #[tokio::test]
641 async fn test_out_of_order_processing() {
642 let config = ConcurrentConfig::default();
643 let store = FallibleMockStore::default();
644 let setup = TestSetup::new(config, store, 0).await;
645
646 let checkpoints = vec![2, 0, 4, 1, 3];
648 for cp in checkpoints {
649 setup
650 .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
651 .await
652 .unwrap();
653 }
654
655 setup
657 .store
658 .wait_for_watermark(DataPipeline::NAME, 4, Duration::from_secs(5))
659 .await;
660
661 for i in 0..5 {
663 let data = setup
664 .store
665 .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
666 .await;
667 assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
668 }
669 }
670
    #[tokio::test]
    async fn test_watermark_progression_with_gaps() {
        let config = ConcurrentConfig::default();
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Send checkpoints with 2 deliberately missing.
        for cp in [0, 1, 3, 4] {
            setup
                .send_checkpoint_with_timeout(cp, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // Give the pipeline time to commit everything it can.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The watermark must stall at 1: it only advances contiguously.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(1));

        // Filling the gap lets the watermark jump straight to 4.
        setup
            .send_checkpoint_with_timeout(2, Duration::from_millis(200))
            .await
            .unwrap();

        let watermark = setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 4, TEST_TIMEOUT)
            .await;
        assert_eq!(watermark.checkpoint_hi_inclusive, Some(4));
    }
705
    #[tokio::test]
    async fn test_back_pressure_collector_max_pending_rows() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // Long collect interval so rows pile up in the collector.
                collect_interval_ms: 5_000,
                write_concurrency: 1,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        let store = FallibleMockStore::default();
        let setup = TestSetup::new(config, store, 0).await;

        // Let the pipeline's tasks start up before filling the buffers.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // With MAX_PENDING_ROWS (4) exceeded, the collector stops draining,
        // so total capacity is bounded by the channel sizes and in-flight
        // work. NOTE(review): the figure of 14 is presumably derived from
        // those capacities -- re-derive it if the sizes above change.
        for i in 0..14 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // The next send should block: back-pressure has propagated upstream.
        setup
            .send_checkpoint_expect_timeout(14, Duration::from_millis(200))
            .await;

        // Once the collector flushes on its interval, the send succeeds.
        setup
            .send_checkpoint_with_timeout(14, TEST_TIMEOUT)
            .await
            .unwrap();

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
776
    #[tokio::test]
    async fn test_back_pressure_committer_slow_commits() {
        let config = ConcurrentConfig {
            committer: CommitterConfig {
                // A single writer so batches queue up behind the committer.
                write_concurrency: 1,
                collect_interval_ms: 10,
                ..Default::default()
            },
            fanout: Some(ConcurrencyConfig::Fixed { value: 2 }),
            processor_channel_size: Some(7),
            collector_channel_size: Some(6),
            ..Default::default()
        };
        // Each commit takes 10s, effectively stalling the committer.
        let store = FallibleMockStore::default().with_commit_delay(10_000);
        let setup = TestSetup::new(config, store, 0).await;

        // These should all be absorbed by channel and task buffers.
        for i in 0..19 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(100))
                .await
                .unwrap();
        }

        // Somewhere in 19..22 the buffers fill and a send times out; the
        // exact point depends on task scheduling, so probe a small range.
        let mut back_pressure_checkpoint = None;
        for checkpoint in 19..22 {
            if setup
                .send_checkpoint_with_timeout(checkpoint, Duration::from_millis(100))
                .await
                .is_err()
            {
                back_pressure_checkpoint = Some(checkpoint);
                break;
            }
        }
        assert!(
            back_pressure_checkpoint.is_some(),
            "Back pressure should occur between checkpoints 19-21"
        );

        // Once the first (delayed) commit lands, capacity frees up...
        setup
            .store
            .wait_for_any_data(DataPipeline::NAME, TEST_TIMEOUT)
            .await;

        // ...and the previously blocked checkpoint can be sent.
        setup
            .send_checkpoint_with_timeout(back_pressure_checkpoint.unwrap(), TEST_TIMEOUT)
            .await
            .unwrap();
    }
860
    #[tokio::test]
    async fn test_commit_failure_retry() {
        let config = ConcurrentConfig::default();
        // The first two commit attempts fail before one succeeds.
        let store = FallibleMockStore::default().with_commit_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // The committer must keep retrying until the commit lands and the
        // watermark advances.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, Duration::from_secs(1))
            .await;
        assert_eq!(data, vec![1, 2]);
    }
888
    #[tokio::test]
    async fn test_prune_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                interval_ms: 2000,
                delay_ms: 100,
                // Keep only the newest 2 checkpoints.
                retention: 2,
                ..Default::default()
            }),
            ..Default::default()
        };

        // The first attempt to prune the range [0, 2) fails; the retry works.
        let store = FallibleMockStore::default().with_prune_failures(0, 2, 1);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..4 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        // All four checkpoints commit before pruning begins.
        for i in 0..4 {
            let data = setup
                .store
                .wait_for_data(DataPipeline::NAME, i, Duration::from_secs(1))
                .await;
            assert_eq!(data, vec![i * 10 + 1, i * 10 + 2]);
        }

        // After the first (failing) prune attempt, nothing is deleted.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 1, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            for i in 0..4 {
                assert!(data.contains_key(&i));
            }
        };

        // After the retry succeeds, [0, 2) is gone but the retained window
        // remains intact.
        setup
            .store
            .wait_for_prune_attempts(0, 2, 2, TEST_TIMEOUT)
            .await;
        {
            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
            assert!(data.contains_key(&2));
            assert!(data.contains_key(&3));

            assert!(!data.contains_key(&0));
            assert!(!data.contains_key(&1));
        };
    }
951
    #[tokio::test]
    async fn test_reader_watermark_failure_retry() {
        let config = ConcurrentConfig {
            pruner: Some(PrunerConfig {
                // Short interval so failed updates are retried quickly.
                interval_ms: 100,
                delay_ms: 100,
                retention: 3,
                ..Default::default()
            }),
            ..Default::default()
        };

        // The first two reader-watermark updates fail before one succeeds.
        let store = FallibleMockStore::default().with_reader_watermark_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        for i in 0..6 {
            setup
                .send_checkpoint_with_timeout(i, Duration::from_millis(200))
                .await
                .unwrap();
        }

        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 5, TEST_TIMEOUT)
            .await;

        // Allow several 100ms intervals to elapse so the failing updates get
        // retried and eventually land.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // With a watermark of 5 and retention of 3, the lowest readable
        // checkpoint settles at 3.
        let watermark = setup.store.watermark(DataPipeline::NAME).unwrap();
        assert_eq!(watermark.reader_lo, 3);
    }
989
    #[tokio::test]
    async fn test_database_connection_failure_retry() {
        let config = ConcurrentConfig::default();
        // The first two connection attempts fail before one succeeds.
        let store = FallibleMockStore::default().with_connection_failures(2);
        let setup = TestSetup::new(config, store, 0).await;

        setup
            .send_checkpoint_with_timeout(0, Duration::from_millis(200))
            .await
            .unwrap();

        // Despite the connection failures, retries eventually commit the
        // checkpoint and advance the watermark.
        setup
            .store
            .wait_for_watermark(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;

        let data = setup
            .store
            .wait_for_data(DataPipeline::NAME, 0, TEST_TIMEOUT)
            .await;
        assert_eq!(data, vec![1, 2]);
    }
1015}