1use std::{
5 cmp::Reverse,
6 collections::{BTreeMap, BTreeSet, BinaryHeap},
7 pin::Pin,
8 sync::Arc,
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use consensus_config::AuthorityIndex;
15use consensus_types::block::{BlockRef, Round};
16use futures::{Stream, StreamExt, ready, stream, task};
17use mysten_metrics::spawn_monitored_task;
18use parking_lot::RwLock;
19use rand::seq::SliceRandom as _;
20use sui_macros::fail_point_async;
21use tap::TapFallible;
22use tokio::sync::broadcast;
23use tokio_util::sync::ReusableBoxFuture;
24use tracing::{debug, info, warn};
25
26use crate::{
27 CommitIndex,
28 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
29 block_verifier::BlockVerifier,
30 commit::{CommitAPI as _, CommitRange, TrustedCommit},
31 commit_vote_monitor::CommitVoteMonitor,
32 context::Context,
33 core_thread::CoreThreadDispatcher,
34 dag_state::DagState,
35 error::{ConsensusError, ConsensusResult},
36 network::{
37 BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream, ObserverBlockStreamItem,
38 ObserverNetworkService, PeerId, ValidatorNetworkService,
39 },
40 round_tracker::RoundTracker,
41 stake_aggregator::{QuorumThreshold, StakeAggregator},
42 storage::Store,
43 synchronizer::SynchronizerHandle,
44 transaction_vote_tracker::TransactionVoteTracker,
45};
46
/// Multiplier applied to `commit_sync_batch_size` when `handle_send_block` decides whether
/// the local commit index lags the quorum commit index by too much to accept new blocks.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
48
/// Handles network requests addressed to this authority.
///
/// Implements both `ValidatorNetworkService` and `ObserverNetworkService` in this file.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    /// Shared node context; the committee, parameters, protocol config, metrics and clock
    /// are all read through it.
    context: Arc<Context>,
    /// Observes every verified block received via send-block and supplies the quorum
    /// commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Verifies (and votes on) blocks received via send-block.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Used to fetch missing ancestors and missing excluded ancestors from the sending peer.
    synchronizer: Arc<SynchronizerHandle>,
    /// Receives verified blocks and answers which block refs are missing.
    core_dispatcher: Arc<C>,
    /// Broadcast receiver that is resubscribed for each block subscription.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    /// Subscription bookkeeping shared with every `BroadcastStream` handed out.
    subscription_counter: Arc<SubscriptionCounter>,
    /// Records voted blocks when transaction voting is enabled in the protocol config.
    transaction_vote_tracker: TransactionVoteTracker,
    /// Read for cached / last blocks and the last commit index.
    dag_state: Arc<RwLock<DagState>>,
    /// Scanned for blocks, commits and commit votes when serving fetch requests.
    store: Arc<dyn Store>,
    /// Updated from received blocks; supplies the locally highest received rounds.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
63
64impl<C: CoreThreadDispatcher> AuthorityService<C> {
65 pub(crate) fn new(
66 context: Arc<Context>,
67 block_verifier: Arc<dyn BlockVerifier>,
68 commit_vote_monitor: Arc<CommitVoteMonitor>,
69 round_tracker: Arc<RwLock<RoundTracker>>,
70 synchronizer: Arc<SynchronizerHandle>,
71 core_dispatcher: Arc<C>,
72 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
73 transaction_vote_tracker: TransactionVoteTracker,
74 dag_state: Arc<RwLock<DagState>>,
75 store: Arc<dyn Store>,
76 ) -> Self {
77 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
78 Self {
79 context,
80 block_verifier,
81 commit_vote_monitor,
82 synchronizer,
83 core_dispatcher,
84 rx_block_broadcast,
85 subscription_counter,
86 transaction_vote_tracker,
87 dag_state,
88 store,
89 round_tracker,
90 }
91 }
92
93 fn parse_excluded_ancestors(
95 &self,
96 peer: AuthorityIndex,
97 block: &VerifiedBlock,
98 mut excluded_ancestors: Vec<Vec<u8>>,
99 ) -> ConsensusResult<Vec<BlockRef>> {
100 let peer_hostname = &self.context.committee.authority(peer).hostname;
101
102 let excluded_ancestors_limit = self.context.committee.size() * 2;
103 if excluded_ancestors.len() > excluded_ancestors_limit {
104 debug!(
105 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
106 excluded_ancestors.len() - excluded_ancestors_limit,
107 peer,
108 peer_hostname,
109 );
110 excluded_ancestors.truncate(excluded_ancestors_limit);
111 }
112
113 let excluded_ancestors = excluded_ancestors
114 .into_iter()
115 .map(|serialized| {
116 let block_ref: BlockRef =
117 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
118 if !self.context.committee.is_valid_index(block_ref.author) {
119 return Err(ConsensusError::InvalidAuthorityIndex {
120 index: block_ref.author,
121 max: self.context.committee.size(),
122 });
123 }
124 if block_ref.round >= block.round() {
125 return Err(ConsensusError::InvalidAncestorRound {
126 ancestor: block_ref.round,
127 block: block.round(),
128 });
129 }
130 Ok(block_ref)
131 })
132 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
133
134 for excluded_ancestor in &excluded_ancestors {
135 let excluded_ancestor_hostname = &self
136 .context
137 .committee
138 .authority(excluded_ancestor.author)
139 .hostname;
140 self.context
141 .metrics
142 .node_metrics
143 .network_excluded_ancestors_count_by_authority
144 .with_label_values(&[excluded_ancestor_hostname])
145 .inc();
146 }
147 self.context
148 .metrics
149 .node_metrics
150 .network_received_excluded_ancestors_from_authority
151 .with_label_values(&[peer_hostname])
152 .inc_by(excluded_ancestors.len() as u64);
153
154 Ok(excluded_ancestors)
155 }
156}
157
158#[async_trait]
159impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed by `peer`.
    ///
    /// The block is deserialized, checked to be authored by `peer`, verified, and its
    /// excluded ancestors are parsed. Commit votes and round information are then
    /// recorded, after which the block is rejected if the local commit index lags the
    /// quorum commit index by more than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`.
    /// Accepted blocks are handed to the core dispatcher, and any missing ancestors or
    /// missing excluded ancestors are fetched from `peer` in background tasks.
    ///
    /// # Errors
    /// Returns the deserialization / verification / parsing error for invalid input,
    /// `UnexpectedAuthority` when the author differs from `peer`, `BlockRejected` when
    /// lagging too much, and `Shutdown` when the core dispatcher call fails.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // A peer may only send blocks it authored itself.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Verify the block; failures are counted per peer and error name.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // Only forward drift is recorded: a block timestamp at or before `now` saturates to 0.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Note the ordering: the block is observed by the commit vote monitor and the
        // round tracker BEFORE the lag check below, so a block that ends up rejected has
        // still been observed by both.
        self.commit_vote_monitor.observe_block(&verified_block);

        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject the block when the local commit index trails the quorum commit index by
        // more than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_vote_tracker
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Hand the block to core; the result lists ancestors that are still missing.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            // Fetch in a background task so the response is not delayed by the fetch.
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_ancestors, PeerId::Validator(peer))
                    .await
                {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Excluded ancestors are only checked for presence and fetched when missing.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, PeerId::Validator(peer))
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }
347
348 async fn handle_subscribe_blocks(
349 &self,
350 peer: AuthorityIndex,
351 last_received: Round,
352 ) -> ConsensusResult<BlockStream> {
353 fail_point_async!("consensus-rpc-response");
354
355 let past_proposed_blocks = {
364 let dag_state = self.dag_state.read();
365
366 let mut proposed_blocks =
367 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
368 if proposed_blocks.is_empty() {
369 let last_proposed_block = dag_state
370 .get_last_proposed_block()
371 .expect("Last proposed block should be returned on validators");
372 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
373 vec![last_proposed_block]
374 } else {
375 vec![]
376 };
377 }
378 stream::iter(
379 proposed_blocks
380 .into_iter()
381 .map(|block| ExtendedSerializedBlock {
382 block: block.serialized().clone(),
383 excluded_ancestors: vec![],
384 }),
385 )
386 };
387
388 let broadcasted_blocks = BroadcastedBlockStream::new(
389 PeerId::Validator(peer),
390 self.rx_block_broadcast.resubscribe(),
391 self.subscription_counter.clone(),
392 );
393
394 Ok(Box::pin(past_proposed_blocks.chain(
396 broadcasted_blocks.map(ExtendedSerializedBlock::from),
397 )))
398 }
399
400 async fn handle_fetch_blocks(
416 &self,
417 _peer: AuthorityIndex,
418 mut block_refs: Vec<BlockRef>,
419 fetch_after_rounds: Vec<Round>,
420 fetch_missing_ancestors: bool,
421 ) -> ConsensusResult<Vec<Bytes>> {
422 fail_point_async!("consensus-rpc-response");
423
424 if block_refs.is_empty() && (fetch_missing_ancestors || fetch_after_rounds.is_empty()) {
425 return Err(ConsensusError::InvalidFetchBlocksRequest("When no block refs are provided, fetch_after_rounds must be provided and fetch_missing_ancestors must be false".to_string()));
426 }
427 if !fetch_after_rounds.is_empty()
428 && fetch_after_rounds.len() != self.context.committee.size()
429 {
430 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
431 fetch_after_rounds.len(),
432 self.context.committee.size(),
433 ));
434 }
435
436 let max_response_num_blocks = if !fetch_after_rounds.is_empty() && !block_refs.is_empty() {
438 self.context.parameters.max_blocks_per_sync
439 } else {
440 self.context.parameters.max_blocks_per_fetch
441 };
442 if block_refs.len() > max_response_num_blocks {
443 block_refs.truncate(max_response_num_blocks);
444 }
445
446 for block in &block_refs {
448 if !self.context.committee.is_valid_index(block.author) {
449 return Err(ConsensusError::InvalidAuthorityIndex {
450 index: block.author,
451 max: self.context.committee.size(),
452 });
453 }
454 if block.round == GENESIS_ROUND {
455 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
456 }
457 }
458
459 let mut blocks = self
461 .dag_state
462 .read()
463 .get_blocks(&block_refs)
464 .into_iter()
465 .flatten()
466 .collect::<Vec<_>>();
467
468 if blocks.len() < max_response_num_blocks && !fetch_after_rounds.is_empty() {
471 if fetch_missing_ancestors {
472 let missing_ancestors = blocks
475 .iter()
476 .flat_map(|block| block.ancestors().to_vec())
477 .filter(|block_ref| fetch_after_rounds[block_ref.author] < block_ref.round)
478 .collect::<BTreeSet<_>>()
479 .into_iter()
480 .collect::<Vec<_>>();
481
482 let selected_num_blocks = max_response_num_blocks
485 .saturating_sub(blocks.len())
486 .min(missing_ancestors.len());
487 if selected_num_blocks > 0 {
488 let selected_ancestor_refs = missing_ancestors
489 .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
490 .copied()
491 .collect::<Vec<_>>();
492 let ancestor_blocks = self
493 .dag_state
494 .read()
495 .get_blocks(&selected_ancestor_refs)
496 .into_iter()
497 .flatten();
498 blocks.extend(ancestor_blocks);
499 }
500 } else {
501 let mut limit_rounds = BTreeMap::<AuthorityIndex, Round>::new();
504 if block_refs.is_empty() {
505 let dag_state = self.dag_state.read();
506 for (index, _authority) in self.context.committee.authorities() {
507 let last_block = dag_state.get_last_block_for_authority(index);
508 limit_rounds.insert(index, last_block.round());
509 }
510 } else {
511 for block_ref in &block_refs {
512 let entry = limit_rounds
513 .entry(block_ref.author)
514 .or_insert(block_ref.round);
515 *entry = (*entry).min(block_ref.round);
516 }
517 }
518
519 let mut heap = BinaryHeap::new();
522 for (authority, limit_round) in &limit_rounds {
523 let fetch_start = fetch_after_rounds[*authority] + 1;
524 if fetch_start < *limit_round {
525 heap.push(Reverse((fetch_start, *authority, *limit_round)));
526 }
527 }
528
529 while let Some(Reverse((fetch_start, authority, limit_round))) = heap.pop() {
530 let fetched = self.store.scan_blocks_by_author_in_range(
531 authority,
532 fetch_start,
533 limit_round,
534 1,
535 )?;
536 if let Some(block) = fetched.into_iter().next() {
537 let next_start = block.round() + 1;
538 blocks.push(block);
539 if blocks.len() >= max_response_num_blocks {
540 blocks.truncate(max_response_num_blocks);
541 break;
542 }
543 if next_start < limit_round {
544 heap.push(Reverse((next_start, authority, limit_round)));
545 }
546 }
547 }
548 }
549 }
550
551 let bytes = blocks
553 .into_iter()
554 .map(|block| block.serialized().clone())
555 .collect::<Vec<_>>();
556 Ok(bytes)
557 }
558
559 async fn handle_fetch_commits(
560 &self,
561 _peer: AuthorityIndex,
562 commit_range: CommitRange,
563 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
564 fail_point_async!("consensus-rpc-response");
565
566 let inclusive_end = commit_range.end().min(
568 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
569 - 1,
570 );
571 let mut commits = self
572 .store
573 .scan_commits((commit_range.start()..=inclusive_end).into())?;
574 let mut certifier_block_refs = vec![];
575 'commit: while let Some(c) = commits.last() {
576 let index = c.index();
577 let votes = self.store.read_commit_votes(index)?;
578 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
579 for v in &votes {
580 stake_aggregator.add(v.author, &self.context.committee);
581 }
582 if stake_aggregator.reached_threshold(&self.context.committee) {
583 certifier_block_refs = votes;
584 break 'commit;
585 } else {
586 debug!(
587 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
588 index,
589 stake_aggregator.stake(),
590 stake_aggregator.threshold(&self.context.committee)
591 );
592 self.context
593 .metrics
594 .node_metrics
595 .commit_sync_fetch_commits_handler_uncertified_skipped
596 .inc();
597 commits.pop();
598 }
599 }
600 let certifier_blocks = self
601 .store
602 .read_blocks(&certifier_block_refs)?
603 .into_iter()
604 .flatten()
605 .collect();
606 Ok((commits, certifier_blocks))
607 }
608
609 async fn handle_fetch_latest_blocks(
610 &self,
611 peer: AuthorityIndex,
612 authorities: Vec<AuthorityIndex>,
613 ) -> ConsensusResult<Vec<Bytes>> {
614 fail_point_async!("consensus-rpc-response");
615
616 if authorities.len() > self.context.committee.size() {
617 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
618 }
619
620 for authority in &authorities {
622 if !self.context.committee.is_valid_index(*authority) {
623 return Err(ConsensusError::InvalidAuthorityIndex {
624 index: *authority,
625 max: self.context.committee.size(),
626 });
627 }
628 }
629
630 let mut blocks = vec![];
634 let dag_state = self.dag_state.read();
635 for authority in authorities {
636 let block = dag_state.get_last_block_for_authority(authority);
637
638 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
639
640 if block.round() != GENESIS_ROUND {
642 blocks.push(block);
643 }
644 }
645
646 let result = blocks
648 .into_iter()
649 .map(|block| block.serialized().clone())
650 .collect::<Vec<_>>();
651
652 Ok(result)
653 }
654
655 async fn handle_get_latest_rounds(
656 &self,
657 _peer: AuthorityIndex,
658 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
659 fail_point_async!("consensus-rpc-response");
660
661 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
662
663 let blocks = self
664 .dag_state
665 .read()
666 .get_last_cached_block_per_authority(Round::MAX);
667 let highest_accepted_rounds = blocks
668 .into_iter()
669 .map(|(block, _)| block.round())
670 .collect::<Vec<_>>();
671
672 Ok((highest_received_rounds, highest_accepted_rounds))
673 }
674}
675
676#[async_trait]
677impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
678 async fn handle_block(
679 &self,
680 _peer: PeerId,
681 _item: ObserverBlockStreamItem,
682 ) -> ConsensusResult<()> {
683 Err(ConsensusError::NetworkRequest(
685 "Observer block handling not yet implemented".to_string(),
686 ))
687 }
688
689 async fn handle_stream_blocks(
690 &self,
691 _peer: NodeId,
692 _highest_round_per_authority: Vec<u64>,
693 ) -> ConsensusResult<ObserverBlockStream> {
694 todo!("Observer block streaming not yet implemented")
696 }
697
698 async fn handle_fetch_blocks(
699 &self,
700 _peer: NodeId,
701 _block_refs: Vec<BlockRef>,
702 ) -> ConsensusResult<Vec<Bytes>> {
703 Err(ConsensusError::NetworkRequest(
706 "Observer fetch blocks not yet implemented".to_string(),
707 ))
708 }
709
710 async fn handle_fetch_commits(
711 &self,
712 _peer: NodeId,
713 _commit_range: CommitRange,
714 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
715 Err(ConsensusError::NetworkRequest(
717 "Observer fetch commits not yet implemented".to_string(),
718 ))
719 }
720}
/// Mutable subscription bookkeeping guarded by the mutex in `SubscriptionCounter`.
struct Counter {
    /// Total number of active subscriptions across all peers.
    count: usize,
    /// Number of active subscriptions per peer.
    subscriptions_by_peer: BTreeMap<PeerId, usize>,
}
725
/// Tracks active block subscriptions and mirrors them into the `subscribed_by` metric.
pub(crate) struct SubscriptionCounter {
    /// Source of the committee (for hostnames) and the metrics registry.
    context: Arc<Context>,
    /// Counts, protected by a mutex because streams are created and dropped concurrently.
    counter: parking_lot::Mutex<Counter>,
}
731
732impl SubscriptionCounter {
733 pub(crate) fn new(context: Arc<Context>) -> Self {
734 for (_, authority) in context.committee.authorities() {
736 context
737 .metrics
738 .node_metrics
739 .subscribed_by
740 .with_label_values(&[authority.hostname.as_str()])
741 .set(0);
742 }
743
744 Self {
745 counter: parking_lot::Mutex::new(Counter {
746 count: 0,
747 subscriptions_by_peer: BTreeMap::new(),
748 }),
749 context,
750 }
751 }
752
753 fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
754 let mut counter = self.counter.lock();
755 counter.count += 1;
756 *counter
757 .subscriptions_by_peer
758 .entry(peer.clone())
759 .or_insert(0) += 1;
760
761 match peer {
762 PeerId::Validator(authority) => {
763 let peer_hostname = &self.context.committee.authority(*authority).hostname;
764 self.context
765 .metrics
766 .node_metrics
767 .subscribed_by
768 .with_label_values(&[peer_hostname])
769 .set(1);
770 }
771 PeerId::Observer(_) => {
772 self.context
773 .metrics
774 .node_metrics
775 .subscribed_by
776 .with_label_values(&["observer"])
777 .inc();
778 }
779 }
780
781 Ok(())
782 }
783
784 fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
785 let mut counter = self.counter.lock();
786 counter.count -= 1;
787 *counter
788 .subscriptions_by_peer
789 .entry(peer.clone())
790 .or_insert(0) -= 1;
791
792 if counter.subscriptions_by_peer[peer] == 0 {
793 match peer {
794 PeerId::Validator(authority) => {
795 let peer_hostname = &self.context.committee.authority(*authority).hostname;
796 self.context
797 .metrics
798 .node_metrics
799 .subscribed_by
800 .with_label_values(&[peer_hostname])
801 .set(0);
802 }
803 PeerId::Observer(_) => {
804 self.context
805 .metrics
806 .node_metrics
807 .subscribed_by
808 .with_label_values(&["observer"])
809 .dec();
810 }
811 }
812 }
813
814 Ok(())
815 }
816}
817
/// `BroadcastStream` specialized to the `ExtendedBlock` items sent on the block broadcast
/// channel.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
821
/// Adapts a `tokio::sync::broadcast::Receiver` into a `Stream`, while keeping the
/// subscription registered with a `SubscriptionCounter` for as long as the stream lives.
pub(crate) struct BroadcastStream<T> {
    /// Peer this stream serves; used for subscription accounting and log messages.
    peer: PeerId,
    /// In-flight receive future. It resolves to the receive result together with the
    /// receiver itself, so that the receiver can be reused for the next receive.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    /// Counter incremented in `new` and decremented on drop.
    subscription_counter: Arc<SubscriptionCounter>,
}
837
838impl<T: 'static + Clone + Send> BroadcastStream<T> {
839 pub fn new(
840 peer: PeerId,
841 rx: broadcast::Receiver<T>,
842 subscription_counter: Arc<SubscriptionCounter>,
843 ) -> Self {
844 if let Err(err) = subscription_counter.increment(&peer) {
845 match err {
846 ConsensusError::Shutdown => {}
847 _ => panic!("Unexpected error: {err}"),
848 }
849 }
850 Self {
851 peer,
852 inner: ReusableBoxFuture::new(make_recv_future(rx)),
853 subscription_counter,
854 }
855 }
856}
857
858impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
859 type Item = T;
860
861 fn poll_next(
862 mut self: Pin<&mut Self>,
863 cx: &mut task::Context<'_>,
864 ) -> task::Poll<Option<Self::Item>> {
865 let peer = self.peer.clone();
866 let maybe_item = loop {
867 let (result, rx) = ready!(self.inner.poll(cx));
868 self.inner.set(make_recv_future(rx));
869
870 match result {
871 Ok(item) => break Some(item),
872 Err(broadcast::error::RecvError::Closed) => {
873 info!("Block BroadcastedBlockStream {} closed", peer);
874 break None;
875 }
876 Err(broadcast::error::RecvError::Lagged(n)) => {
877 warn!(
878 "Block BroadcastedBlockStream {} lagged by {} messages",
879 peer, n
880 );
881 continue;
882 }
883 }
884 };
885 task::Poll::Ready(maybe_item)
886 }
887}
888
889impl<T> Drop for BroadcastStream<T> {
890 fn drop(&mut self) {
891 if let Err(err) = self.subscription_counter.decrement(&self.peer) {
892 match err {
893 ConsensusError::Shutdown => {}
894 _ => panic!("Unexpected error: {err}"),
895 }
896 }
897 }
898}
899
900async fn make_recv_future<T: Clone>(
901 mut rx: broadcast::Receiver<T>,
902) -> (
903 Result<T, broadcast::error::RecvError>,
904 broadcast::Receiver<T>,
905) {
906 let result = rx.recv().await;
907 (result, rx)
908}
909
910#[cfg(test)]
913mod tests {
914 use std::{
915 collections::{BTreeMap, BTreeSet},
916 sync::Arc,
917 time::Duration,
918 };
919
920 use async_trait::async_trait;
921 use bytes::Bytes;
922 use consensus_config::AuthorityIndex;
923 use consensus_types::block::{BlockDigest, BlockRef, Round};
924 use parking_lot::{Mutex, RwLock};
925 use tokio::{sync::broadcast, time::sleep};
926
927 use futures::StreamExt as _;
928
929 use crate::{
930 authority_service::AuthorityService,
931 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
932 commit::{CertifiedCommits, CommitRange},
933 commit_vote_monitor::CommitVoteMonitor,
934 context::Context,
935 core_thread::{CoreError, CoreThreadDispatcher},
936 dag_state::DagState,
937 error::ConsensusResult,
938 network::{
939 BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
940 ValidatorNetworkClient, ValidatorNetworkService,
941 },
942 peers_pool::PeersPool,
943 round_tracker::RoundTracker,
944 storage::mem_store::MemStore,
945 synchronizer::Synchronizer,
946 test_dag_builder::DagBuilder,
947 transaction_vote_tracker::TransactionVoteTracker,
948 };
949 struct FakeCoreThreadDispatcher {
950 blocks: Mutex<Vec<VerifiedBlock>>,
951 }
952
953 impl FakeCoreThreadDispatcher {
954 fn new() -> Self {
955 Self {
956 blocks: Mutex::new(vec![]),
957 }
958 }
959
960 fn get_blocks(&self) -> Vec<VerifiedBlock> {
961 self.blocks.lock().clone()
962 }
963 }
964
965 #[async_trait]
966 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
967 async fn add_blocks(
968 &self,
969 blocks: Vec<VerifiedBlock>,
970 ) -> Result<BTreeSet<BlockRef>, CoreError> {
971 let block_refs = blocks.iter().map(|b| b.reference()).collect();
972 self.blocks.lock().extend(blocks);
973 Ok(block_refs)
974 }
975
976 async fn check_block_refs(
977 &self,
978 _block_refs: Vec<BlockRef>,
979 ) -> Result<BTreeSet<BlockRef>, CoreError> {
980 Ok(BTreeSet::new())
981 }
982
983 async fn add_certified_commits(
984 &self,
985 _commits: CertifiedCommits,
986 ) -> Result<BTreeSet<BlockRef>, CoreError> {
987 todo!()
988 }
989
990 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
991 Ok(())
992 }
993
994 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
995 Ok(Default::default())
996 }
997
998 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
999 todo!()
1000 }
1001
1002 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
1003 todo!()
1004 }
1005 }
1006
    /// Network client placeholder: every method of its client trait implementations
    /// panics with `unimplemented!`.
    #[derive(Default)]
    struct FakeNetworkClient {}
