1use std::{
5 cmp::Reverse,
6 collections::{BTreeMap, BTreeSet, BinaryHeap},
7 pin::Pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27 CommitIndex,
28 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29 block_verifier::BlockVerifier,
30 commit::{CommitAPI as _, CommitRange, TrustedCommit},
31 commit_vote_monitor::CommitVoteMonitor,
32 context::Context,
33 core_thread::CoreThreadDispatcher,
34 dag_state::DagState,
35 error::{ConsensusError, ConsensusResult},
36 network::{
37 BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverNetworkService,
38 PeerId, ValidatorNetworkService,
39 },
40 round_tracker::RoundTracker,
41 stake_aggregator::{QuorumThreshold, StakeAggregator},
42 storage::Store,
43 synchronizer::SynchronizerHandle,
44 transaction_certifier::TransactionCertifier,
45};
46
47pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
49pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
51 context: Arc<Context>,
52 commit_vote_monitor: Arc<CommitVoteMonitor>,
53 block_verifier: Arc<dyn BlockVerifier>,
54 synchronizer: Arc<SynchronizerHandle>,
55 core_dispatcher: Arc<C>,
56 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
57 subscription_counter: Arc<SubscriptionCounter>,
58 transaction_certifier: TransactionCertifier,
59 dag_state: Arc<RwLock<DagState>>,
60 store: Arc<dyn Store>,
61 round_tracker: Arc<RwLock<RoundTracker>>,
62}
63
64impl<C: CoreThreadDispatcher> AuthorityService<C> {
65 pub(crate) fn new(
66 context: Arc<Context>,
67 block_verifier: Arc<dyn BlockVerifier>,
68 commit_vote_monitor: Arc<CommitVoteMonitor>,
69 round_tracker: Arc<RwLock<RoundTracker>>,
70 synchronizer: Arc<SynchronizerHandle>,
71 core_dispatcher: Arc<C>,
72 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
73 transaction_certifier: TransactionCertifier,
74 dag_state: Arc<RwLock<DagState>>,
75 store: Arc<dyn Store>,
76 ) -> Self {
77 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
78 Self {
79 context,
80 block_verifier,
81 commit_vote_monitor,
82 synchronizer,
83 core_dispatcher,
84 rx_block_broadcast,
85 subscription_counter,
86 transaction_certifier,
87 dag_state,
88 store,
89 round_tracker,
90 }
91 }
92
93 fn parse_excluded_ancestors(
95 &self,
96 peer: AuthorityIndex,
97 block: &VerifiedBlock,
98 mut excluded_ancestors: Vec<Vec<u8>>,
99 ) -> ConsensusResult<Vec<BlockRef>> {
100 let peer_hostname = &self.context.committee.authority(peer).hostname;
101
102 let excluded_ancestors_limit = self.context.committee.size() * 2;
103 if excluded_ancestors.len() > excluded_ancestors_limit {
104 debug!(
105 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
106 excluded_ancestors.len() - excluded_ancestors_limit,
107 peer,
108 peer_hostname,
109 );
110 excluded_ancestors.truncate(excluded_ancestors_limit);
111 }
112
113 let excluded_ancestors = excluded_ancestors
114 .into_iter()
115 .map(|serialized| {
116 let block_ref: BlockRef =
117 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
118 if !self.context.committee.is_valid_index(block_ref.author) {
119 return Err(ConsensusError::InvalidAuthorityIndex {
120 index: block_ref.author,
121 max: self.context.committee.size(),
122 });
123 }
124 if block_ref.round >= block.round() {
125 return Err(ConsensusError::InvalidAncestorRound {
126 ancestor: block_ref.round,
127 block: block.round(),
128 });
129 }
130 Ok(block_ref)
131 })
132 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
133
134 for excluded_ancestor in &excluded_ancestors {
135 let excluded_ancestor_hostname = &self
136 .context
137 .committee
138 .authority(excluded_ancestor.author)
139 .hostname;
140 self.context
141 .metrics
142 .node_metrics
143 .network_excluded_ancestors_count_by_authority
144 .with_label_values(&[excluded_ancestor_hostname])
145 .inc();
146 }
147 self.context
148 .metrics
149 .node_metrics
150 .network_received_excluded_ancestors_from_authority
151 .with_label_values(&[peer_hostname])
152 .inc_by(excluded_ancestors.len() as u64);
153
154 Ok(excluded_ancestors)
155 }
156}
157
158#[async_trait]
159impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
160 async fn handle_send_block(
161 &self,
162 peer: AuthorityIndex,
163 serialized_block: ExtendedSerializedBlock,
164 ) -> ConsensusResult<()> {
165 fail_point_async!("consensus-rpc-response");
166
167 let peer_hostname = &self.context.committee.authority(peer).hostname;
168
169 let signed_block: SignedBlock =
171 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
172
173 if peer != signed_block.author() {
175 self.context
176 .metrics
177 .node_metrics
178 .invalid_blocks
179 .with_label_values(&[
180 peer_hostname.as_str(),
181 "handle_send_block",
182 "UnexpectedAuthority",
183 ])
184 .inc();
185 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
186 info!("Block with wrong authority from {}: {}", peer, e);
187 return Err(e);
188 }
189
190 let (verified_block, reject_txn_votes) = self
192 .block_verifier
193 .verify_and_vote(signed_block, serialized_block.block)
194 .tap_err(|e| {
195 self.context
196 .metrics
197 .node_metrics
198 .invalid_blocks
199 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
200 .inc();
201 info!("Invalid block from {}: {}", peer, e);
202 })?;
203 let excluded_ancestors = self
204 .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
205 .tap_err(|e| {
206 debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
207 self.context
208 .metrics
209 .node_metrics
210 .invalid_blocks
211 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
212 .inc();
213 })?;
214
215 let block_ref = verified_block.reference();
216 debug!("Received block {} via send block.", block_ref);
217
218 self.context
219 .metrics
220 .node_metrics
221 .verified_blocks
222 .with_label_values(&[peer_hostname])
223 .inc();
224
225 let now = self.context.clock.timestamp_utc_ms();
226 let forward_time_drift =
227 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
228
229 self.context
230 .metrics
231 .node_metrics
232 .block_timestamp_drift_ms
233 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
234 .inc_by(forward_time_drift.as_millis() as u64);
235
236 self.commit_vote_monitor.observe_block(&verified_block);
239
240 self.round_tracker
242 .write()
243 .update_from_verified_block(&ExtendedBlock {
244 block: verified_block.clone(),
245 excluded_ancestors: excluded_ancestors.clone(),
246 });
247
248 let last_commit_index = self.dag_state.read().last_commit_index();
257 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
258 if last_commit_index
261 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
262 < quorum_commit_index
263 {
264 self.context
265 .metrics
266 .node_metrics
267 .rejected_blocks
268 .with_label_values(&["commit_lagging"])
269 .inc();
270 debug!(
271 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
272 block_ref, last_commit_index, quorum_commit_index,
273 );
274 return Err(ConsensusError::BlockRejected {
275 block_ref,
276 reason: format!(
277 "Last commit index is lagging quorum commit index too much ({} < {})",
278 last_commit_index, quorum_commit_index,
279 ),
280 });
281 }
282
283 if self.context.protocol_config.transaction_voting_enabled() {
286 self.transaction_certifier
287 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
288 }
289
290 let missing_ancestors = self
292 .core_dispatcher
293 .add_blocks(vec![verified_block.clone()])
294 .await
295 .map_err(|_| ConsensusError::Shutdown)?;
296
297 if !missing_ancestors.is_empty() {
299 self.context
300 .metrics
301 .node_metrics
302 .handler_received_block_missing_ancestors
303 .with_label_values(&[peer_hostname])
304 .inc_by(missing_ancestors.len() as u64);
305 let synchronizer = self.synchronizer.clone();
306 spawn_monitored_task!(async move {
307 if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
312 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
313 }
314 });
315 }
316
317 let missing_excluded_ancestors = self
319 .core_dispatcher
320 .check_block_refs(excluded_ancestors)
321 .await
322 .map_err(|_| ConsensusError::Shutdown)?;
323 if !missing_excluded_ancestors.is_empty() {
324 self.context
325 .metrics
326 .node_metrics
327 .network_excluded_ancestors_sent_to_fetch
328 .with_label_values(&[peer_hostname])
329 .inc_by(missing_excluded_ancestors.len() as u64);
330
331 let synchronizer = self.synchronizer.clone();
332 spawn_monitored_task!(async move {
333 if let Err(err) = synchronizer
334 .fetch_blocks(missing_excluded_ancestors, peer)
335 .await
336 {
337 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
338 }
339 });
340 }
341
342 Ok(())
343 }
344
345 async fn handle_subscribe_blocks(
346 &self,
347 peer: AuthorityIndex,
348 last_received: Round,
349 ) -> ConsensusResult<BlockStream> {
350 fail_point_async!("consensus-rpc-response");
351
352 let past_proposed_blocks = {
361 let dag_state = self.dag_state.read();
362
363 let mut proposed_blocks =
364 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
365 if proposed_blocks.is_empty() {
366 let last_proposed_block = dag_state.get_last_proposed_block();
367 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
368 vec![last_proposed_block]
369 } else {
370 vec![]
371 };
372 }
373 stream::iter(
374 proposed_blocks
375 .into_iter()
376 .map(|block| ExtendedSerializedBlock {
377 block: block.serialized().clone(),
378 excluded_ancestors: vec![],
379 }),
380 )
381 };
382
383 let broadcasted_blocks = BroadcastedBlockStream::new(
384 PeerId::Validator(peer),
385 self.rx_block_broadcast.resubscribe(),
386 self.subscription_counter.clone(),
387 );
388
389 Ok(Box::pin(past_proposed_blocks.chain(
391 broadcasted_blocks.map(ExtendedSerializedBlock::from),
392 )))
393 }
394
395 async fn handle_fetch_blocks(
411 &self,
412 _peer: AuthorityIndex,
413 mut block_refs: Vec<BlockRef>,
414 fetch_after_rounds: Vec<Round>,
415 fetch_missing_ancestors: bool,
416 ) -> ConsensusResult<Vec<Bytes>> {
417 fail_point_async!("consensus-rpc-response");
418
419 if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
420 return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
421 }
422 if !fetch_after_rounds.is_empty()
423 && fetch_after_rounds.len() != self.context.committee.size()
424 {
425 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
426 fetch_after_rounds.len(),
427 self.context.committee.size(),
428 ));
429 }
430
431 let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
433 self.context.parameters.max_blocks_per_sync
434 } else {
435 self.context.parameters.max_blocks_per_fetch
436 };
437 if block_refs.len() > max_response_num_blocks {
438 block_refs.truncate(max_response_num_blocks);
439 }
440
441 for block in &block_refs {
443 if !self.context.committee.is_valid_index(block.author) {
444 return Err(ConsensusError::InvalidAuthorityIndex {
445 index: block.author,
446 max: self.context.committee.size(),
447 });
448 }
449 if block.round == GENESIS_ROUND {
450 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
451 }
452 }
453
454 let mut blocks = self
456 .dag_state
457 .read()
458 .get_blocks(&block_refs)
459 .into_iter()
460 .flatten()
461 .collect::<Vec<_>>();
462
463 if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
466 if fetch_missing_ancestors {
467 let missing_ancestors = blocks
470 .iter()
471 .flat_map(|block| block.ancestors().to_vec())
472 .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
473 .collect::<BTreeSet<_>>()
474 .into_iter()
475 .collect::<Vec<_>>();
476
477 let selected_num_blocks = max_response_num_blocks
480 .saturating_sub(blocks.len())
481 .min(missing_ancestors.len());
482 if selected_num_blocks > 0 {
483 let selected_ancestor_refs = missing_ancestors
484 .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
485 .copied()
486 .collect::<Vec<_>>();
487 let ancestor_blocks = self
488 .dag_state
489 .read()
490 .get_blocks(&selected_ancestor_refs)
491 .into_iter()
492 .flatten();
493 blocks.extend(ancestor_blocks);
494 }
495 } else {
496 let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
499 if block_refs.is_empty() {
500 let dag_state = self.dag_state.read();
501 for (index, _authority) in self.context.committee.authorities() {
502 let last_block = dag_state.get_last_block_for_authority(index);
503 limit_rounds.insert(index, last_block.round());
504 }
505 } else {
506 for block_ref in &block_refs {
507 let entry = limit_rounds
508 .entry(block_ref.author)
509 .or_insert(block_ref.round);
510 *entry = (*entry).min(block_ref.round);
511 }
512 }
513
514 let mut heap = BinaryHeap::new();
517 for (authority, limit_round) in &limit_rounds {
518 let fetch_start = fetch_after_rounds[*authority] + 1;
519 if fetch_start < *limit_round {
520 heap.push(Reverse((fetch_start, *authority, *limit_round)));
521 }
522 }
523
524 while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
525 let fetched = self.store.scan_blocks_by_author_in_range(
526 authority,
527 fetch_start,
528 limit_round,
529 1,
530 )?;
531 if let Some(block) = fetched.into_iter().next() {
532 let next_start = block.round() + 1;
533 blocks.push(block);
534 if blocks.len() >= max_response_num_blocks {
535 blocks.truncate(max_response_num_blocks);
536 break;
537 }
538 if next_start < limit_round {
539 heap.push(Reverse((next_start, authority, limit_round)));
540 }
541 }
542 }
543 }
544 }
545
546 let bytes = blocks
548 .into_iter()
549 .map(|block| block.serialized().clone())
550 .collect::<Vec<_>>();
551 Ok(bytes)
552 }
553
554 async fn handle_fetch_commits(
555 &self,
556 _peer: AuthorityIndex,
557 commit_range: CommitRange,
558 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
559 fail_point_async!("consensus-rpc-response");
560
561 let inclusive_end = commit_range.end().min(
563 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
564 - 1,
565 );
566 let mut commits = self
567 .store
568 .scan_commits((commit_range.start()..=inclusive_end).into())?;
569 let mut certifier_block_refs = vec![];
570 'commit: while let Some(c) = commits.last() {
571 let index = c.index();
572 let votes = self.store.read_commit_votes(index)?;
573 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
574 for v in &votes {
575 stake_aggregator.add(v.author, &self.context.committee);
576 }
577 if stake_aggregator.reached_threshold(&self.context.committee) {
578 certifier_block_refs = votes;
579 break 'commit;
580 } else {
581 debug!(
582 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
583 index,
584 stake_aggregator.stake(),
585 stake_aggregator.threshold(&self.context.committee)
586 );
587 self.context
588 .metrics
589 .node_metrics
590 .commit_sync_fetch_commits_handler_uncertified_skipped
591 .inc();
592 commits.pop();
593 }
594 }
595 let certifier_blocks = self
596 .store
597 .read_blocks(&certifier_block_refs)?
598 .into_iter()
599 .flatten()
600 .collect();
601 Ok((commits, certifier_blocks))
602 }
603
604 async fn handle_fetch_latest_blocks(
605 &self,
606 peer: AuthorityIndex,
607 authorities: Vec<AuthorityIndex>,
608 ) -> ConsensusResult<Vec<Bytes>> {
609 fail_point_async!("consensus-rpc-response");
610
611 if authorities.len() > self.context.committee.size() {
612 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
613 }
614
615 for authority in &authorities {
617 if !self.context.committee.is_valid_index(*authority) {
618 return Err(ConsensusError::InvalidAuthorityIndex {
619 index: *authority,
620 max: self.context.committee.size(),
621 });
622 }
623 }
624
625 let mut blocks = vec![];
629 let dag_state = self.dag_state.read();
630 for authority in authorities {
631 let block = dag_state.get_last_block_for_authority(authority);
632
633 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
634
635 if block.round() != GENESIS_ROUND {
637 blocks.push(block);
638 }
639 }
640
641 let result = blocks
643 .into_iter()
644 .map(|block| block.serialized().clone())
645 .collect::<Vec<_>>();
646
647 Ok(result)
648 }
649
650 async fn handle_get_latest_rounds(
651 &self,
652 _peer: AuthorityIndex,
653 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
654 fail_point_async!("consensus-rpc-response");
655
656 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
657
658 let blocks = self
659 .dag_state
660 .read()
661 .get_last_cached_block_per_authority(Round::MAX);
662 let highest_accepted_rounds = blocks
663 .into_iter()
664 .map(|(block, _)| block.round())
665 .collect::<Vec<_>>();
666
667 Ok((highest_received_rounds, highest_accepted_rounds))
668 }
669}
670
671#[async_trait]
672impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
673 async fn handle_stream_blocks(
674 &self,
675 _peer: NodeId,
676 _highest_round_per_authority: Vec<u64>,
677 ) -> ConsensusResult<ObserverBlockStream> {
678 todo!("Observer block streaming not yet implemented")
680 }
681
682 async fn handle_fetch_blocks(
683 &self,
684 _peer: NodeId,
685 _block_refs: Vec<BlockRef>,
686 ) -> ConsensusResult<Vec<Bytes>> {
687 Err(ConsensusError::NetworkRequest(
690 "Observer fetch blocks not yet implemented".to_string(),
691 ))
692 }
693
694 async fn handle_fetch_commits(
695 &self,
696 _peer: NodeId,
697 _commit_range: CommitRange,
698 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
699 Err(ConsensusError::NetworkRequest(
701 "Observer fetch commits not yet implemented".to_string(),
702 ))
703 }
704}
705struct Counter {
706 count: usize,
707 subscriptions_by_peer: BTreeMap<PeerId, usize>,
708}
709
710pub(crate) struct SubscriptionCounter {
712 context: Arc<Context>,
713 counter: parking_lot::Mutex<Counter>,
714}
715
716impl SubscriptionCounter {
717 pub(crate) fn new(context: Arc<Context>) -> Self {
718 for (_, authority) in context.committee.authorities() {
720 context
721 .metrics
722 .node_metrics
723 .subscribed_by
724 .with_label_values(&[authority.hostname.as_str()])
725 .set(0);
726 }
727
728 Self {
729 counter: parking_lot::Mutex::new(Counter {
730 count: 0,
731 subscriptions_by_peer: BTreeMap::new(),
732 }),
733 context,
734 }
735 }
736
737 fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
738 let mut counter = self.counter.lock();
739 counter.count += 1;
740 *counter
741 .subscriptions_by_peer
742 .entry(peer.clone())
743 .or_insert(0) += 1;
744
745 match peer {
746 PeerId::Validator(authority) => {
747 let peer_hostname = &self.context.committee.authority(*authority).hostname;
748 self.context
749 .metrics
750 .node_metrics
751 .subscribed_by
752 .with_label_values(&[peer_hostname])
753 .set(1);
754 }
755 PeerId::Observer(_) => {
756 self.context
757 .metrics
758 .node_metrics
759 .subscribed_by
760 .with_label_values(&["observer"])
761 .inc();
762 }
763 }
764
765 Ok(())
766 }
767
768 fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
769 let mut counter = self.counter.lock();
770 counter.count -= 1;
771 *counter
772 .subscriptions_by_peer
773 .entry(peer.clone())
774 .or_insert(0) -= 1;
775
776 if counter.subscriptions_by_peer[peer] == 0 {
777 match peer {
778 PeerId::Validator(authority) => {
779 let peer_hostname = &self.context.committee.authority(*authority).hostname;
780 self.context
781 .metrics
782 .node_metrics
783 .subscribed_by
784 .with_label_values(&[peer_hostname])
785 .set(0);
786 }
787 PeerId::Observer(_) => {
788 self.context
789 .metrics
790 .node_metrics
791 .subscribed_by
792 .with_label_values(&["observer"])
793 .dec();
794 }
795 }
796 }
797
798 Ok(())
799 }
800}
801
802type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
805
806pub(crate) struct BroadcastStream<T> {
809 peer: PeerId,
810 inner: ReusableBoxFuture<
812 'static,
813 (
814 Result<T, broadcast::error::RecvError>,
815 broadcast::Receiver<T>,
816 ),
817 >,
818 subscription_counter: Arc<SubscriptionCounter>,
820}
821
822impl<T: 'static + Clone + Send> BroadcastStream<T> {
823 pub fn new(
824 peer: PeerId,
825 rx: broadcast::Receiver<T>,
826 subscription_counter: Arc<SubscriptionCounter>,
827 ) -> Self {
828 if let Err(err) = subscription_counter.increment(&peer) {
829 match err {
830 ConsensusError::Shutdown => {}
831 _ => panic!("Unexpected error: {err}"),
832 }
833 }
834 Self {
835 peer,
836 inner: ReusableBoxFuture::new(make_recv_future(rx)),
837 subscription_counter,
838 }
839 }
840}
841
842impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
843 type Item = T;
844
845 fn poll_next(
846 mut self: Pin<&mut Self>,
847 cx: &mut task::Context<'_>,
848 ) -> task::Poll<Option<Self::Item>> {
849 let peer = self.peer.clone();
850 let maybe_item = loop {
851 let (result, rx) = ready!(self.inner.poll(cx));
852 self.inner.set(make_recv_future(rx));
853
854 match result {
855 Ok(item) => break Some(item),
856 Err(broadcast::error::RecvError::Closed) => {
857 info!("Block BroadcastedBlockStream {} closed", peer);
858 break None;
859 }
860 Err(broadcast::error::RecvError::Lagged(n)) => {
861 warn!(
862 "Block BroadcastedBlockStream {} lagged by {} messages",
863 peer, n
864 );
865 continue;
866 }
867 }
868 };
869 task::Poll::Ready(maybe_item)
870 }
871}
872
873impl<T> Drop for BroadcastStream<T> {
874 fn drop(&mut self) {
875 if let Err(err) = self.subscription_counter.decrement(&self.peer) {
876 match err {
877 ConsensusError::Shutdown => {}
878 _ => panic!("Unexpected error: {err}"),
879 }
880 }
881 }
882}
883
884async fn make_recv_future<T: Clone>(
885 mut rx: broadcast::Receiver<T>,
886) -> (
887 Result<T, broadcast::error::RecvError>,
888 broadcast::Receiver<T>,
889) {
890 let result = rx.recv().await;
891 (result, rx)
892}
893
894#[cfg(test)]
897mod tests {
898 use std::{
899 collections::{BTreeMap, BTreeSet},
900 sync::Arc,
901 time::Duration,
902 };
903
904 use async_trait::async_trait;
905 use bytes::Bytes;
906 use consensus_config::AuthorityIndex;
907 use consensus_types::block::{BlockDigest, BlockRef, Round};
908 use parking_lot::{Mutex, RwLock};
909 use tokio::{sync::broadcast, time::sleep};
910
911 use futures::StreamExt as _;
912
913 use crate::{
914 authority_service::AuthorityService,
915 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
916 commit::{CertifiedCommits, CommitRange},
917 commit_vote_monitor::CommitVoteMonitor,
918 context::Context,
919 core_thread::{CoreError, CoreThreadDispatcher},
920 dag_state::DagState,
921 error::ConsensusResult,
922 network::{
923 BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
924 ValidatorNetworkClient, ValidatorNetworkService,
925 },
926 round_tracker::RoundTracker,
927 storage::mem_store::MemStore,
928 synchronizer::Synchronizer,
929 test_dag_builder::DagBuilder,
930 transaction_certifier::TransactionCertifier,
931 };
932 struct FakeCoreThreadDispatcher {
933 blocks: Mutex<Vec<VerifiedBlock>>,
934 }
935
936 impl FakeCoreThreadDispatcher {
937 fn new() -> Self {
938 Self {
939 blocks: Mutex::new(vec![]),
940 }
941 }
942
943 fn get_blocks(&self) -> Vec<VerifiedBlock> {
944 self.blocks.lock().clone()
945 }
946 }
947
948 #[async_trait]
949 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
950 async fn add_blocks(
951 &self,
952 blocks: Vec<VerifiedBlock>,
953 ) -> Result<BTreeSet<BlockRef>, CoreError> {
954 let block_refs = blocks.iter().map(|b| b.reference()).collect();
955 self.blocks.lock().extend(blocks);
956 Ok(block_refs)
957 }
958
959 async fn check_block_refs(
960 &self,
961 _block_refs: Vec<BlockRef>,
962 ) -> Result<BTreeSet<BlockRef>, CoreError> {
963 Ok(BTreeSet::new())
964 }
965
966 async fn add_certified_commits(
967 &self,
968 _commits: CertifiedCommits,
969 ) -> Result<BTreeSet<BlockRef>, CoreError> {
970 todo!()
971 }
972
973 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
974 Ok(())
975 }
976
977 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
978 Ok(Default::default())
979 }
980
981 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
982 todo!()
983 }
984
985 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
986 todo!()
987 }
988 }
989
990 #[derive(Default)]
991 struct FakeNetworkClient {}
992
993 #[async_trait]
994 impl ValidatorNetworkClient for FakeNetworkClient {
995 async fn send_block(
996 &self,
997 _peer: AuthorityIndex,
998 _block: &VerifiedBlock,
999 _timeout: Duration,
1000 ) -> ConsensusResult<()> {
1001 unimplemented!("Unimplemented")
1002 }
1003
1004 async fn subscribe_blocks(
1005 &self,
1006 _peer: AuthorityIndex,
1007 _last_received: Round,
1008 _timeout: Duration,
1009 ) -> ConsensusResult<BlockStream> {
1010 unimplemented!("Unimplemented")
1011 }
1012
1013 async fn fetch_blocks(
1014 &self,
1015 _peer: AuthorityIndex,
1016 _block_refs: Vec<BlockRef>,
1017 _fetch_after_rounds: Vec<Round>,
1018 _fetch_missing_ancestors: bool,
1019 _timeout: Duration,
1020 ) -> ConsensusResult<Vec<Bytes>> {
1021 unimplemented!("Unimplemented")
1022 }
1023
1024 async fn fetch_commits(
1025 &self,
1026 _peer: AuthorityIndex,
1027 _commit_range: CommitRange,
1028 _timeout: Duration,
1029 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1030 unimplemented!("Unimplemented")
1031 }
1032
1033 async fn fetch_latest_blocks(
1034 &self,
1035 _peer: AuthorityIndex,
1036 _authorities: Vec<AuthorityIndex>,
1037 _timeout: Duration,
1038 ) -> ConsensusResult<Vec<Bytes>> {
1039 unimplemented!("Unimplemented")
1040 }
1041
1042 async fn get_latest_rounds(
1043 &self,
1044 _peer: AuthorityIndex,
1045 _timeout: Duration,
1046 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1047 unimplemented!("Unimplemented")
1048 }
1049 }
1050
1051 #[async_trait]
1052 impl ObserverNetworkClient for FakeNetworkClient {
1053 async fn stream_blocks(
1054 &self,
1055 _peer: crate::network::PeerId,
1056 _highest_round_per_authority: Vec<u64>,
1057 _timeout: Duration,
1058 ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1059 unimplemented!("Unimplemented")
1060 }
1061
1062 async fn fetch_blocks(
1063 &self,
1064 _peer: crate::network::PeerId,
1065 _block_refs: Vec<BlockRef>,
1066 _timeout: Duration,
1067 ) -> ConsensusResult<Vec<Bytes>> {
1068 unimplemented!("Unimplemented")
1069 }
1070
1071 async fn fetch_commits(
1072 &self,
1073 _peer: crate::network::PeerId,
1074 _commit_range: CommitRange,
1075 _timeout: Duration,
1076 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1077 unimplemented!("Unimplemented")
1078 }
1079 }
1080
1081 #[tokio::test(flavor = "current_thread", start_paused = true)]
1082 async fn test_handle_send_block() {
1083 let (context, _keys) = Context::new_for_test(4);
1084 let context = Arc::new(context);
1085 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1086 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1087 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1088 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1089 let fake_client = Arc::new(FakeNetworkClient::default());
1090 let network_client = Arc::new(SynchronizerClient::new(
1091 context.clone(),
1092 Some(fake_client.clone()),
1093 Some(fake_client.clone()),
1094 ));
1095 let store = Arc::new(MemStore::new());
1096 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1097 let transaction_certifier =
1098 TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
1099 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1100 let synchronizer = Synchronizer::start(
1101 network_client,
1102 context.clone(),
1103 core_dispatcher.clone(),
1104 commit_vote_monitor.clone(),
1105 block_verifier.clone(),
1106 transaction_certifier.clone(),
1107 round_tracker.clone(),
1108 dag_state.clone(),
1109 false,
1110 );
1111 let authority_service = Arc::new(AuthorityService::new(
1112 context.clone(),
1113 block_verifier,
1114 commit_vote_monitor,
1115 round_tracker,
1116 synchronizer,
1117 core_dispatcher.clone(),
1118 rx_block_broadcast,
1119 transaction_certifier,
1120 dag_state,
1121 store,
1122 ));
1123
1124 let now = context.clock.timestamp_utc_ms();
1126 let max_drift = context.parameters.max_forward_time_drift;
1127 let input_block = VerifiedBlock::new_for_test(
1128 TestBlock::new(9, 0)
1129 .set_timestamp_ms(now + max_drift.as_millis() as u64)
1130 .build(),
1131 );
1132
1133 let service = authority_service.clone();
1134 let serialized = ExtendedSerializedBlock {
1135 block: input_block.serialized().clone(),
1136 excluded_ancestors: vec![],
1137 };
1138
1139 tokio::spawn({
1140 let service = service.clone();
1141 let context = context.clone();
1142 async move {
1143 service
1144 .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
1145 .await
1146 .unwrap();
1147 }
1148 });
1149
1150 sleep(max_drift / 2).await;
1151
1152 let blocks = core_dispatcher.get_blocks();
1153 assert_eq!(blocks.len(), 1);
1154 assert_eq!(blocks[0], input_block);
1155
1156 let invalid_block =
1158 VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
1159 let extended_block = ExtendedSerializedBlock {
1160 block: invalid_block.serialized().clone(),
1161 excluded_ancestors: vec![],
1162 };
1163 service
1164 .handle_send_block(
1165 context.committee.to_authority_index(0).unwrap(),
1166 extended_block,
1167 )
1168 .await
1169 .unwrap_err();
1170
1171 let invalid_excluded_ancestors = vec![
1173 bcs::to_bytes(&BlockRef::new(
1174 10,
1175 AuthorityIndex::new_for_test(1000),
1176 BlockDigest::MIN,
1177 ))
1178 .unwrap(),
1179 vec![3u8; 40],
1180 bcs::to_bytes(&invalid_block.reference()).unwrap(),
1181 ];
1182 let extended_block = ExtendedSerializedBlock {
1183 block: input_block.serialized().clone(),
1184 excluded_ancestors: invalid_excluded_ancestors,
1185 };
1186 service
1187 .handle_send_block(
1188 context.committee.to_authority_index(0).unwrap(),
1189 extended_block,
1190 )
1191 .await
1192 .unwrap_err();
1193 }
1194
1195 #[tokio::test(flavor = "current_thread", start_paused = true)]
1196 async fn test_handle_fetch_blocks() {
1197 const NUM_AUTHORITIES: usize = 40;
1200 const NUM_ROUNDS: usize = 40;
1201 let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1202 context.parameters.max_blocks_per_fetch = 50;
1203 let context = Arc::new(context);
1204 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1205 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1206 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1207 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1208 let fake_client = Arc::new(FakeNetworkClient::default());
1209 let network_client = Arc::new(SynchronizerClient::new(
1210 context.clone(),
1211 Some(fake_client.clone()),
1212 Some(fake_client.clone()),
1213 ));
1214 let store = Arc::new(MemStore::new());
1215 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1216 let transaction_certifier =
1217 TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
1218 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1219 let synchronizer = Synchronizer::start(
1220 network_client,
1221 context.clone(),
1222 core_dispatcher.clone(),
1223 commit_vote_monitor.clone(),
1224 block_verifier.clone(),
1225 transaction_certifier.clone(),
1226 round_tracker.clone(),
1227 dag_state.clone(),
1228 false,
1229 );
1230 let authority_service = Arc::new(AuthorityService::new(
1231 context.clone(),
1232 block_verifier,
1233 commit_vote_monitor,
1234 round_tracker,
1235 synchronizer,
1236 core_dispatcher.clone(),
1237 rx_block_broadcast,
1238 transaction_certifier,
1239 dag_state.clone(),
1240 store,
1241 ));
1242
1243 let mut dag_builder = DagBuilder::new(context.clone());
1245 dag_builder
1246 .layers(1..=(NUM_ROUNDS as u32))
1247 .build()
1248 .persist_layers(dag_state.clone());
1249 dag_state.write().flush();
1250 let all_blocks = dag_builder.all_blocks();
1251
1252 let missing_block_refs: Vec<BlockRef> = all_blocks
1254 .iter()
1255 .rev()
1256 .take(2)
1257 .map(|b| b.reference())
1258 .collect();
1259 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1260 let results = authority_service
1261 .handle_fetch_blocks(
1262 AuthorityIndex::new_for_test(0),
1263 missing_block_refs.clone(),
1264 highest_accepted_rounds,
1265 true,
1266 )
1267 .await
1268 .unwrap();
1269
1270 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1272 .iter()
1273 .map(|b| {
1274 let signed = bcs::from_bytes(b).unwrap();
1275 let block = VerifiedBlock::new_verified(signed, b.clone());
1276 (block.reference(), block)
1277 })
1278 .collect();
1279 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1280 for b in &missing_block_refs {
1282 assert!(blocks.contains_key(b));
1283 }
1284 let num_missing_ancestors = blocks
1285 .keys()
1286 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1287 .count();
1288 assert_eq!(
1289 num_missing_ancestors,
1290 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1291 );
1292
1293 let missing_round = NUM_ROUNDS as Round - 3;
1295 let missing_block_refs: Vec<BlockRef> = all_blocks
1296 .iter()
1297 .filter(|b| b.reference().round == missing_round)
1298 .map(|b| b.reference())
1299 .take(2)
1300 .collect();
1301 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1302 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1304 let results = authority_service
1305 .handle_fetch_blocks(
1306 AuthorityIndex::new_for_test(0),
1307 missing_block_refs.clone(),
1308 highest_accepted_rounds,
1309 false,
1310 )
1311 .await
1312 .unwrap();
1313
1314 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1316 .iter()
1317 .map(|b| {
1318 let signed = bcs::from_bytes(b).unwrap();
1319 let block = VerifiedBlock::new_verified(signed, b.clone());
1320 (block.reference(), block)
1321 })
1322 .collect();
1323 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1324 for b in &missing_block_refs {
1326 assert!(blocks.contains_key(b));
1327 }
1328 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1330 for b in blocks.keys() {
1331 assert!(b.round <= missing_round);
1332 assert!(expected_authors.contains(&b.author));
1333 }
1334
1335 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1337 highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1339 highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1340 let results = authority_service
1341 .handle_fetch_blocks(
1342 AuthorityIndex::new_for_test(0),
1343 vec![],
1344 highest_accepted_rounds.clone(),
1345 false,
1346 )
1347 .await
1348 .unwrap();
1349
1350 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1352 .iter()
1353 .map(|b| {
1354 let signed = bcs::from_bytes(b).unwrap();
1355 let block = VerifiedBlock::new_verified(signed, b.clone());
1356 (block.reference(), block)
1357 })
1358 .collect();
1359 assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1360 for block_ref in blocks.keys() {
1362 let accepted = highest_accepted_rounds[block_ref.author];
1363 assert!(block_ref.round > accepted);
1364 }
1365 let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1368 assert!(
1371 max_round_in_result <= 4,
1372 "Expected low rounds from fair round-order fetching, got max round {}",
1373 max_round_in_result
1374 );
1375
1376 let missing_block_refs: Vec<BlockRef> = all_blocks
1378 .iter()
1379 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1380 .map(|b| b.reference())
1381 .take(5)
1382 .collect();
1383 let results = authority_service
1384 .handle_fetch_blocks(
1385 AuthorityIndex::new_for_test(0),
1386 missing_block_refs.clone(),
1387 vec![],
1388 false,
1389 )
1390 .await
1391 .unwrap();
1392
1393 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1395 .iter()
1396 .map(|b| {
1397 let signed = bcs::from_bytes(b).unwrap();
1398 let block = VerifiedBlock::new_verified(signed, b.clone());
1399 (block.reference(), block)
1400 })
1401 .collect();
1402 assert_eq!(blocks.len(), 5);
1403 for b in &missing_block_refs {
1404 assert!(blocks.contains_key(b));
1405 }
1406 }
1407
1408 #[tokio::test(flavor = "current_thread", start_paused = true)]
1409 async fn test_handle_fetch_latest_blocks() {
1410 let (context, _keys) = Context::new_for_test(4);
1412 let context = Arc::new(context);
1413 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1414 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1415 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1416 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1417 let fake_client = Arc::new(FakeNetworkClient::default());
1418 let network_client = Arc::new(SynchronizerClient::new(
1419 context.clone(),
1420 Some(fake_client.clone()),
1421 Some(fake_client.clone()),
1422 ));
1423 let store = Arc::new(MemStore::new());
1424 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1425 let transaction_certifier =
1426 TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
1427 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1428 let synchronizer = Synchronizer::start(
1429 network_client,
1430 context.clone(),
1431 core_dispatcher.clone(),
1432 commit_vote_monitor.clone(),
1433 block_verifier.clone(),
1434 transaction_certifier.clone(),
1435 round_tracker.clone(),
1436 dag_state.clone(),
1437 true,
1438 );
1439 let authority_service = Arc::new(AuthorityService::new(
1440 context.clone(),
1441 block_verifier,
1442 commit_vote_monitor,
1443 round_tracker,
1444 synchronizer,
1445 core_dispatcher.clone(),
1446 rx_block_broadcast,
1447 transaction_certifier,
1448 dag_state.clone(),
1449 store,
1450 ));
1451
1452 let mut dag_builder = DagBuilder::new(context.clone());
1454 dag_builder
1455 .layers(1..=10)
1456 .authorities(vec![AuthorityIndex::new_for_test(2)])
1457 .equivocate(1)
1458 .build()
1459 .persist_layers(dag_state);
1460
1461 let authorities_to_request = vec![
1463 AuthorityIndex::new_for_test(1),
1464 AuthorityIndex::new_for_test(2),
1465 ];
1466 let results = authority_service
1467 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1468 .await;
1469
1470 let serialised_blocks = results.unwrap();
1472 for serialised_block in serialised_blocks {
1473 let signed_block: SignedBlock =
1474 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1475 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1476
1477 assert_eq!(verified_block.round(), 10);
1478 }
1479 }
1480
1481 #[tokio::test(flavor = "current_thread", start_paused = true)]
1482 async fn test_handle_subscribe_blocks() {
1483 let (context, _keys) = Context::new_for_test(4);
1484 let context = Arc::new(context);
1485 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1486 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1487 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1488 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1489 let fake_client = Arc::new(FakeNetworkClient::default());
1490 let network_client = Arc::new(SynchronizerClient::new(
1491 context.clone(),
1492 Some(fake_client.clone()),
1493 Some(fake_client.clone()),
1494 ));
1495 let store = Arc::new(MemStore::new());
1496 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1497 let transaction_certifier =
1498 TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
1499 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1500 let synchronizer = Synchronizer::start(
1501 network_client,
1502 context.clone(),
1503 core_dispatcher.clone(),
1504 commit_vote_monitor.clone(),
1505 block_verifier.clone(),
1506 transaction_certifier.clone(),
1507 round_tracker.clone(),
1508 dag_state.clone(),
1509 false,
1510 );
1511
1512 dag_state
1514 .write()
1515 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1516 dag_state
1517 .write()
1518 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1519 dag_state
1520 .write()
1521 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1522
1523 let authority_service = Arc::new(AuthorityService::new(
1524 context.clone(),
1525 block_verifier,
1526 commit_vote_monitor,
1527 round_tracker,
1528 synchronizer,
1529 core_dispatcher.clone(),
1530 rx_block_broadcast,
1531 transaction_certifier,
1532 dag_state.clone(),
1533 store,
1534 ));
1535
1536 let peer = context.committee.to_authority_index(1).unwrap();
1537
1538 {
1541 let mut stream = authority_service
1542 .handle_subscribe_blocks(peer, 100)
1543 .await
1544 .unwrap();
1545 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1546 assert_eq!(
1547 block.round(),
1548 15,
1549 "Should return last proposed block as fallback"
1550 );
1551 assert_eq!(block.author().value(), 0);
1552 }
1553
1554 {
1557 let mut stream = authority_service
1558 .handle_subscribe_blocks(peer, 7)
1559 .await
1560 .unwrap();
1561
1562 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1563 assert_eq!(block1.round(), 10, "Should return block at round 10");
1564
1565 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1566 assert_eq!(block2.round(), 15, "Should return block at round 15");
1567 }
1568 }
1569
1570 #[tokio::test(flavor = "current_thread", start_paused = true)]
1571 async fn test_handle_subscribe_blocks_not_proposed() {
1572 let (context, _keys) = Context::new_for_test(4);
1573 let context = Arc::new(context);
1574 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1575 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1576 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1577 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1578 let fake_client = Arc::new(FakeNetworkClient::default());
1579 let network_client = Arc::new(SynchronizerClient::new(
1580 context.clone(),
1581 Some(fake_client.clone()),
1582 Some(fake_client.clone()),
1583 ));
1584 let store = Arc::new(MemStore::new());
1585 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1586 let transaction_certifier =
1587 TransactionCertifier::new(context.clone(), block_verifier.clone(), dag_state.clone());
1588 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1589 let synchronizer = Synchronizer::start(
1590 network_client,
1591 context.clone(),
1592 core_dispatcher.clone(),
1593 commit_vote_monitor.clone(),
1594 block_verifier.clone(),
1595 transaction_certifier.clone(),
1596 round_tracker.clone(),
1597 dag_state.clone(),
1598 false,
1599 );
1600
1601 let authority_service = Arc::new(AuthorityService::new(
1604 context.clone(),
1605 block_verifier,
1606 commit_vote_monitor,
1607 round_tracker,
1608 synchronizer,
1609 core_dispatcher.clone(),
1610 rx_block_broadcast,
1611 transaction_certifier,
1612 dag_state.clone(),
1613 store,
1614 ));
1615
1616 let peer = context.committee.to_authority_index(1).unwrap();
1617
1618 let mut stream = authority_service
1620 .handle_subscribe_blocks(peer, 0)
1621 .await
1622 .unwrap();
1623
1624 use futures::poll;
1626 use std::task::Poll;
1627 let poll_result = poll!(stream.next());
1628 assert!(
1629 matches!(poll_result, Poll::Pending),
1630 "Should not receive genesis block on subscription stream"
1631 );
1632 }
1633}