1use std::{
5 cmp::Reverse,
6 collections::{BTreeMap, BTreeSet, BinaryHeap},
7 pin::Pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27 CommitIndex,
28 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29 block_verifier::BlockVerifier,
30 commit::{CommitAPI as _, CommitRange, TrustedCommit},
31 commit_vote_monitor::CommitVoteMonitor,
32 context::Context,
33 core_thread::CoreThreadDispatcher,
34 dag_state::DagState,
35 error::{ConsensusError, ConsensusResult},
36 network::{
37 BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverBlockStreamItem,
38 ObserverNetworkService, PeerId, ValidatorNetworkService,
39 },
40 round_tracker::RoundTracker,
41 stake_aggregator::{QuorumThreshold, StakeAggregator},
42 storage::Store,
43 synchronizer::SynchronizerHandle,
44 transaction_vote_tracker::TransactionVoteTracker,
45};
46
/// Multiplier applied to `commit_sync_batch_size` when `handle_send_block` decides whether the
/// local node is lagging too much: a received block is rejected once the local last commit index
/// plus `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER` is below the quorum commit index.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
49pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
51 context: Arc<Context>,
52 commit_vote_monitor: Arc<CommitVoteMonitor>,
53 block_verifier: Arc<dyn BlockVerifier>,
54 synchronizer: Arc<SynchronizerHandle>,
55 core_dispatcher: Arc<C>,
56 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
57 subscription_counter: Arc<SubscriptionCounter>,
58 transaction_vote_tracker: TransactionVoteTracker,
59 dag_state: Arc<RwLock<DagState>>,
60 store: Arc<dyn Store>,
61 round_tracker: Arc<RwLock<RoundTracker>>,
62}
63
64impl<C: CoreThreadDispatcher> AuthorityService<C> {
65 pub(crate) fn new(
66 context: Arc<Context>,
67 block_verifier: Arc<dyn BlockVerifier>,
68 commit_vote_monitor: Arc<CommitVoteMonitor>,
69 round_tracker: Arc<RwLock<RoundTracker>>,
70 synchronizer: Arc<SynchronizerHandle>,
71 core_dispatcher: Arc<C>,
72 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
73 transaction_vote_tracker: TransactionVoteTracker,
74 dag_state: Arc<RwLock<DagState>>,
75 store: Arc<dyn Store>,
76 ) -> Self {
77 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
78 Self {
79 context,
80 block_verifier,
81 commit_vote_monitor,
82 synchronizer,
83 core_dispatcher,
84 rx_block_broadcast,
85 subscription_counter,
86 transaction_vote_tracker,
87 dag_state,
88 store,
89 round_tracker,
90 }
91 }
92
93 fn parse_excluded_ancestors(
95 &self,
96 peer: AuthorityIndex,
97 block: &VerifiedBlock,
98 mut excluded_ancestors: Vec<Vec<u8>>,
99 ) -> ConsensusResult<Vec<BlockRef>> {
100 let peer_hostname = &self.context.committee.authority(peer).hostname;
101
102 let excluded_ancestors_limit = self.context.committee.size() * 2;
103 if excluded_ancestors.len() > excluded_ancestors_limit {
104 debug!(
105 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
106 excluded_ancestors.len() - excluded_ancestors_limit,
107 peer,
108 peer_hostname,
109 );
110 excluded_ancestors.truncate(excluded_ancestors_limit);
111 }
112
113 let excluded_ancestors = excluded_ancestors
114 .into_iter()
115 .map(|serialized| {
116 let block_ref: BlockRef =
117 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
118 if !self.context.committee.is_valid_index(block_ref.author) {
119 return Err(ConsensusError::InvalidAuthorityIndex {
120 index: block_ref.author,
121 max: self.context.committee.size(),
122 });
123 }
124 if block_ref.round >= block.round() {
125 return Err(ConsensusError::InvalidAncestorRound {
126 ancestor: block_ref.round,
127 block: block.round(),
128 });
129 }
130 Ok(block_ref)
131 })
132 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
133
134 for excluded_ancestor in &excluded_ancestors {
135 let excluded_ancestor_hostname = &self
136 .context
137 .committee
138 .authority(excluded_ancestor.author)
139 .hostname;
140 self.context
141 .metrics
142 .node_metrics
143 .network_excluded_ancestors_count_by_authority
144 .with_label_values(&[excluded_ancestor_hostname])
145 .inc();
146 }
147 self.context
148 .metrics
149 .node_metrics
150 .network_received_excluded_ancestors_from_authority
151 .with_label_values(&[peer_hostname])
152 .inc_by(excluded_ancestors.len() as u64);
153
154 Ok(excluded_ancestors)
155 }
156}
157
158#[async_trait]
159impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
160 async fn handle_send_block(
161 &self,
162 peer: AuthorityIndex,
163 serialized_block: ExtendedSerializedBlock,
164 ) -> ConsensusResult<()> {
165 fail_point_async!("consensus-rpc-response");
166
167 let peer_hostname = &self.context.committee.authority(peer).hostname;
168
169 let signed_block: SignedBlock =
171 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
172
173 if peer != signed_block.author() {
175 self.context
176 .metrics
177 .node_metrics
178 .invalid_blocks
179 .with_label_values(&[
180 peer_hostname.as_str(),
181 "handle_send_block",
182 "UnexpectedAuthority",
183 ])
184 .inc();
185 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
186 info!("Block with wrong authority from {}: {}", peer, e);
187 return Err(e);
188 }
189
190 let (verified_block, reject_txn_votes) = self
192 .block_verifier
193 .verify_and_vote(signed_block, serialized_block.block)
194 .tap_err(|e| {
195 self.context
196 .metrics
197 .node_metrics
198 .invalid_blocks
199 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
200 .inc();
201 info!("Invalid block from {}: {}", peer, e);
202 })?;
203 let excluded_ancestors = self
204 .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
205 .tap_err(|e| {
206 debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
207 self.context
208 .metrics
209 .node_metrics
210 .invalid_blocks
211 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
212 .inc();
213 })?;
214
215 let block_ref = verified_block.reference();
216 debug!("Received block {} via send block.", block_ref);
217
218 self.context
219 .metrics
220 .node_metrics
221 .verified_blocks
222 .with_label_values(&[peer_hostname])
223 .inc();
224
225 let now = self.context.clock.timestamp_utc_ms();
226 let forward_time_drift =
227 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
228
229 self.context
230 .metrics
231 .node_metrics
232 .block_timestamp_drift_ms
233 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
234 .inc_by(forward_time_drift.as_millis() as u64);
235
236 self.commit_vote_monitor.observe_block(&verified_block);
239
240 self.round_tracker
242 .write()
243 .update_from_verified_block(&ExtendedBlock {
244 block: verified_block.clone(),
245 excluded_ancestors: excluded_ancestors.clone(),
246 });
247
248 let last_commit_index = self.dag_state.read().last_commit_index();
257 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
258 if last_commit_index
261 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
262 < quorum_commit_index
263 {
264 self.context
265 .metrics
266 .node_metrics
267 .rejected_blocks
268 .with_label_values(&["commit_lagging"])
269 .inc();
270 debug!(
271 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
272 block_ref, last_commit_index, quorum_commit_index,
273 );
274 return Err(ConsensusError::BlockRejected {
275 block_ref,
276 reason: format!(
277 "Last commit index is lagging quorum commit index too much ({} < {})",
278 last_commit_index, quorum_commit_index,
279 ),
280 });
281 }
282
283 if self.context.protocol_config.transaction_voting_enabled() {
286 self.transaction_vote_tracker
287 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
288 }
289
290 let missing_ancestors = self
292 .core_dispatcher
293 .add_blocks(vec![verified_block.clone()])
294 .await
295 .map_err(|_| ConsensusError::Shutdown)?;
296
297 if !missing_ancestors.is_empty() {
299 self.context
300 .metrics
301 .node_metrics
302 .handler_received_block_missing_ancestors
303 .with_label_values(&[peer_hostname])
304 .inc_by(missing_ancestors.len() as u64);
305 let synchronizer = self.synchronizer.clone();
306 spawn_monitored_task!(async move {
307 if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
312 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
313 }
314 });
315 }
316
317 let missing_excluded_ancestors = self
319 .core_dispatcher
320 .check_block_refs(excluded_ancestors)
321 .await
322 .map_err(|_| ConsensusError::Shutdown)?;
323 if !missing_excluded_ancestors.is_empty() {
324 self.context
325 .metrics
326 .node_metrics
327 .network_excluded_ancestors_sent_to_fetch
328 .with_label_values(&[peer_hostname])
329 .inc_by(missing_excluded_ancestors.len() as u64);
330
331 let synchronizer = self.synchronizer.clone();
332 spawn_monitored_task!(async move {
333 if let Err(err) = synchronizer
334 .fetch_blocks(missing_excluded_ancestors, peer)
335 .await
336 {
337 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
338 }
339 });
340 }
341
342 Ok(())
343 }
344
345 async fn handle_subscribe_blocks(
346 &self,
347 peer: AuthorityIndex,
348 last_received: Round,
349 ) -> ConsensusResult<BlockStream> {
350 fail_point_async!("consensus-rpc-response");
351
352 let past_proposed_blocks = {
361 let dag_state = self.dag_state.read();
362
363 let mut proposed_blocks =
364 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
365 if proposed_blocks.is_empty() {
366 let last_proposed_block = dag_state
367 .get_last_proposed_block()
368 .expect("Last proposed block should be returned on validators");
369 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
370 vec![last_proposed_block]
371 } else {
372 vec![]
373 };
374 }
375 stream::iter(
376 proposed_blocks
377 .into_iter()
378 .map(|block| ExtendedSerializedBlock {
379 block: block.serialized().clone(),
380 excluded_ancestors: vec![],
381 }),
382 )
383 };
384
385 let broadcasted_blocks = BroadcastedBlockStream::new(
386 PeerId::Validator(peer),
387 self.rx_block_broadcast.resubscribe(),
388 self.subscription_counter.clone(),
389 );
390
391 Ok(Box::pin(past_proposed_blocks.chain(
393 broadcasted_blocks.map(ExtendedSerializedBlock::from),
394 )))
395 }
396
397 async fn handle_fetch_blocks(
413 &self,
414 _peer: AuthorityIndex,
415 mut block_refs: Vec<BlockRef>,
416 fetch_after_rounds: Vec<Round>,
417 fetch_missing_ancestors: bool,
418 ) -> ConsensusResult<Vec<Bytes>> {
419 fail_point_async!("consensus-rpc-response");
420
421 if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
422 return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
423 }
424 if !fetch_after_rounds.is_empty()
425 && fetch_after_rounds.len() != self.context.committee.size()
426 {
427 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
428 fetch_after_rounds.len(),
429 self.context.committee.size(),
430 ));
431 }
432
433 let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
435 self.context.parameters.max_blocks_per_sync
436 } else {
437 self.context.parameters.max_blocks_per_fetch
438 };
439 if block_refs.len() > max_response_num_blocks {
440 block_refs.truncate(max_response_num_blocks);
441 }
442
443 for block in &block_refs {
445 if !self.context.committee.is_valid_index(block.author) {
446 return Err(ConsensusError::InvalidAuthorityIndex {
447 index: block.author,
448 max: self.context.committee.size(),
449 });
450 }
451 if block.round == GENESIS_ROUND {
452 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
453 }
454 }
455
456 let mut blocks = self
458 .dag_state
459 .read()
460 .get_blocks(&block_refs)
461 .into_iter()
462 .flatten()
463 .collect::<Vec<_>>();
464
465 if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
468 if fetch_missing_ancestors {
469 let missing_ancestors = blocks
472 .iter()
473 .flat_map(|block| block.ancestors().to_vec())
474 .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
475 .collect::<BTreeSet<_>>()
476 .into_iter()
477 .collect::<Vec<_>>();
478
479 let selected_num_blocks = max_response_num_blocks
482 .saturating_sub(blocks.len())
483 .min(missing_ancestors.len());
484 if selected_num_blocks > 0 {
485 let selected_ancestor_refs = missing_ancestors
486 .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
487 .copied()
488 .collect::<Vec<_>>();
489 let ancestor_blocks = self
490 .dag_state
491 .read()
492 .get_blocks(&selected_ancestor_refs)
493 .into_iter()
494 .flatten();
495 blocks.extend(ancestor_blocks);
496 }
497 } else {
498 let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
501 if block_refs.is_empty() {
502 let dag_state = self.dag_state.read();
503 for (index, _authority) in self.context.committee.authorities() {
504 let last_block = dag_state.get_last_block_for_authority(index);
505 limit_rounds.insert(index, last_block.round());
506 }
507 } else {
508 for block_ref in &block_refs {
509 let entry = limit_rounds
510 .entry(block_ref.author)
511 .or_insert(block_ref.round);
512 *entry = (*entry).min(block_ref.round);
513 }
514 }
515
516 let mut heap = BinaryHeap::new();
519 for (authority, limit_round) in &limit_rounds {
520 let fetch_start = fetch_after_rounds[*authority] + 1;
521 if fetch_start < *limit_round {
522 heap.push(Reverse((fetch_start, *authority, *limit_round)));
523 }
524 }
525
526 while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
527 let fetched = self.store.scan_blocks_by_author_in_range(
528 authority,
529 fetch_start,
530 limit_round,
531 1,
532 )?;
533 if let Some(block) = fetched.into_iter().next() {
534 let next_start = block.round() + 1;
535 blocks.push(block);
536 if blocks.len() >= max_response_num_blocks {
537 blocks.truncate(max_response_num_blocks);
538 break;
539 }
540 if next_start < limit_round {
541 heap.push(Reverse((next_start, authority, limit_round)));
542 }
543 }
544 }
545 }
546 }
547
548 let bytes = blocks
550 .into_iter()
551 .map(|block| block.serialized().clone())
552 .collect::<Vec<_>>();
553 Ok(bytes)
554 }
555
556 async fn handle_fetch_commits(
557 &self,
558 _peer: AuthorityIndex,
559 commit_range: CommitRange,
560 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
561 fail_point_async!("consensus-rpc-response");
562
563 let inclusive_end = commit_range.end().min(
565 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
566 - 1,
567 );
568 let mut commits = self
569 .store
570 .scan_commits((commit_range.start()..=inclusive_end).into())?;
571 let mut certifier_block_refs = vec![];
572 'commit: while let Some(c) = commits.last() {
573 let index = c.index();
574 let votes = self.store.read_commit_votes(index)?;
575 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
576 for v in &votes {
577 stake_aggregator.add(v.author, &self.context.committee);
578 }
579 if stake_aggregator.reached_threshold(&self.context.committee) {
580 certifier_block_refs = votes;
581 break 'commit;
582 } else {
583 debug!(
584 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
585 index,
586 stake_aggregator.stake(),
587 stake_aggregator.threshold(&self.context.committee)
588 );
589 self.context
590 .metrics
591 .node_metrics
592 .commit_sync_fetch_commits_handler_uncertified_skipped
593 .inc();
594 commits.pop();
595 }
596 }
597 let certifier_blocks = self
598 .store
599 .read_blocks(&certifier_block_refs)?
600 .into_iter()
601 .flatten()
602 .collect();
603 Ok((commits, certifier_blocks))
604 }
605
606 async fn handle_fetch_latest_blocks(
607 &self,
608 peer: AuthorityIndex,
609 authorities: Vec<AuthorityIndex>,
610 ) -> ConsensusResult<Vec<Bytes>> {
611 fail_point_async!("consensus-rpc-response");
612
613 if authorities.len() > self.context.committee.size() {
614 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
615 }
616
617 for authority in &authorities {
619 if !self.context.committee.is_valid_index(*authority) {
620 return Err(ConsensusError::InvalidAuthorityIndex {
621 index: *authority,
622 max: self.context.committee.size(),
623 });
624 }
625 }
626
627 let mut blocks = vec![];
631 let dag_state = self.dag_state.read();
632 for authority in authorities {
633 let block = dag_state.get_last_block_for_authority(authority);
634
635 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
636
637 if block.round() != GENESIS_ROUND {
639 blocks.push(block);
640 }
641 }
642
643 let result = blocks
645 .into_iter()
646 .map(|block| block.serialized().clone())
647 .collect::<Vec<_>>();
648
649 Ok(result)
650 }
651
652 async fn handle_get_latest_rounds(
653 &self,
654 _peer: AuthorityIndex,
655 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
656 fail_point_async!("consensus-rpc-response");
657
658 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
659
660 let blocks = self
661 .dag_state
662 .read()
663 .get_last_cached_block_per_authority(Round::MAX);
664 let highest_accepted_rounds = blocks
665 .into_iter()
666 .map(|(block, _)| block.round())
667 .collect::<Vec<_>>();
668
669 Ok((highest_received_rounds, highest_accepted_rounds))
670 }
671}
672
673#[async_trait]
674impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
675 async fn handle_block(
676 &self,
677 _peer: PeerId,
678 _item: ObserverBlockStreamItem,
679 ) -> ConsensusResult<()> {
680 Err(ConsensusError::NetworkRequest(
682 "Observer block handling not yet implemented".to_string(),
683 ))
684 }
685
686 async fn handle_stream_blocks(
687 &self,
688 _peer: NodeId,
689 _highest_round_per_authority: Vec<u64>,
690 ) -> ConsensusResult<ObserverBlockStream> {
691 todo!("Observer block streaming not yet implemented")
693 }
694
695 async fn handle_fetch_blocks(
696 &self,
697 _peer: NodeId,
698 _block_refs: Vec<BlockRef>,
699 ) -> ConsensusResult<Vec<Bytes>> {
700 Err(ConsensusError::NetworkRequest(
703 "Observer fetch blocks not yet implemented".to_string(),
704 ))
705 }
706
707 async fn handle_fetch_commits(
708 &self,
709 _peer: NodeId,
710 _commit_range: CommitRange,
711 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
712 Err(ConsensusError::NetworkRequest(
714 "Observer fetch commits not yet implemented".to_string(),
715 ))
716 }
717}
718struct Counter {
719 count: usize,
720 subscriptions_by_peer: BTreeMap<PeerId, usize>,
721}
722
723pub(crate) struct SubscriptionCounter {
725 context: Arc<Context>,
726 counter: parking_lot::Mutex<Counter>,
727}
728
729impl SubscriptionCounter {
730 pub(crate) fn new(context: Arc<Context>) -> Self {
731 for (_, authority) in context.committee.authorities() {
733 context
734 .metrics
735 .node_metrics
736 .subscribed_by
737 .with_label_values(&[authority.hostname.as_str()])
738 .set(0);
739 }
740
741 Self {
742 counter: parking_lot::Mutex::new(Counter {
743 count: 0,
744 subscriptions_by_peer: BTreeMap::new(),
745 }),
746 context,
747 }
748 }
749
750 fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
751 let mut counter = self.counter.lock();
752 counter.count += 1;
753 *counter
754 .subscriptions_by_peer
755 .entry(peer.clone())
756 .or_insert(0) += 1;
757
758 match peer {
759 PeerId::Validator(authority) => {
760 let peer_hostname = &self.context.committee.authority(*authority).hostname;
761 self.context
762 .metrics
763 .node_metrics
764 .subscribed_by
765 .with_label_values(&[peer_hostname])
766 .set(1);
767 }
768 PeerId::Observer(_) => {
769 self.context
770 .metrics
771 .node_metrics
772 .subscribed_by
773 .with_label_values(&["observer"])
774 .inc();
775 }
776 }
777
778 Ok(())
779 }
780
781 fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
782 let mut counter = self.counter.lock();
783 counter.count -= 1;
784 *counter
785 .subscriptions_by_peer
786 .entry(peer.clone())
787 .or_insert(0) -= 1;
788
789 if counter.subscriptions_by_peer[peer] == 0 {
790 match peer {
791 PeerId::Validator(authority) => {
792 let peer_hostname = &self.context.committee.authority(*authority).hostname;
793 self.context
794 .metrics
795 .node_metrics
796 .subscribed_by
797 .with_label_values(&[peer_hostname])
798 .set(0);
799 }
800 PeerId::Observer(_) => {
801 self.context
802 .metrics
803 .node_metrics
804 .subscribed_by
805 .with_label_values(&["observer"])
806 .dec();
807 }
808 }
809 }
810
811 Ok(())
812 }
813}
814
815type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
818
819pub(crate) struct BroadcastStream<T> {
822 peer: PeerId,
823 inner: ReusableBoxFuture<
825 'static,
826 (
827 Result<T, broadcast::error::RecvError>,
828 broadcast::Receiver<T>,
829 ),
830 >,
831 subscription_counter: Arc<SubscriptionCounter>,
833}
834
835impl<T: 'static + Clone + Send> BroadcastStream<T> {
836 pub fn new(
837 peer: PeerId,
838 rx: broadcast::Receiver<T>,
839 subscription_counter: Arc<SubscriptionCounter>,
840 ) -> Self {
841 if let Err(err) = subscription_counter.increment(&peer) {
842 match err {
843 ConsensusError::Shutdown => {}
844 _ => panic!("Unexpected error: {err}"),
845 }
846 }
847 Self {
848 peer,
849 inner: ReusableBoxFuture::new(make_recv_future(rx)),
850 subscription_counter,
851 }
852 }
853}
854
855impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
856 type Item = T;
857
858 fn poll_next(
859 mut self: Pin<&mut Self>,
860 cx: &mut task::Context<'_>,
861 ) -> task::Poll<Option<Self::Item>> {
862 let peer = self.peer.clone();
863 let maybe_item = loop {
864 let (result, rx) = ready!(self.inner.poll(cx));
865 self.inner.set(make_recv_future(rx));
866
867 match result {
868 Ok(item) => break Some(item),
869 Err(broadcast::error::RecvError::Closed) => {
870 info!("Block BroadcastedBlockStream {} closed", peer);
871 break None;
872 }
873 Err(broadcast::error::RecvError::Lagged(n)) => {
874 warn!(
875 "Block BroadcastedBlockStream {} lagged by {} messages",
876 peer, n
877 );
878 continue;
879 }
880 }
881 };
882 task::Poll::Ready(maybe_item)
883 }
884}
885
886impl<T> Drop for BroadcastStream<T> {
887 fn drop(&mut self) {
888 if let Err(err) = self.subscription_counter.decrement(&self.peer) {
889 match err {
890 ConsensusError::Shutdown => {}
891 _ => panic!("Unexpected error: {err}"),
892 }
893 }
894 }
895}
896
897async fn make_recv_future<T: Clone>(
898 mut rx: broadcast::Receiver<T>,
899) -> (
900 Result<T, broadcast::error::RecvError>,
901 broadcast::Receiver<T>,
902) {
903 let result = rx.recv().await;
904 (result, rx)
905}
906
907#[cfg(test)]
910mod tests {
911 use std::{
912 collections::{BTreeMap, BTreeSet},
913 sync::Arc,
914 time::Duration,
915 };
916
917 use async_trait::async_trait;
918 use bytes::Bytes;
919 use consensus_config::AuthorityIndex;
920 use consensus_types::block::{BlockDigest, BlockRef, Round};
921 use parking_lot::{Mutex, RwLock};
922 use tokio::{sync::broadcast, time::sleep};
923
924 use futures::StreamExt as _;
925
926 use crate::{
927 authority_service::AuthorityService,
928 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
929 commit::{CertifiedCommits, CommitRange},
930 commit_vote_monitor::CommitVoteMonitor,
931 context::Context,
932 core_thread::{CoreError, CoreThreadDispatcher},
933 dag_state::DagState,
934 error::ConsensusResult,
935 network::{
936 BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
937 ValidatorNetworkClient, ValidatorNetworkService,
938 },
939 round_tracker::RoundTracker,
940 storage::mem_store::MemStore,
941 synchronizer::Synchronizer,
942 test_dag_builder::DagBuilder,
943 transaction_vote_tracker::TransactionVoteTracker,
944 };
945 struct FakeCoreThreadDispatcher {
946 blocks: Mutex<Vec<VerifiedBlock>>,
947 }
948
949 impl FakeCoreThreadDispatcher {
950 fn new() -> Self {
951 Self {
952 blocks: Mutex::new(vec![]),
953 }
954 }
955
956 fn get_blocks(&self) -> Vec<VerifiedBlock> {
957 self.blocks.lock().clone()
958 }
959 }
960
961 #[async_trait]
962 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
963 async fn add_blocks(
964 &self,
965 blocks: Vec<VerifiedBlock>,
966 ) -> Result<BTreeSet<BlockRef>, CoreError> {
967 let block_refs = blocks.iter().map(|b| b.reference()).collect();
968 self.blocks.lock().extend(blocks);
969 Ok(block_refs)
970 }
971
972 async fn check_block_refs(
973 &self,
974 _block_refs: Vec<BlockRef>,
975 ) -> Result<BTreeSet<BlockRef>, CoreError> {
976 Ok(BTreeSet::new())
977 }
978
979 async fn add_certified_commits(
980 &self,
981 _commits: CertifiedCommits,
982 ) -> Result<BTreeSet<BlockRef>, CoreError> {
983 todo!()
984 }
985
986 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
987 Ok(())
988 }
989
990 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
991 Ok(Default::default())
992 }
993
994 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
995 todo!()
996 }
997
998 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
999 todo!()
1000 }
1001 }
1002
1003 #[derive(Default)]
1004 struct FakeNetworkClient {}
1005
1006 #[async_trait]
1007 impl ValidatorNetworkClient for FakeNetworkClient {
1008 async fn send_block(
1009 &self,
1010 _peer: AuthorityIndex,
1011 _block: &VerifiedBlock,
1012 _timeout: Duration,
1013 ) -> ConsensusResult<()> {
1014 unimplemented!("Unimplemented")
1015 }
1016
1017 async fn subscribe_blocks(
1018 &self,
1019 _peer: AuthorityIndex,
1020 _last_received: Round,
1021 _timeout: Duration,
1022 ) -> ConsensusResult<BlockStream> {
1023 unimplemented!("Unimplemented")
1024 }
1025
1026 async fn fetch_blocks(
1027 &self,
1028 _peer: AuthorityIndex,
1029 _block_refs: Vec<BlockRef>,
1030 _fetch_after_rounds: Vec<Round>,
1031 _fetch_missing_ancestors: bool,
1032 _timeout: Duration,
1033 ) -> ConsensusResult<Vec<Bytes>> {
1034 unimplemented!("Unimplemented")
1035 }
1036
1037 async fn fetch_commits(
1038 &self,
1039 _peer: AuthorityIndex,
1040 _commit_range: CommitRange,
1041 _timeout: Duration,
1042 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1043 unimplemented!("Unimplemented")
1044 }
1045
1046 async fn fetch_latest_blocks(
1047 &self,
1048 _peer: AuthorityIndex,
1049 _authorities: Vec<AuthorityIndex>,
1050 _timeout: Duration,
1051 ) -> ConsensusResult<Vec<Bytes>> {
1052 unimplemented!("Unimplemented")
1053 }
1054
1055 async fn get_latest_rounds(
1056 &self,
1057 _peer: AuthorityIndex,
1058 _timeout: Duration,
1059 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
1060 unimplemented!("Unimplemented")
1061 }
1062 }
1063
1064 #[async_trait]
1065 impl ObserverNetworkClient for FakeNetworkClient {
1066 async fn stream_blocks(
1067 &self,
1068 _peer: crate::network::PeerId,
1069 _highest_round_per_authority: Vec<u64>,
1070 _timeout: Duration,
1071 ) -> ConsensusResult<crate::network::ObserverBlockStream> {
1072 unimplemented!("Unimplemented")
1073 }
1074
1075 async fn fetch_blocks(
1076 &self,
1077 _peer: crate::network::PeerId,
1078 _block_refs: Vec<BlockRef>,
1079 _timeout: Duration,
1080 ) -> ConsensusResult<Vec<Bytes>> {
1081 unimplemented!("Unimplemented")
1082 }
1083
1084 async fn fetch_commits(
1085 &self,
1086 _peer: crate::network::PeerId,
1087 _commit_range: CommitRange,
1088 _timeout: Duration,
1089 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
1090 unimplemented!("Unimplemented")
1091 }
1092 }
1093
1094 #[tokio::test(flavor = "current_thread", start_paused = true)]
1095 async fn test_handle_send_block() {
1096 let (context, _keys) = Context::new_for_test(4);
1097 let context = Arc::new(context);
1098 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1099 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1100 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1101 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1102 let fake_client = Arc::new(FakeNetworkClient::default());
1103 let network_client = Arc::new(SynchronizerClient::new(
1104 context.clone(),
1105 Some(fake_client.clone()),
1106 Some(fake_client.clone()),
1107 ));
1108 let store = Arc::new(MemStore::new());
1109 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1110 let transaction_vote_tracker =
1111 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1112 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1113 let synchronizer = Synchronizer::start(
1114 network_client,
1115 context.clone(),
1116 core_dispatcher.clone(),
1117 commit_vote_monitor.clone(),
1118 block_verifier.clone(),
1119 transaction_vote_tracker.clone(),
1120 round_tracker.clone(),
1121 dag_state.clone(),
1122 false,
1123 );
1124 let authority_service = Arc::new(AuthorityService::new(
1125 context.clone(),
1126 block_verifier,
1127 commit_vote_monitor,
1128 round_tracker,
1129 synchronizer,
1130 core_dispatcher.clone(),
1131 rx_block_broadcast,
1132 transaction_vote_tracker,
1133 dag_state,
1134 store,
1135 ));
1136
1137 let now = context.clock.timestamp_utc_ms();
1139 let max_drift = context.parameters.max_forward_time_drift;
1140 let input_block = VerifiedBlock::new_for_test(
1141 TestBlock::new(9, 0)
1142 .set_timestamp_ms(now + max_drift.as_millis() as u64)
1143 .build(),
1144 );
1145
1146 let service = authority_service.clone();
1147 let serialized = ExtendedSerializedBlock {
1148 block: input_block.serialized().clone(),
1149 excluded_ancestors: vec![],
1150 };
1151
1152 tokio::spawn({
1153 let service = service.clone();
1154 let context = context.clone();
1155 async move {
1156 service
1157 .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
1158 .await
1159 .unwrap();
1160 }
1161 });
1162
1163 sleep(max_drift / 2).await;
1164
1165 let blocks = core_dispatcher.get_blocks();
1166 assert_eq!(blocks.len(), 1);
1167 assert_eq!(blocks[0], input_block);
1168
1169 let invalid_block =
1171 VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
1172 let extended_block = ExtendedSerializedBlock {
1173 block: invalid_block.serialized().clone(),
1174 excluded_ancestors: vec![],
1175 };
1176 service
1177 .handle_send_block(
1178 context.committee.to_authority_index(0).unwrap(),
1179 extended_block,
1180 )
1181 .await
1182 .unwrap_err();
1183
1184 let invalid_excluded_ancestors = vec![
1186 bcs::to_bytes(&BlockRef::new(
1187 10,
1188 AuthorityIndex::new_for_test(1000),
1189 BlockDigest::MIN,
1190 ))
1191 .unwrap(),
1192 vec![3u8; 40],
1193 bcs::to_bytes(&invalid_block.reference()).unwrap(),
1194 ];
1195 let extended_block = ExtendedSerializedBlock {
1196 block: input_block.serialized().clone(),
1197 excluded_ancestors: invalid_excluded_ancestors,
1198 };
1199 service
1200 .handle_send_block(
1201 context.committee.to_authority_index(0).unwrap(),
1202 extended_block,
1203 )
1204 .await
1205 .unwrap_err();
1206 }
1207
1208 #[tokio::test(flavor = "current_thread", start_paused = true)]
1209 async fn test_handle_fetch_blocks() {
1210 const NUM_AUTHORITIES: usize = 40;
1213 const NUM_ROUNDS: usize = 40;
1214 let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1215 context.parameters.max_blocks_per_fetch = 50;
1216 let context = Arc::new(context);
1217 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1218 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1219 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1220 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1221 let fake_client = Arc::new(FakeNetworkClient::default());
1222 let network_client = Arc::new(SynchronizerClient::new(
1223 context.clone(),
1224 Some(fake_client.clone()),
1225 Some(fake_client.clone()),
1226 ));
1227 let store = Arc::new(MemStore::new());
1228 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1229 let transaction_vote_tracker =
1230 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1231 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1232 let synchronizer = Synchronizer::start(
1233 network_client,
1234 context.clone(),
1235 core_dispatcher.clone(),
1236 commit_vote_monitor.clone(),
1237 block_verifier.clone(),
1238 transaction_vote_tracker.clone(),
1239 round_tracker.clone(),
1240 dag_state.clone(),
1241 false,
1242 );
1243 let authority_service = Arc::new(AuthorityService::new(
1244 context.clone(),
1245 block_verifier,
1246 commit_vote_monitor,
1247 round_tracker,
1248 synchronizer,
1249 core_dispatcher.clone(),
1250 rx_block_broadcast,
1251 transaction_vote_tracker,
1252 dag_state.clone(),
1253 store,
1254 ));
1255
1256 let mut dag_builder = DagBuilder::new(context.clone());
1258 dag_builder
1259 .layers(1..=(NUM_ROUNDS as u32))
1260 .build()
1261 .persist_layers(dag_state.clone());
1262 dag_state.write().flush();
1263 let all_blocks = dag_builder.all_blocks();
1264
1265 let missing_block_refs: Vec<BlockRef> = all_blocks
1267 .iter()
1268 .rev()
1269 .take(2)
1270 .map(|b| b.reference())
1271 .collect();
1272 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1273 let results = authority_service
1274 .handle_fetch_blocks(
1275 AuthorityIndex::new_for_test(0),
1276 missing_block_refs.clone(),
1277 highest_accepted_rounds,
1278 true,
1279 )
1280 .await
1281 .unwrap();
1282
1283 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1285 .iter()
1286 .map(|b| {
1287 let signed = bcs::from_bytes(b).unwrap();
1288 let block = VerifiedBlock::new_verified(signed, b.clone());
1289 (block.reference(), block)
1290 })
1291 .collect();
1292 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1293 for b in &missing_block_refs {
1295 assert!(blocks.contains_key(b));
1296 }
1297 let num_missing_ancestors = blocks
1298 .keys()
1299 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1300 .count();
1301 assert_eq!(
1302 num_missing_ancestors,
1303 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1304 );
1305
1306 let missing_round = NUM_ROUNDS as Round - 3;
1308 let missing_block_refs: Vec<BlockRef> = all_blocks
1309 .iter()
1310 .filter(|b| b.reference().round == missing_round)
1311 .map(|b| b.reference())
1312 .take(2)
1313 .collect();
1314 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1315 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1317 let results = authority_service
1318 .handle_fetch_blocks(
1319 AuthorityIndex::new_for_test(0),
1320 missing_block_refs.clone(),
1321 highest_accepted_rounds,
1322 false,
1323 )
1324 .await
1325 .unwrap();
1326
1327 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1329 .iter()
1330 .map(|b| {
1331 let signed = bcs::from_bytes(b).unwrap();
1332 let block = VerifiedBlock::new_verified(signed, b.clone());
1333 (block.reference(), block)
1334 })
1335 .collect();
1336 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1337 for b in &missing_block_refs {
1339 assert!(blocks.contains_key(b));
1340 }
1341 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1343 for b in blocks.keys() {
1344 assert!(b.round <= missing_round);
1345 assert!(expected_authors.contains(&b.author));
1346 }
1347
1348 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1350 highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1352 highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1353 let results = authority_service
1354 .handle_fetch_blocks(
1355 AuthorityIndex::new_for_test(0),
1356 vec![],
1357 highest_accepted_rounds.clone(),
1358 false,
1359 )
1360 .await
1361 .unwrap();
1362
1363 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1365 .iter()
1366 .map(|b| {
1367 let signed = bcs::from_bytes(b).unwrap();
1368 let block = VerifiedBlock::new_verified(signed, b.clone());
1369 (block.reference(), block)
1370 })
1371 .collect();
1372 assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1373 for block_ref in blocks.keys() {
1375 let accepted = highest_accepted_rounds[block_ref.author];
1376 assert!(block_ref.round > accepted);
1377 }
1378 let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1381 assert!(
1384 max_round_in_result <= 4,
1385 "Expected low rounds from fair round-order fetching, got max round {}",
1386 max_round_in_result
1387 );
1388
1389 let missing_block_refs: Vec<BlockRef> = all_blocks
1391 .iter()
1392 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1393 .map(|b| b.reference())
1394 .take(5)
1395 .collect();
1396 let results = authority_service
1397 .handle_fetch_blocks(
1398 AuthorityIndex::new_for_test(0),
1399 missing_block_refs.clone(),
1400 vec![],
1401 false,
1402 )
1403 .await
1404 .unwrap();
1405
1406 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1408 .iter()
1409 .map(|b| {
1410 let signed = bcs::from_bytes(b).unwrap();
1411 let block = VerifiedBlock::new_verified(signed, b.clone());
1412 (block.reference(), block)
1413 })
1414 .collect();
1415 assert_eq!(blocks.len(), 5);
1416 for b in &missing_block_refs {
1417 assert!(blocks.contains_key(b));
1418 }
1419 }
1420
1421 #[tokio::test(flavor = "current_thread", start_paused = true)]
1422 async fn test_handle_fetch_latest_blocks() {
1423 let (context, _keys) = Context::new_for_test(4);
1425 let context = Arc::new(context);
1426 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1427 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1428 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1429 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1430 let fake_client = Arc::new(FakeNetworkClient::default());
1431 let network_client = Arc::new(SynchronizerClient::new(
1432 context.clone(),
1433 Some(fake_client.clone()),
1434 Some(fake_client.clone()),
1435 ));
1436 let store = Arc::new(MemStore::new());
1437 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1438 let transaction_vote_tracker =
1439 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1440 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1441 let synchronizer = Synchronizer::start(
1442 network_client,
1443 context.clone(),
1444 core_dispatcher.clone(),
1445 commit_vote_monitor.clone(),
1446 block_verifier.clone(),
1447 transaction_vote_tracker.clone(),
1448 round_tracker.clone(),
1449 dag_state.clone(),
1450 true,
1451 );
1452 let authority_service = Arc::new(AuthorityService::new(
1453 context.clone(),
1454 block_verifier,
1455 commit_vote_monitor,
1456 round_tracker,
1457 synchronizer,
1458 core_dispatcher.clone(),
1459 rx_block_broadcast,
1460 transaction_vote_tracker,
1461 dag_state.clone(),
1462 store,
1463 ));
1464
1465 let mut dag_builder = DagBuilder::new(context.clone());
1467 dag_builder
1468 .layers(1..=10)
1469 .authorities(vec![AuthorityIndex::new_for_test(2)])
1470 .equivocate(1)
1471 .build()
1472 .persist_layers(dag_state);
1473
1474 let authorities_to_request = vec![
1476 AuthorityIndex::new_for_test(1),
1477 AuthorityIndex::new_for_test(2),
1478 ];
1479 let results = authority_service
1480 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1481 .await;
1482
1483 let serialised_blocks = results.unwrap();
1485 for serialised_block in serialised_blocks {
1486 let signed_block: SignedBlock =
1487 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1488 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1489
1490 assert_eq!(verified_block.round(), 10);
1491 }
1492 }
1493
1494 #[tokio::test(flavor = "current_thread", start_paused = true)]
1495 async fn test_handle_subscribe_blocks() {
1496 let (context, _keys) = Context::new_for_test(4);
1497 let context = Arc::new(context);
1498 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1499 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1500 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1501 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1502 let fake_client = Arc::new(FakeNetworkClient::default());
1503 let network_client = Arc::new(SynchronizerClient::new(
1504 context.clone(),
1505 Some(fake_client.clone()),
1506 Some(fake_client.clone()),
1507 ));
1508 let store = Arc::new(MemStore::new());
1509 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1510 let transaction_vote_tracker =
1511 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1512 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1513 let synchronizer = Synchronizer::start(
1514 network_client,
1515 context.clone(),
1516 core_dispatcher.clone(),
1517 commit_vote_monitor.clone(),
1518 block_verifier.clone(),
1519 transaction_vote_tracker.clone(),
1520 round_tracker.clone(),
1521 dag_state.clone(),
1522 false,
1523 );
1524
1525 dag_state
1527 .write()
1528 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1529 dag_state
1530 .write()
1531 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1532 dag_state
1533 .write()
1534 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1535
1536 let authority_service = Arc::new(AuthorityService::new(
1537 context.clone(),
1538 block_verifier,
1539 commit_vote_monitor,
1540 round_tracker,
1541 synchronizer,
1542 core_dispatcher.clone(),
1543 rx_block_broadcast,
1544 transaction_vote_tracker,
1545 dag_state.clone(),
1546 store,
1547 ));
1548
1549 let peer = context.committee.to_authority_index(1).unwrap();
1550
1551 {
1554 let mut stream = authority_service
1555 .handle_subscribe_blocks(peer, 100)
1556 .await
1557 .unwrap();
1558 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1559 assert_eq!(
1560 block.round(),
1561 15,
1562 "Should return last proposed block as fallback"
1563 );
1564 assert_eq!(block.author().value(), 0);
1565 }
1566
1567 {
1570 let mut stream = authority_service
1571 .handle_subscribe_blocks(peer, 7)
1572 .await
1573 .unwrap();
1574
1575 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1576 assert_eq!(block1.round(), 10, "Should return block at round 10");
1577
1578 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1579 assert_eq!(block2.round(), 15, "Should return block at round 15");
1580 }
1581 }
1582
1583 #[tokio::test(flavor = "current_thread", start_paused = true)]
1584 async fn test_handle_subscribe_blocks_not_proposed() {
1585 let (context, _keys) = Context::new_for_test(4);
1586 let context = Arc::new(context);
1587 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1588 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1589 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1590 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1591 let fake_client = Arc::new(FakeNetworkClient::default());
1592 let network_client = Arc::new(SynchronizerClient::new(
1593 context.clone(),
1594 Some(fake_client.clone()),
1595 Some(fake_client.clone()),
1596 ));
1597 let store = Arc::new(MemStore::new());
1598 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1599 let transaction_vote_tracker =
1600 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1601 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1602 let synchronizer = Synchronizer::start(
1603 network_client,
1604 context.clone(),
1605 core_dispatcher.clone(),
1606 commit_vote_monitor.clone(),
1607 block_verifier.clone(),
1608 transaction_vote_tracker.clone(),
1609 round_tracker.clone(),
1610 dag_state.clone(),
1611 false,
1612 );
1613
1614 let authority_service = Arc::new(AuthorityService::new(
1617 context.clone(),
1618 block_verifier,
1619 commit_vote_monitor,
1620 round_tracker,
1621 synchronizer,
1622 core_dispatcher.clone(),
1623 rx_block_broadcast,
1624 transaction_vote_tracker,
1625 dag_state.clone(),
1626 store,
1627 ));
1628
1629 let peer = context.committee.to_authority_index(1).unwrap();
1630
1631 let mut stream = authority_service
1633 .handle_subscribe_blocks(peer, 0)
1634 .await
1635 .unwrap();
1636
1637 use futures::poll;
1639 use std::task::Poll;
1640 let poll_result = poll!(stream.next());
1641 assert!(
1642 matches!(poll_result, Poll::Pending),
1643 "Should not receive genesis block on subscription stream"
1644 );
1645 }
1646}