1009
    /// Validator client stub. It only exists to satisfy constructor signatures; calling
    /// any method panics.
    #[async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        /// Unimplemented; panics when called.
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented; panics when called.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented; panics when called.
        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _fetch_after_rounds: Vec<Round>,
            _fetch_missing_ancestors: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented; panics when called.
        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented; panics when called.
        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented; panics when called.
        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1067
    /// Observer client stub. It only exists to satisfy constructor signatures; calling
    /// any method panics.
    #[async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        /// Unimplemented; panics when called.
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _highest_round_per_authority: Vec<u64>,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented; panics when called.
        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented; panics when called.
        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
1097
    /// Exercises `handle_send_block` with one acceptable block, one block whose author
    /// does not match the sending peer, and one message with invalid excluded ancestors.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // Wire up the service for a 4-authority committee with a no-op verifier, a
        // recording core dispatcher and stub network clients.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_vote_tracker =
            TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let peers_pool = Arc::new(PeersPool::new(context.clone()));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_vote_tracker.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            peers_pool.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_vote_tracker,
            dag_state,
            store,
        ));

        // Case 1: a block timestamped `max_forward_time_drift` ahead of the local clock.
        // NOTE(review): `TestBlock::new(9, 0)` is presumably (round, author) - confirm.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Run the handler in a separate task; it must succeed.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        // Let the spawned task make progress (time is paused, so this does not really wait).
        sleep(max_drift / 2).await;

        // The block must have reached the core dispatcher unchanged.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Case 2: a block built with author 1000 but sent as authority 0 must be rejected.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Case 3: a valid block accompanied by invalid excluded ancestors must be rejected.
        // The list holds a ref with an out-of-committee author, 40 arbitrary bytes, and
        // the reference of the invalid block from case 2.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1213
