1use std::{
5 cmp::Reverse,
6 collections::{BTreeMap, BTreeSet, BinaryHeap},
7 pin::Pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27 CommitIndex,
28 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29 block_verifier::BlockVerifier,
30 commit::{CommitAPI as _, CommitRange, TrustedCommit},
31 commit_vote_monitor::CommitVoteMonitor,
32 context::Context,
33 core_thread::CoreThreadDispatcher,
34 dag_state::DagState,
35 error::{ConsensusError, ConsensusResult},
36 network::{
37 BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverBlockStreamItem,
38 ObserverNetworkService, PeerId, ValidatorNetworkService,
39 },
40 round_tracker::RoundTracker,
41 stake_aggregator::{QuorumThreshold, StakeAggregator},
42 storage::Store,
43 synchronizer::SynchronizerHandle,
44 transaction_vote_tracker::TransactionVoteTracker,
45};
46
47pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
49pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
51 context: Arc<Context>,
52 commit_vote_monitor: Arc<CommitVoteMonitor>,
53 block_verifier: Arc<dyn BlockVerifier>,
54 synchronizer: Arc<SynchronizerHandle>,
55 core_dispatcher: Arc<C>,
56 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
57 subscription_counter: Arc<SubscriptionCounter>,
58 transaction_vote_tracker: TransactionVoteTracker,
59 dag_state: Arc<RwLock<DagState>>,
60 store: Arc<dyn Store>,
61 round_tracker: Arc<RwLock<RoundTracker>>,
62}
63
64impl<C: CoreThreadDispatcher> AuthorityService<C> {
65 pub(crate) fn new(
66 context: Arc<Context>,
67 block_verifier: Arc<dyn BlockVerifier>,
68 commit_vote_monitor: Arc<CommitVoteMonitor>,
69 round_tracker: Arc<RwLock<RoundTracker>>,
70 synchronizer: Arc<SynchronizerHandle>,
71 core_dispatcher: Arc<C>,
72 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
73 transaction_vote_tracker: TransactionVoteTracker,
74 dag_state: Arc<RwLock<DagState>>,
75 store: Arc<dyn Store>,
76 ) -> Self {
77 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
78 Self {
79 context,
80 block_verifier,
81 commit_vote_monitor,
82 synchronizer,
83 core_dispatcher,
84 rx_block_broadcast,
85 subscription_counter,
86 transaction_vote_tracker,
87 dag_state,
88 store,
89 round_tracker,
90 }
91 }
92
93 fn parse_excluded_ancestors(
95 &self,
96 peer: AuthorityIndex,
97 block: &VerifiedBlock,
98 mut excluded_ancestors: Vec<Vec<u8>>,
99 ) -> ConsensusResult<Vec<BlockRef>> {
100 let peer_hostname = &self.context.committee.authority(peer).hostname;
101
102 let excluded_ancestors_limit = self.context.committee.size() * 2;
103 if excluded_ancestors.len() > excluded_ancestors_limit {
104 debug!(
105 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
106 excluded_ancestors.len() - excluded_ancestors_limit,
107 peer,
108 peer_hostname,
109 );
110 excluded_ancestors.truncate(excluded_ancestors_limit);
111 }
112
113 let excluded_ancestors = excluded_ancestors
114 .into_iter()
115 .map(|serialized| {
116 let block_ref: BlockRef =
117 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
118 if !self.context.committee.is_valid_index(block_ref.author) {
119 return Err(ConsensusError::InvalidAuthorityIndex {
120 index: block_ref.author,
121 max: self.context.committee.size(),
122 });
123 }
124 if block_ref.round >= block.round() {
125 return Err(ConsensusError::InvalidAncestorRound {
126 ancestor: block_ref.round,
127 block: block.round(),
128 });
129 }
130 Ok(block_ref)
131 })
132 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
133
134 for excluded_ancestor in &excluded_ancestors {
135 let excluded_ancestor_hostname = &self
136 .context
137 .committee
138 .authority(excluded_ancestor.author)
139 .hostname;
140 self.context
141 .metrics
142 .node_metrics
143 .network_excluded_ancestors_count_by_authority
144 .with_label_values(&[excluded_ancestor_hostname])
145 .inc();
146 }
147 self.context
148 .metrics
149 .node_metrics
150 .network_received_excluded_ancestors_from_authority
151 .with_label_values(&[peer_hostname])
152 .inc_by(excluded_ancestors.len() as u64);
153
154 Ok(excluded_ancestors)
155 }
156}
157
158#[async_trait]
159impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
160 async fn handle_send_block(
161 &self,
162 peer: AuthorityIndex,
163 serialized_block: ExtendedSerializedBlock,
164 ) -> ConsensusResult<()> {
165 fail_point_async!("consensus-rpc-response");
166
167 let peer_hostname = &self.context.committee.authority(peer).hostname;
168
169 let signed_block: SignedBlock =
171 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
172
173 if peer != signed_block.author() {
175 self.context
176 .metrics
177 .node_metrics
178 .invalid_blocks
179 .with_label_values(&[
180 peer_hostname.as_str(),
181 "handle_send_block",
182 "UnexpectedAuthority",
183 ])
184 .inc();
185 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
186 info!("Block with wrong authority from {}: {}", peer, e);
187 return Err(e);
188 }
189
190 let (verified_block, reject_txn_votes) = self
192 .block_verifier
193 .verify_and_vote(signed_block, serialized_block.block)
194 .tap_err(|e| {
195 self.context
196 .metrics
197 .node_metrics
198 .invalid_blocks
199 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
200 .inc();
201 info!("Invalid block from {}: {}", peer, e);
202 })?;
203 let excluded_ancestors = self
204 .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
205 .tap_err(|e| {
206 debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
207 self.context
208 .metrics
209 .node_metrics
210 .invalid_blocks
211 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
212 .inc();
213 })?;
214
215 let block_ref = verified_block.reference();
216 debug!("Received block {} via send block.", block_ref);
217
218 self.context
219 .metrics
220 .node_metrics
221 .verified_blocks
222 .with_label_values(&[peer_hostname])
223 .inc();
224
225 let now = self.context.clock.timestamp_utc_ms();
226 let forward_time_drift =
227 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
228
229 self.context
230 .metrics
231 .node_metrics
232 .block_timestamp_drift_ms
233 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
234 .inc_by(forward_time_drift.as_millis() as u64);
235
236 self.commit_vote_monitor.observe_block(&verified_block);
239
240 self.round_tracker
242 .write()
243 .update_from_verified_block(&ExtendedBlock {
244 block: verified_block.clone(),
245 excluded_ancestors: excluded_ancestors.clone(),
246 });
247
248 let last_commit_index = self.dag_state.read().last_commit_index();
257 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
258 if last_commit_index
261 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
262 < quorum_commit_index
263 {
264 self.context
265 .metrics
266 .node_metrics
267 .rejected_blocks
268 .with_label_values(&["commit_lagging"])
269 .inc();
270 debug!(
271 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
272 block_ref, last_commit_index, quorum_commit_index,
273 );
274 return Err(ConsensusError::BlockRejected {
275 block_ref,
276 reason: format!(
277 "Last commit index is lagging quorum commit index too much ({} < {})",
278 last_commit_index, quorum_commit_index,
279 ),
280 });
281 }
282
283 if self.context.protocol_config.transaction_voting_enabled() {
286 self.transaction_vote_tracker
287 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
288 }
289
290 let missing_ancestors = self
292 .core_dispatcher
293 .add_blocks(vec![verified_block.clone()])
294 .await
295 .map_err(|_| ConsensusError::Shutdown)?;
296
297 if !missing_ancestors.is_empty() {
299 self.context
300 .metrics
301 .node_metrics
302 .handler_received_block_missing_ancestors
303 .with_label_values(&[peer_hostname])
304 .inc_by(missing_ancestors.len() as u64);
305 let synchronizer = self.synchronizer.clone();
306 spawn_monitored_task!(async move {
307 if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
312 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
313 }
314 });
315 }
316
317 let missing_excluded_ancestors = self
319 .core_dispatcher
320 .check_block_refs(excluded_ancestors)
321 .await
322 .map_err(|_| ConsensusError::Shutdown)?;
323 if !missing_excluded_ancestors.is_empty() {
324 self.context
325 .metrics
326 .node_metrics
327 .network_excluded_ancestors_sent_to_fetch
328 .with_label_values(&[peer_hostname])
329 .inc_by(missing_excluded_ancestors.len() as u64);
330
331 let synchronizer = self.synchronizer.clone();
332 spawn_monitored_task!(async move {
333 if let Err(err) = synchronizer
334 .fetch_blocks(missing_excluded_ancestors, peer)
335 .await
336 {
337 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
338 }
339 });
340 }
341
342 Ok(())
343 }
344
345 async fn handle_subscribe_blocks(
346 &self,
347 peer: AuthorityIndex,
348 last_received: Round,
349 ) -> ConsensusResult<BlockStream> {
350 fail_point_async!("consensus-rpc-response");
351
352 let past_proposed_blocks = {
361 let dag_state = self.dag_state.read();
362
363 let mut proposed_blocks =
364 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
365 if proposed_blocks.is_empty() {
366 let last_proposed_block = dag_state.get_last_proposed_block();
367 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
368 vec![last_proposed_block]
369 } else {
370 vec![]
371 };
372 }
373 stream::iter(
374 proposed_blocks
375 .into_iter()
376 .map(|block| ExtendedSerializedBlock {
377 block: block.serialized().clone(),
378 excluded_ancestors: vec![],
379 }),
380 )
381 };
382
383 let broadcasted_blocks = BroadcastedBlockStream::new(
384 PeerId::Validator(peer),
385 self.rx_block_broadcast.resubscribe(),
386 self.subscription_counter.clone(),
387 );
388
389 Ok(Box::pin(past_proposed_blocks.chain(
391 broadcasted_blocks.map(ExtendedSerializedBlock::from),
392 )))
393 }
394
395 async fn handle_fetch_blocks(
411 &self,
412 _peer: AuthorityIndex,
413 mut block_refs: Vec<BlockRef>,
414 fetch_after_rounds: Vec<Round>,
415 fetch_missing_ancestors: bool,
416 ) -> ConsensusResult<Vec<Bytes>> {
417 fail_point_async!("consensus-rpc-response");
418
419 if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
420 return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
421 }
422 if !fetch_after_rounds.is_empty()
423 && fetch_after_rounds.len() != self.context.committee.size()
424 {
425 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
426 fetch_after_rounds.len(),
427 self.context.committee.size(),
428 ));
429 }
430
431 let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
433 self.context.parameters.max_blocks_per_sync
434 } else {
435 self.context.parameters.max_blocks_per_fetch
436 };
437 if block_refs.len() > max_response_num_blocks {
438 block_refs.truncate(max_response_num_blocks);
439 }
440
441 for block in &block_refs {
443 if !self.context.committee.is_valid_index(block.author) {
444 return Err(ConsensusError::InvalidAuthorityIndex {
445 index: block.author,
446 max: self.context.committee.size(),
447 });
448 }
449 if block.round == GENESIS_ROUND {
450 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
451 }
452 }
453
454 let mut blocks = self
456 .dag_state
457 .read()
458 .get_blocks(&block_refs)
459 .into_iter()
460 .flatten()
461 .collect::<Vec<_>>();
462
463 if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
466 if fetch_missing_ancestors {
467 let missing_ancestors = blocks
470 .iter()
471 .flat_map(|block| block.ancestors().to_vec())
472 .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
473 .collect::<BTreeSet<_>>()
474 .into_iter()
475 .collect::<Vec<_>>();
476
477 let selected_num_blocks = max_response_num_blocks
480 .saturating_sub(blocks.len())
481 .min(missing_ancestors.len());
482 if selected_num_blocks > 0 {
483 let selected_ancestor_refs = missing_ancestors
484 .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
485 .copied()
486 .collect::<Vec<_>>();
487 let ancestor_blocks = self
488 .dag_state
489 .read()
490 .get_blocks(&selected_ancestor_refs)
491 .into_iter()
492 .flatten();
493 blocks.extend(ancestor_blocks);
494 }
495 } else {
496 let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
499 if block_refs.is_empty() {
500 let dag_state = self.dag_state.read();
501 for (index, _authority) in self.context.committee.authorities() {
502 let last_block = dag_state.get_last_block_for_authority(index);
503 limit_rounds.insert(index, last_block.round());
504 }
505 } else {
506 for block_ref in &block_refs {
507 let entry = limit_rounds
508 .entry(block_ref.author)
509 .or_insert(block_ref.round);
510 *entry = (*entry).min(block_ref.round);
511 }
512 }
513
514 let mut heap = BinaryHeap::new();
517 for (authority, limit_round) in &limit_rounds {
518 let fetch_start = fetch_after_rounds[*authority] + 1;
519 if fetch_start < *limit_round {
520 heap.push(Reverse((fetch_start, *authority, *limit_round)));
521 }
522 }
523
524 while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
525 let fetched = self.store.scan_blocks_by_author_in_range(
526 authority,
527 fetch_start,
528 limit_round,
529 1,
530 )?;
531 if let Some(block) = fetched.into_iter().next() {
532 let next_start = block.round() + 1;
533 blocks.push(block);
534 if blocks.len() >= max_response_num_blocks {
535 blocks.truncate(max_response_num_blocks);
536 break;
537 }
538 if next_start < limit_round {
539 heap.push(Reverse((next_start, authority, limit_round)));
540 }
541 }
542 }
543 }
544 }
545
546 let bytes = blocks
548 .into_iter()
549 .map(|block| block.serialized().clone())
550 .collect::<Vec<_>>();
551 Ok(bytes)
552 }
553
554 async fn handle_fetch_commits(
555 &self,
556 _peer: AuthorityIndex,
557 commit_range: CommitRange,
558 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
559 fail_point_async!("consensus-rpc-response");
560
561 let inclusive_end = commit_range.end().min(
563 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
564 - 1,
565 );
566 let mut commits = self
567 .store
568 .scan_commits((commit_range.start()..=inclusive_end).into())?;
569 let mut certifier_block_refs = vec![];
570 'commit: while let Some(c) = commits.last() {
571 let index = c.index();
572 let votes = self.store.read_commit_votes(index)?;
573 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
574 for v in &votes {
575 stake_aggregator.add(v.author, &self.context.committee);
576 }
577 if stake_aggregator.reached_threshold(&self.context.committee) {
578 certifier_block_refs = votes;
579 break 'commit;
580 } else {
581 debug!(
582 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
583 index,
584 stake_aggregator.stake(),
585 stake_aggregator.threshold(&self.context.committee)
586 );
587 self.context
588 .metrics
589 .node_metrics
590 .commit_sync_fetch_commits_handler_uncertified_skipped
591 .inc();
592 commits.pop();
593 }
594 }
595 let certifier_blocks = self
596 .store
597 .read_blocks(&certifier_block_refs)?
598 .into_iter()
599 .flatten()
600 .collect();
601 Ok((commits, certifier_blocks))
602 }
603
604 async fn handle_fetch_latest_blocks(
605 &self,
606 peer: AuthorityIndex,
607 authorities: Vec<AuthorityIndex>,
608 ) -> ConsensusResult<Vec<Bytes>> {
609 fail_point_async!("consensus-rpc-response");
610
611 if authorities.len() > self.context.committee.size() {
612 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
613 }
614
615 for authority in &authorities {
617 if !self.context.committee.is_valid_index(*authority) {
618 return Err(ConsensusError::InvalidAuthorityIndex {
619 index: *authority,
620 max: self.context.committee.size(),
621 });
622 }
623 }
624
625 let mut blocks = vec![];
629 let dag_state = self.dag_state.read();
630 for authority in authorities {
631 let block = dag_state.get_last_block_for_authority(authority);
632
633 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
634
635 if block.round() != GENESIS_ROUND {
637 blocks.push(block);
638 }
639 }
640
641 let result = blocks
643 .into_iter()
644 .map(|block| block.serialized().clone())
645 .collect::<Vec<_>>();
646
647 Ok(result)
648 }
649
650 async fn handle_get_latest_rounds(
651 &self,
652 _peer: AuthorityIndex,
653 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
654 fail_point_async!("consensus-rpc-response");
655
656 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
657
658 let blocks = self
659 .dag_state
660 .read()
661 .get_last_cached_block_per_authority(Round::MAX);
662 let highest_accepted_rounds = blocks
663 .into_iter()
664 .map(|(block, _)| block.round())
665 .collect::<Vec<_>>();
666
667 Ok((highest_received_rounds, highest_accepted_rounds))
668 }
669}
670
671#[async_trait]
672impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
673 async fn handle_block(
674 &self,
675 _peer: PeerId,
676 _item: ObserverBlockStreamItem,
677 ) -> ConsensusResult<()> {
678 Err(ConsensusError::NetworkRequest(
680 "Observer block handling not yet implemented".to_string(),
681 ))
682 }
683
684 async fn handle_stream_blocks(
685 &self,
686 _peer: NodeId,
687 _highest_round_per_authority: Vec<u64>,
688 ) -> ConsensusResult<ObserverBlockStream> {
689 todo!("Observer block streaming not yet implemented")
691 }
692
693 async fn handle_fetch_blocks(
694 &self,
695 _peer: NodeId,
696 _block_refs: Vec<BlockRef>,
697 ) -> ConsensusResult<Vec<Bytes>> {
698 Err(ConsensusError::NetworkRequest(
701 "Observer fetch blocks not yet implemented".to_string(),
702 ))
703 }
704
705 async fn handle_fetch_commits(
706 &self,
707 _peer: NodeId,
708 _commit_range: CommitRange,
709 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
710 Err(ConsensusError::NetworkRequest(
712 "Observer fetch commits not yet implemented".to_string(),
713 ))
714 }
715}
716struct Counter {
717 count: usize,
718 subscriptions_by_peer: BTreeMap<PeerId, usize>,
719}
720
721pub(crate) struct SubscriptionCounter {
723 context: Arc<Context>,
724 counter: parking_lot::Mutex<Counter>,
725}
726
727impl SubscriptionCounter {
728 pub(crate) fn new(context: Arc<Context>) -> Self {
729 for (_, authority) in context.committee.authorities() {
731 context
732 .metrics
733 .node_metrics
734 .subscribed_by
735 .with_label_values(&[authority.hostname.as_str()])
736 .set(0);
737 }
738
739 Self {
740 counter: parking_lot::Mutex::new(Counter {
741 count: 0,
742 subscriptions_by_peer: BTreeMap::new(),
743 }),
744 context,
745 }
746 }
747
748 fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
749 let mut counter = self.counter.lock();
750 counter.count += 1;
751 *counter
752 .subscriptions_by_peer
753 .entry(peer.clone())
754 .or_insert(0) += 1;
755
756 match peer {
757 PeerId::Validator(authority) => {
758 let peer_hostname = &self.context.committee.authority(*authority).hostname;
759 self.context
760 .metrics
761 .node_metrics
762 .subscribed_by
763 .with_label_values(&[peer_hostname])
764 .set(1);
765 }
766 PeerId::Observer(_) => {
767 self.context
768 .metrics
769 .node_metrics
770 .subscribed_by
771 .with_label_values(&["observer"])
772 .inc();
773 }
774 }
775
776 Ok(())
777 }
778
779 fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
780 let mut counter = self.counter.lock();
781 counter.count -= 1;
782 *counter
783 .subscriptions_by_peer
784 .entry(peer.clone())
785 .or_insert(0) -= 1;
786
787 if counter.subscriptions_by_peer[peer] == 0 {
788 match peer {
789 PeerId::Validator(authority) => {
790 let peer_hostname = &self.context.committee.authority(*authority).hostname;
791 self.context
792 .metrics
793 .node_metrics
794 .subscribed_by
795 .with_label_values(&[peer_hostname])
796 .set(0);
797 }
798 PeerId::Observer(_) => {
799 self.context
800 .metrics
801 .node_metrics
802 .subscribed_by
803 .with_label_values(&["observer"])
804 .dec();
805 }
806 }
807 }
808
809 Ok(())
810 }
811}
812
813type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
816
817pub(crate) struct BroadcastStream<T> {
820 peer: PeerId,
821 inner: ReusableBoxFuture<
823 'static,
824 (
825 Result<T, broadcast::error::RecvError>,
826 broadcast::Receiver<T>,
827 ),
828 >,
829 subscription_counter: Arc<SubscriptionCounter>,
831}
832
833impl<T: 'static + Clone + Send> BroadcastStream<T> {
834 pub fn new(
835 peer: PeerId,
836 rx: broadcast::Receiver<T>,
837 subscription_counter: Arc<SubscriptionCounter>,
838 ) -> Self {
839 if let Err(err) = subscription_counter.increment(&peer) {
840 match err {
841 ConsensusError::Shutdown => {}
842 _ => panic!("Unexpected error: {err}"),
843 }
844 }
845 Self {
846 peer,
847 inner: ReusableBoxFuture::new(make_recv_future(rx)),
848 subscription_counter,
849 }
850 }
851}
852
853impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
854 type Item = T;
855
856 fn poll_next(
857 mut self: Pin<&mut Self>,
858 cx: &mut task::Context<'_>,
859 ) -> task::Poll<Option<Self::Item>> {
860 let peer = self.peer.clone();
861 let maybe_item = loop {
862 let (result, rx) = ready!(self.inner.poll(cx));
863 self.inner.set(make_recv_future(rx));
864
865 match result {
866 Ok(item) => break Some(item),
867 Err(broadcast::error::RecvError::Closed) => {
868 info!("Block BroadcastedBlockStream {} closed", peer);
869 break None;
870 }
871 Err(broadcast::error::RecvError::Lagged(n)) => {
872 warn!(
873 "Block BroadcastedBlockStream {} lagged by {} messages",
874 peer, n
875 );
876 continue;
877 }
878 }
879 };
880 task::Poll::Ready(maybe_item)
881 }
882}
883
884impl<T> Drop for BroadcastStream<T> {
885 fn drop(&mut self) {
886 if let Err(err) = self.subscription_counter.decrement(&self.peer) {
887 match err {
888 ConsensusError::Shutdown => {}
889 _ => panic!("Unexpected error: {err}"),
890 }
891 }
892 }
893}
894
895async fn make_recv_future<T: Clone>(
896 mut rx: broadcast::Receiver<T>,
897) -> (
898 Result<T, broadcast::error::RecvError>,
899 broadcast::Receiver<T>,
900) {
901 let result = rx.recv().await;
902 (result, rx)
903}
904
905#[cfg(test)]
908mod tests {
909 use std::{
910 collections::{BTreeMap, BTreeSet},
911 sync::Arc,
912 time::Duration,
913 };
914
915 use async_trait::async_trait;
916 use bytes::Bytes;
917 use consensus_config::AuthorityIndex;
918 use consensus_types::block::{BlockDigest, BlockRef, Round};
919 use parking_lot::{Mutex, RwLock};
920 use tokio::{sync::broadcast, time::sleep};
921
922 use futures::StreamExt as _;
923
924 use crate::{
925 authority_service::AuthorityService,
926 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
927 commit::{CertifiedCommits, CommitRange},
928 commit_vote_monitor::CommitVoteMonitor,
929 context::Context,
930 core_thread::{CoreError, CoreThreadDispatcher},
931 dag_state::DagState,
932 error::ConsensusResult,
933 network::{
934 BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
935 ValidatorNetworkClient, ValidatorNetworkService,
936 },
937 round_tracker::RoundTracker,
938 storage::mem_store::MemStore,
939 synchronizer::Synchronizer,
940 test_dag_builder::DagBuilder,
941 transaction_vote_tracker::TransactionVoteTracker,
942 };
943 struct FakeCoreThreadDispatcher {
944 blocks: Mutex<Vec<VerifiedBlock>>,
945 }
946
947 impl FakeCoreThreadDispatcher {
948 fn new() -> Self {
949 Self {
950 blocks: Mutex::new(vec![]),
951 }
952 }
953
954 fn get_blocks(&self) -> Vec<VerifiedBlock> {
955 self.blocks.lock().clone()
956 }
957 }
958
959 #[async_trait]
960 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
961 async fn add_blocks(
962 &self,
963 blocks: Vec<VerifiedBlock>,
964 ) -> Result<BTreeSet<BlockRef>, CoreError> {
965 let block_refs = blocks.iter().map(|b| b.reference()).collect();
966 self.blocks.lock().extend(blocks);
967 Ok(block_refs)
968 }
969
970 async fn check_block_refs(
971 &self,
972 _block_refs: Vec<BlockRef>,
973 ) -> Result<BTreeSet<BlockRef>, CoreError> {
974 Ok(BTreeSet::new())
975 }
976
977 async fn add_certified_commits(
978 &self,
979 _commits: CertifiedCommits,
980 ) -> Result<BTreeSet<BlockRef>, CoreError> {
981 todo!()
982 }
983
984 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
985 Ok(())
986 }
987
988 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
989 Ok(Default::default())
990 }
991
992 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
993 todo!()
994 }
995
996 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
997 todo!()
998 }
999 }
1000
1001 #[derive(Default)]
1002 struct FakeNetworkClient {}
1003
1004 #[async_trait]
1005 impl ValidatorNetworkClient for FakeNetworkClient {
1006 async fn send_block(
1007 &self,
1008 _peer: AuthorityIndex,
1009 _block: &VerifiedBlock,
1010 _timeout: Duration,
1011 ) -> ConsensusResult<()> {
1012 unimplemented!("Unimplemented")
1013 }
1014
1015 async fn subscribe_blocks(
1016 &self,
1017 _peer: AuthorityIndex,
1018 _last_received: Round,
1019 _timeout: Duration,
1020 ) -> ConsensusResult<BlockStream> {
1021 unimplemented!("Unimplemented")
1022 }
1023
1024 async fn fetch_blocks(
1025 &self,
1026 _peer: AuthorityIndex,
1027 _block_refs: Vec<BlockRef>,
1028 _fetch_after_rounds: Vec<Round>,
1029 _fetch_missing_ancestors: bool,
1030 _timeout: Duration,
1031 ) -> ConsensusResult<Vec<Bytes>> {
1032 unimplemented!("Unimplemented")
1033 }
1034
1035 async fn fetch_commits(
1036 &self,
1037 _peer: AuthorityIndex,
1038 _commit_range: CommitRange,
1039 _timeout: Duration,
1040 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1041 unimplemented!("Unimplemented")
1042 }
1043
1044 async fn fetch_latest_blocks(
1045 &self,
1046 _peer: AuthorityIndex,
1047 _authorities: Vec<AuthorityIndex>,
1048 _timeout: Duration,
1049 ) -> ConsensusResult<Vec<Bytes>> {
1050 unimplemented!("Unimplemented")
1051 }
1052
1053 async fn get_latest_rounds(
1054 &self,
1055 _peer: AuthorityIndex,
1056 _timeout: Duration,
1057 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1058 unimplemented!("Unimplemented")
1059 }
1060 }
1061
1062 #[async_trait]
1063 impl ObserverNetworkClient for FakeNetworkClient {
1064 async fn stream_blocks(
1065 &self,
1066 _peer: crate::network::PeerId,
1067 _highest_round_per_authority: Vec<u64>,
1068 _timeout: Duration,
1069 ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1070 unimplemented!("Unimplemented")
1071 }
1072
1073 async fn fetch_blocks(
1074 &self,
1075 _peer: crate::network::PeerId,
1076 _block_refs: Vec<BlockRef>,
1077 _timeout: Duration,
1078 ) -> ConsensusResult<Vec<Bytes>> {
1079 unimplemented!("Unimplemented")
1080 }
1081
1082 async fn fetch_commits(
1083 &self,
1084 _peer: crate::network::PeerId,
1085 _commit_range: CommitRange,
1086 _timeout: Duration,
1087 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1088 unimplemented!("Unimplemented")
1089 }
1090 }
1091
1092 #[tokio::test(flavor = "current_thread", start_paused = true)]
1093 async fn test_handle_send_block() {
1094 let (context, _keys) = Context::new_for_test(4);
1095 let context = Arc::new(context);
1096 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1097 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1098 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1099 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1100 let fake_client = Arc::new(FakeNetworkClient::default());
1101 let network_client = Arc::new(SynchronizerClient::new(
1102 context.clone(),
1103 Some(fake_client.clone()),
1104 Some(fake_client.clone()),
1105 ));
1106 let store = Arc::new(MemStore::new());
1107 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1108 let transaction_vote_tracker =
1109 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1110 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1111 let synchronizer = Synchronizer::start(
1112 network_client,
1113 context.clone(),
1114 core_dispatcher.clone(),
1115 commit_vote_monitor.clone(),
1116 block_verifier.clone(),
1117 transaction_vote_tracker.clone(),
1118 round_tracker.clone(),
1119 dag_state.clone(),
1120 false,
1121 );
1122 let authority_service = Arc::new(AuthorityService::new(
1123 context.clone(),
1124 block_verifier,
1125 commit_vote_monitor,
1126 round_tracker,
1127 synchronizer,
1128 core_dispatcher.clone(),
1129 rx_block_broadcast,
1130 transaction_vote_tracker,
1131 dag_state,
1132 store,
1133 ));
1134
1135 let now = context.clock.timestamp_utc_ms();
1137 let max_drift = context.parameters.max_forward_time_drift;
1138 let input_block = VerifiedBlock::new_for_test(
1139 TestBlock::new(9, 0)
1140 .set_timestamp_ms(now + max_drift.as_millis() as u64)
1141 .build(),
1142 );
1143
1144 let service = authority_service.clone();
1145 let serialized = ExtendedSerializedBlock {
1146 block: input_block.serialized().clone(),
1147 excluded_ancestors: vec![],
1148 };
1149
1150 tokio::spawn({
1151 let service = service.clone();
1152 let context = context.clone();
1153 async move {
1154 service
1155 .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
1156 .await
1157 .unwrap();
1158 }
1159 });
1160
1161 sleep(max_drift / 2).await;
1162
1163 let blocks = core_dispatcher.get_blocks();
1164 assert_eq!(blocks.len(), 1);
1165 assert_eq!(blocks[0], input_block);
1166
1167 let invalid_block =
1169 VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
1170 let extended_block = ExtendedSerializedBlock {
1171 block: invalid_block.serialized().clone(),
1172 excluded_ancestors: vec![],
1173 };
1174 service
1175 .handle_send_block(
1176 context.committee.to_authority_index(0).unwrap(),
1177 extended_block,
1178 )
1179 .await
1180 .unwrap_err();
1181
1182 let invalid_excluded_ancestors = vec![
1184 bcs::to_bytes(&BlockRef::new(
1185 10,
1186 AuthorityIndex::new_for_test(1000),
1187 BlockDigest::MIN,
1188 ))
1189 .unwrap(),
1190 vec![3u8; 40],
1191 bcs::to_bytes(&invalid_block.reference()).unwrap(),
1192 ];
1193 let extended_block = ExtendedSerializedBlock {
1194 block: input_block.serialized().clone(),
1195 excluded_ancestors: invalid_excluded_ancestors,
1196 };
1197 service
1198 .handle_send_block(
1199 context.committee.to_authority_index(0).unwrap(),
1200 extended_block,
1201 )
1202 .await
1203 .unwrap_err();
1204 }
1205
1206 #[tokio::test(flavor = "current_thread", start_paused = true)]
1207 async fn test_handle_fetch_blocks() {
1208 const NUM_AUTHORITIES: usize = 40;
1211 const NUM_ROUNDS: usize = 40;
1212 let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1213 context.parameters.max_blocks_per_fetch = 50;
1214 let context = Arc::new(context);
1215 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1216 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1217 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1218 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1219 let fake_client = Arc::new(FakeNetworkClient::default());
1220 let network_client = Arc::new(SynchronizerClient::new(
1221 context.clone(),
1222 Some(fake_client.clone()),
1223 Some(fake_client.clone()),
1224 ));
1225 let store = Arc::new(MemStore::new());
1226 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1227 let transaction_vote_tracker =
1228 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1229 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1230 let synchronizer = Synchronizer::start(
1231 network_client,
1232 context.clone(),
1233 core_dispatcher.clone(),
1234 commit_vote_monitor.clone(),
1235 block_verifier.clone(),
1236 transaction_vote_tracker.clone(),
1237 round_tracker.clone(),
1238 dag_state.clone(),
1239 false,
1240 );
1241 let authority_service = Arc::new(AuthorityService::new(
1242 context.clone(),
1243 block_verifier,
1244 commit_vote_monitor,
1245 round_tracker,
1246 synchronizer,
1247 core_dispatcher.clone(),
1248 rx_block_broadcast,
1249 transaction_vote_tracker,
1250 dag_state.clone(),
1251 store,
1252 ));
1253
1254 let mut dag_builder = DagBuilder::new(context.clone());
1256 dag_builder
1257 .layers(1..=(NUM_ROUNDS as u32))
1258 .build()
1259 .persist_layers(dag_state.clone());
1260 dag_state.write().flush();
1261 let all_blocks = dag_builder.all_blocks();
1262
1263 let missing_block_refs: Vec<BlockRef> = all_blocks
1265 .iter()
1266 .rev()
1267 .take(2)
1268 .map(|b| b.reference())
1269 .collect();
1270 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1271 let results = authority_service
1272 .handle_fetch_blocks(
1273 AuthorityIndex::new_for_test(0),
1274 missing_block_refs.clone(),
1275 highest_accepted_rounds,
1276 true,
1277 )
1278 .await
1279 .unwrap();
1280
1281 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1283 .iter()
1284 .map(|b| {
1285 let signed = bcs::from_bytes(b).unwrap();
1286 let block = VerifiedBlock::new_verified(signed, b.clone());
1287 (block.reference(), block)
1288 })
1289 .collect();
1290 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1291 for b in &missing_block_refs {
1293 assert!(blocks.contains_key(b));
1294 }
1295 let num_missing_ancestors = blocks
1296 .keys()
1297 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1298 .count();
1299 assert_eq!(
1300 num_missing_ancestors,
1301 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1302 );
1303
1304 let missing_round = NUM_ROUNDS as Round - 3;
1306 let missing_block_refs: Vec<BlockRef> = all_blocks
1307 .iter()
1308 .filter(|b| b.reference().round == missing_round)
1309 .map(|b| b.reference())
1310 .take(2)
1311 .collect();
1312 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1313 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1315 let results = authority_service
1316 .handle_fetch_blocks(
1317 AuthorityIndex::new_for_test(0),
1318 missing_block_refs.clone(),
1319 highest_accepted_rounds,
1320 false,
1321 )
1322 .await
1323 .unwrap();
1324
1325 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1327 .iter()
1328 .map(|b| {
1329 let signed = bcs::from_bytes(b).unwrap();
1330 let block = VerifiedBlock::new_verified(signed, b.clone());
1331 (block.reference(), block)
1332 })
1333 .collect();
1334 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1335 for b in &missing_block_refs {
1337 assert!(blocks.contains_key(b));
1338 }
1339 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1341 for b in blocks.keys() {
1342 assert!(b.round <= missing_round);
1343 assert!(expected_authors.contains(&b.author));
1344 }
1345
1346 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1348 highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1350 highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1351 let results = authority_service
1352 .handle_fetch_blocks(
1353 AuthorityIndex::new_for_test(0),
1354 vec![],
1355 highest_accepted_rounds.clone(),
1356 false,
1357 )
1358 .await
1359 .unwrap();
1360
1361 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1363 .iter()
1364 .map(|b| {
1365 let signed = bcs::from_bytes(b).unwrap();
1366 let block = VerifiedBlock::new_verified(signed, b.clone());
1367 (block.reference(), block)
1368 })
1369 .collect();
1370 assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1371 for block_ref in blocks.keys() {
1373 let accepted = highest_accepted_rounds[block_ref.author];
1374 assert!(block_ref.round > accepted);
1375 }
1376 let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1379 assert!(
1382 max_round_in_result <= 4,
1383 "Expected low rounds from fair round-order fetching, got max round {}",
1384 max_round_in_result
1385 );
1386
1387 let missing_block_refs: Vec<BlockRef> = all_blocks
1389 .iter()
1390 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1391 .map(|b| b.reference())
1392 .take(5)
1393 .collect();
1394 let results = authority_service
1395 .handle_fetch_blocks(
1396 AuthorityIndex::new_for_test(0),
1397 missing_block_refs.clone(),
1398 vec![],
1399 false,
1400 )
1401 .await
1402 .unwrap();
1403
1404 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1406 .iter()
1407 .map(|b| {
1408 let signed = bcs::from_bytes(b).unwrap();
1409 let block = VerifiedBlock::new_verified(signed, b.clone());
1410 (block.reference(), block)
1411 })
1412 .collect();
1413 assert_eq!(blocks.len(), 5);
1414 for b in &missing_block_refs {
1415 assert!(blocks.contains_key(b));
1416 }
1417 }
1418
1419 #[tokio::test(flavor = "current_thread", start_paused = true)]
1420 async fn test_handle_fetch_latest_blocks() {
1421 let (context, _keys) = Context::new_for_test(4);
1423 let context = Arc::new(context);
1424 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1425 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1426 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1427 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1428 let fake_client = Arc::new(FakeNetworkClient::default());
1429 let network_client = Arc::new(SynchronizerClient::new(
1430 context.clone(),
1431 Some(fake_client.clone()),
1432 Some(fake_client.clone()),
1433 ));
1434 let store = Arc::new(MemStore::new());
1435 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1436 let transaction_vote_tracker =
1437 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1438 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1439 let synchronizer = Synchronizer::start(
1440 network_client,
1441 context.clone(),
1442 core_dispatcher.clone(),
1443 commit_vote_monitor.clone(),
1444 block_verifier.clone(),
1445 transaction_vote_tracker.clone(),
1446 round_tracker.clone(),
1447 dag_state.clone(),
1448 true,
1449 );
1450 let authority_service = Arc::new(AuthorityService::new(
1451 context.clone(),
1452 block_verifier,
1453 commit_vote_monitor,
1454 round_tracker,
1455 synchronizer,
1456 core_dispatcher.clone(),
1457 rx_block_broadcast,
1458 transaction_vote_tracker,
1459 dag_state.clone(),
1460 store,
1461 ));
1462
1463 let mut dag_builder = DagBuilder::new(context.clone());
1465 dag_builder
1466 .layers(1..=10)
1467 .authorities(vec![AuthorityIndex::new_for_test(2)])
1468 .equivocate(1)
1469 .build()
1470 .persist_layers(dag_state);
1471
1472 let authorities_to_request = vec![
1474 AuthorityIndex::new_for_test(1),
1475 AuthorityIndex::new_for_test(2),
1476 ];
1477 let results = authority_service
1478 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1479 .await;
1480
1481 let serialised_blocks = results.unwrap();
1483 for serialised_block in serialised_blocks {
1484 let signed_block: SignedBlock =
1485 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1486 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1487
1488 assert_eq!(verified_block.round(), 10);
1489 }
1490 }
1491
1492 #[tokio::test(flavor = "current_thread", start_paused = true)]
1493 async fn test_handle_subscribe_blocks() {
1494 let (context, _keys) = Context::new_for_test(4);
1495 let context = Arc::new(context);
1496 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1497 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1498 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1499 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1500 let fake_client = Arc::new(FakeNetworkClient::default());
1501 let network_client = Arc::new(SynchronizerClient::new(
1502 context.clone(),
1503 Some(fake_client.clone()),
1504 Some(fake_client.clone()),
1505 ));
1506 let store = Arc::new(MemStore::new());
1507 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1508 let transaction_vote_tracker =
1509 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1510 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1511 let synchronizer = Synchronizer::start(
1512 network_client,
1513 context.clone(),
1514 core_dispatcher.clone(),
1515 commit_vote_monitor.clone(),
1516 block_verifier.clone(),
1517 transaction_vote_tracker.clone(),
1518 round_tracker.clone(),
1519 dag_state.clone(),
1520 false,
1521 );
1522
1523 dag_state
1525 .write()
1526 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1527 dag_state
1528 .write()
1529 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1530 dag_state
1531 .write()
1532 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1533
1534 let authority_service = Arc::new(AuthorityService::new(
1535 context.clone(),
1536 block_verifier,
1537 commit_vote_monitor,
1538 round_tracker,
1539 synchronizer,
1540 core_dispatcher.clone(),
1541 rx_block_broadcast,
1542 transaction_vote_tracker,
1543 dag_state.clone(),
1544 store,
1545 ));
1546
1547 let peer = context.committee.to_authority_index(1).unwrap();
1548
1549 {
1552 let mut stream = authority_service
1553 .handle_subscribe_blocks(peer, 100)
1554 .await
1555 .unwrap();
1556 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1557 assert_eq!(
1558 block.round(),
1559 15,
1560 "Should return last proposed block as fallback"
1561 );
1562 assert_eq!(block.author().value(), 0);
1563 }
1564
1565 {
1568 let mut stream = authority_service
1569 .handle_subscribe_blocks(peer, 7)
1570 .await
1571 .unwrap();
1572
1573 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1574 assert_eq!(block1.round(), 10, "Should return block at round 10");
1575
1576 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1577 assert_eq!(block2.round(), 15, "Should return block at round 15");
1578 }
1579 }
1580
1581 #[tokio::test(flavor = "current_thread", start_paused = true)]
1582 async fn test_handle_subscribe_blocks_not_proposed() {
1583 let (context, _keys) = Context::new_for_test(4);
1584 let context = Arc::new(context);
1585 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1586 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1587 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1588 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1589 let fake_client = Arc::new(FakeNetworkClient::default());
1590 let network_client = Arc::new(SynchronizerClient::new(
1591 context.clone(),
1592 Some(fake_client.clone()),
1593 Some(fake_client.clone()),
1594 ));
1595 let store = Arc::new(MemStore::new());
1596 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1597 let transaction_vote_tracker =
1598 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1599 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1600 let synchronizer = Synchronizer::start(
1601 network_client,
1602 context.clone(),
1603 core_dispatcher.clone(),
1604 commit_vote_monitor.clone(),
1605 block_verifier.clone(),
1606 transaction_vote_tracker.clone(),
1607 round_tracker.clone(),
1608 dag_state.clone(),
1609 false,
1610 );
1611
1612 let authority_service = Arc::new(AuthorityService::new(
1615 context.clone(),
1616 block_verifier,
1617 commit_vote_monitor,
1618 round_tracker,
1619 synchronizer,
1620 core_dispatcher.clone(),
1621 rx_block_broadcast,
1622 transaction_vote_tracker,
1623 dag_state.clone(),
1624 store,
1625 ));
1626
1627 let peer = context.committee.to_authority_index(1).unwrap();
1628
1629 let mut stream = authority_service
1631 .handle_subscribe_blocks(peer, 0)
1632 .await
1633 .unwrap();
1634
1635 use futures::poll;
1637 use std::task::Poll;
1638 let poll_result = poll!(stream.next());
1639 assert!(
1640 matches!(poll_result, Poll::Pending),
1641 "Should not receive genesis block on subscription stream"
1642 );
1643 }
1644}