1214 #[tokio::test(flavor = "current_thread", start_paused = true)]
1215 async fn test_handle_fetch_blocks() {
1216 const NUM_AUTHORITIES: usize = 40;
1219 const NUM_ROUNDS: usize = 40;
1220 let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1221 context.parameters.max_blocks_per_fetch = 50;
1222 let context = Arc::new(context);
1223 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1224 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1225 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1226 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1227 let fake_client = Arc::new(FakeNetworkClient::default());
1228 let network_client = Arc::new(SynchronizerClient::new(
1229 context.clone(),
1230 Some(fake_client.clone()),
1231 Some(fake_client.clone()),
1232 ));
1233 let store = Arc::new(MemStore::new());
1234 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1235 let transaction_vote_tracker =
1236 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1237 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1238 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1239 let synchronizer = Synchronizer::start(
1240 network_client,
1241 context.clone(),
1242 core_dispatcher.clone(),
1243 commit_vote_monitor.clone(),
1244 block_verifier.clone(),
1245 transaction_vote_tracker.clone(),
1246 round_tracker.clone(),
1247 dag_state.clone(),
1248 peers_pool.clone(),
1249 false,
1250 );
1251 let authority_service = Arc::new(AuthorityService::new(
1252 context.clone(),
1253 block_verifier,
1254 commit_vote_monitor,
1255 round_tracker,
1256 synchronizer,
1257 core_dispatcher.clone(),
1258 rx_block_broadcast,
1259 transaction_vote_tracker,
1260 dag_state.clone(),
1261 store,
1262 ));
1263
1264 let mut dag_builder = DagBuilder::new(context.clone());
1266 dag_builder
1267 .layers(1..=(NUM_ROUNDS as u32))
1268 .build()
1269 .persist_layers(dag_state.clone());
1270 dag_state.write().flush();
1271 let all_blocks = dag_builder.all_blocks();
1272
1273 let missing_block_refs: Vec<BlockRef> = all_blocks
1275 .iter()
1276 .rev()
1277 .take(2)
1278 .map(|b| b.reference())
1279 .collect();
1280 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1281 let results = authority_service
1282 .handle_fetch_blocks(
1283 AuthorityIndex::new_for_test(0),
1284 missing_block_refs.clone(),
1285 highest_accepted_rounds,
1286 true,
1287 )
1288 .await
1289 .unwrap();
1290
1291 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1293 .iter()
1294 .map(|b| {
1295 let signed = bcs::from_bytes(b).unwrap();
1296 let block = VerifiedBlock::new_verified(signed, b.clone());
1297 (block.reference(), block)
1298 })
1299 .collect();
1300 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1301 for b in &missing_block_refs {
1303 assert!(blocks.contains_key(b));
1304 }
1305 let num_missing_ancestors = blocks
1306 .keys()
1307 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1308 .count();
1309 assert_eq!(
1310 num_missing_ancestors,
1311 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1312 );
1313
1314 let missing_round = NUM_ROUNDS as Round - 3;
1316 let missing_block_refs: Vec<BlockRef> = all_blocks
1317 .iter()
1318 .filter(|b| b.reference().round == missing_round)
1319 .map(|b| b.reference())
1320 .take(2)
1321 .collect();
1322 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1323 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1325 let results = authority_service
1326 .handle_fetch_blocks(
1327 AuthorityIndex::new_for_test(0),
1328 missing_block_refs.clone(),
1329 highest_accepted_rounds,
1330 false,
1331 )
1332 .await
1333 .unwrap();
1334
1335 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1337 .iter()
1338 .map(|b| {
1339 let signed = bcs::from_bytes(b).unwrap();
1340 let block = VerifiedBlock::new_verified(signed, b.clone());
1341 (block.reference(), block)
1342 })
1343 .collect();
1344 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1345 for b in &missing_block_refs {
1347 assert!(blocks.contains_key(b));
1348 }
1349 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1351 for b in blocks.keys() {
1352 assert!(b.round <= missing_round);
1353 assert!(expected_authors.contains(&b.author));
1354 }
1355
1356 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1358 highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1360 highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1361 let results = authority_service
1362 .handle_fetch_blocks(
1363 AuthorityIndex::new_for_test(0),
1364 vec![],
1365 highest_accepted_rounds.clone(),
1366 false,
1367 )
1368 .await
1369 .unwrap();
1370
1371 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1373 .iter()
1374 .map(|b| {
1375 let signed = bcs::from_bytes(b).unwrap();
1376 let block = VerifiedBlock::new_verified(signed, b.clone());
1377 (block.reference(), block)
1378 })
1379 .collect();
1380 assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1381 for block_ref in blocks.keys() {
1383 let accepted = highest_accepted_rounds[block_ref.author];
1384 assert!(block_ref.round > accepted);
1385 }
1386 let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1389 assert!(
1392 max_round_in_result <= 4,
1393 "Expected low rounds from fair round-order fetching, got max round {}",
1394 max_round_in_result
1395 );
1396
1397 let missing_block_refs: Vec<BlockRef> = all_blocks
1399 .iter()
1400 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1401 .map(|b| b.reference())
1402 .take(5)
1403 .collect();
1404 let results = authority_service
1405 .handle_fetch_blocks(
1406 AuthorityIndex::new_for_test(0),
1407 missing_block_refs.clone(),
1408 vec![],
1409 false,
1410 )
1411 .await
1412 .unwrap();
1413
1414 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1416 .iter()
1417 .map(|b| {
1418 let signed = bcs::from_bytes(b).unwrap();
1419 let block = VerifiedBlock::new_verified(signed, b.clone());
1420 (block.reference(), block)
1421 })
1422 .collect();
1423 assert_eq!(blocks.len(), 5);
1424 for b in &missing_block_refs {
1425 assert!(blocks.contains_key(b));
1426 }
1427 }
1428
1429 #[tokio::test(flavor = "current_thread", start_paused = true)]
1430 async fn test_handle_fetch_latest_blocks() {
1431 let (context, _keys) = Context::new_for_test(4);
1433 let context = Arc::new(context);
1434 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1435 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1436 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1437 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1438 let fake_client = Arc::new(FakeNetworkClient::default());
1439 let network_client = Arc::new(SynchronizerClient::new(
1440 context.clone(),
1441 Some(fake_client.clone()),
1442 Some(fake_client.clone()),
1443 ));
1444 let store = Arc::new(MemStore::new());
1445 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1446 let transaction_vote_tracker =
1447 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1448 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1449 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1450 let synchronizer = Synchronizer::start(
1451 network_client,
1452 context.clone(),
1453 core_dispatcher.clone(),
1454 commit_vote_monitor.clone(),
1455 block_verifier.clone(),
1456 transaction_vote_tracker.clone(),
1457 round_tracker.clone(),
1458 dag_state.clone(),
1459 peers_pool.clone(),
1460 true,
1461 );
1462 let authority_service = Arc::new(AuthorityService::new(
1463 context.clone(),
1464 block_verifier,
1465 commit_vote_monitor,
1466 round_tracker,
1467 synchronizer,
1468 core_dispatcher.clone(),
1469 rx_block_broadcast,
1470 transaction_vote_tracker,
1471 dag_state.clone(),
1472 store,
1473 ));
1474
1475 let mut dag_builder = DagBuilder::new(context.clone());
1477 dag_builder
1478 .layers(1..=10)
1479 .authorities(vec![AuthorityIndex::new_for_test(2)])
1480 .equivocate(1)
1481 .build()
1482 .persist_layers(dag_state);
1483
1484 let authorities_to_request = vec![
1486 AuthorityIndex::new_for_test(1),
1487 AuthorityIndex::new_for_test(2),
1488 ];
1489 let results = authority_service
1490 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1491 .await;
1492
1493 let serialised_blocks = results.unwrap();
1495 for serialised_block in serialised_blocks {
1496 let signed_block: SignedBlock =
1497 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1498 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1499
1500 assert_eq!(verified_block.round(), 10);
1501 }
1502 }
1503
1504 #[tokio::test(flavor = "current_thread", start_paused = true)]
1505 async fn test_handle_subscribe_blocks() {
1506 let (context, _keys) = Context::new_for_test(4);
1507 let context = Arc::new(context);
1508 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1509 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1510 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1511 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1512 let fake_client = Arc::new(FakeNetworkClient::default());
1513 let network_client = Arc::new(SynchronizerClient::new(
1514 context.clone(),
1515 Some(fake_client.clone()),
1516 Some(fake_client.clone()),
1517 ));
1518 let store = Arc::new(MemStore::new());
1519 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1520 let transaction_vote_tracker =
1521 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1522 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1523 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1524 let synchronizer = Synchronizer::start(
1525 network_client,
1526 context.clone(),
1527 core_dispatcher.clone(),
1528 commit_vote_monitor.clone(),
1529 block_verifier.clone(),
1530 transaction_vote_tracker.clone(),
1531 round_tracker.clone(),
1532 dag_state.clone(),
1533 peers_pool.clone(),
1534 false,
1535 );
1536
1537 dag_state
1539 .write()
1540 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1541 dag_state
1542 .write()
1543 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1544 dag_state
1545 .write()
1546 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1547
1548 let authority_service = Arc::new(AuthorityService::new(
1549 context.clone(),
1550 block_verifier,
1551 commit_vote_monitor,
1552 round_tracker,
1553 synchronizer,
1554 core_dispatcher.clone(),
1555 rx_block_broadcast,
1556 transaction_vote_tracker,
1557 dag_state.clone(),
1558 store,
1559 ));
1560
1561 let peer = context.committee.to_authority_index(1).unwrap();
1562
1563 {
1566 let mut stream = authority_service
1567 .handle_subscribe_blocks(peer, 100)
1568 .await
1569 .unwrap();
1570 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1571 assert_eq!(
1572 block.round(),
1573 15,
1574 "Should return last proposed block as fallback"
1575 );
1576 assert_eq!(block.author().value(), 0);
1577 }
1578
1579 {
1582 let mut stream = authority_service
1583 .handle_subscribe_blocks(peer, 7)
1584 .await
1585 .unwrap();
1586
1587 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1588 assert_eq!(block1.round(), 10, "Should return block at round 10");
1589
1590 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1591 assert_eq!(block2.round(), 15, "Should return block at round 15");
1592 }
1593 }
1594
1595 #[tokio::test(flavor = "current_thread", start_paused = true)]
1596 async fn test_handle_subscribe_blocks_not_proposed() {
1597 let (context, _keys) = Context::new_for_test(4);
1598 let context = Arc::new(context);
1599 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1600 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1601 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1602 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1603 let fake_client = Arc::new(FakeNetworkClient::default());
1604 let network_client = Arc::new(SynchronizerClient::new(
1605 context.clone(),
1606 Some(fake_client.clone()),
1607 Some(fake_client.clone()),
1608 ));
1609 let store = Arc::new(MemStore::new());
1610 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1611 let transaction_vote_tracker =
1612 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1613 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1614 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1615 let synchronizer = Synchronizer::start(
1616 network_client,
1617 context.clone(),
1618 core_dispatcher.clone(),
1619 commit_vote_monitor.clone(),
1620 block_verifier.clone(),
1621 transaction_vote_tracker.clone(),
1622 round_tracker.clone(),
1623 dag_state.clone(),
1624 peers_pool.clone(),
1625 false,
1626 );
1627
1628 let authority_service = Arc::new(AuthorityService::new(
1631 context.clone(),
1632 block_verifier,
1633 commit_vote_monitor,
1634 round_tracker,
1635 synchronizer,
1636 core_dispatcher.clone(),
1637 rx_block_broadcast,
1638 transaction_vote_tracker,
1639 dag_state.clone(),
1640 store,
1641 ));
1642
1643 let peer = context.committee.to_authority_index(1).unwrap();
1644
1645 let mut stream = authority_service
1647 .handle_subscribe_blocks(peer, 0)
1648 .await
1649 .unwrap();
1650
1651 use futures::poll;
1653 use std::task::Poll;
1654 let poll_result = poll!(stream.next());
1655 assert!(
1656 matches!(poll_result, Poll::Pending),
1657 "Should not receive genesis block on subscription stream"
1658 );
1659 }
1660}