1use std::{
5 collections::{BTreeMap, BTreeSet},
6 pin::Pin,
7 sync::Arc,
8 time::Duration,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use consensus_types::block::{BlockRef, Round};
15use futures::{Stream, StreamExt, ready, stream, task};
16use mysten_metrics::spawn_monitored_task;
17use parking_lot::RwLock;
18use rand::seq::SliceRandom as _;
19use sui_macros::fail_point_async;
20use tap::TapFallible;
21use tokio::sync::broadcast;
22use tokio_util::sync::ReusableBoxFuture;
23use tracing::{debug, info, warn};
24
25use crate::{
26 CommitIndex,
27 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
28 block_verifier::BlockVerifier,
29 commit::{CommitAPI as _, CommitRange, TrustedCommit},
30 commit_vote_monitor::CommitVoteMonitor,
31 context::Context,
32 core_thread::CoreThreadDispatcher,
33 dag_state::DagState,
34 error::{ConsensusError, ConsensusResult},
35 network::{
36 BlockRequestStream, BlockStream, ExtendedSerializedBlock, NodeId, ObserverBlockStream,
37 ObserverNetworkService, ValidatorNetworkService,
38 },
39 round_tracker::RoundTracker,
40 stake_aggregator::{QuorumThreshold, StakeAggregator},
41 storage::Store,
42 synchronizer::SynchronizerHandle,
43 transaction_certifier::TransactionCertifier,
44};
45
/// Multiplier applied to `commit_sync_batch_size` when `handle_send_block` decides whether the
/// local commit index lags the quorum commit index by too much: a received block is rejected
/// once `last_commit_index + commit_sync_batch_size * COMMIT_LAG_MULTIPLIER` is below the
/// quorum commit index.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
47
/// Serves network requests addressed to this authority.
///
/// Implements both `ValidatorNetworkService` and `ObserverNetworkService`; the fields are the
/// shared components those request handlers read from or forward work to.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    /// Source of the committee, parameters, protocol config, clock, metrics and own index.
    context: Arc<Context>,
    /// Observes every verified block and reports the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Verifies received blocks and produces the reject votes for their transactions.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Used to fetch blocks reported as missing after a block has been received.
    synchronizer: Arc<SynchronizerHandle>,
    /// Receives verified blocks (`add_blocks`) and answers `check_block_refs` queries.
    core_dispatcher: Arc<C>,
    /// Template receiver; it is resubscribed for every block subscription.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    /// Shared with every `BroadcastStream` to keep track of active subscriptions.
    subscription_counter: Arc<SubscriptionCounter>,
    /// Receives voted blocks when transaction voting is enabled in the protocol config.
    transaction_certifier: TransactionCertifier,
    /// Read for cached blocks, the last proposed block and the last commit index.
    dag_state: Arc<RwLock<DagState>>,
    /// Read for commits, commit votes and the blocks carrying those votes.
    store: Arc<dyn Store>,
    /// Updated with every verified block; read for the highest received rounds.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
62
63impl<C: CoreThreadDispatcher> AuthorityService<C> {
64 pub(crate) fn new(
65 context: Arc<Context>,
66 block_verifier: Arc<dyn BlockVerifier>,
67 commit_vote_monitor: Arc<CommitVoteMonitor>,
68 round_tracker: Arc<RwLock<RoundTracker>>,
69 synchronizer: Arc<SynchronizerHandle>,
70 core_dispatcher: Arc<C>,
71 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
72 transaction_certifier: TransactionCertifier,
73 dag_state: Arc<RwLock<DagState>>,
74 store: Arc<dyn Store>,
75 ) -> Self {
76 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
77 Self {
78 context,
79 block_verifier,
80 commit_vote_monitor,
81 synchronizer,
82 core_dispatcher,
83 rx_block_broadcast,
84 subscription_counter,
85 transaction_certifier,
86 dag_state,
87 store,
88 round_tracker,
89 }
90 }
91
92 fn parse_excluded_ancestors(
94 &self,
95 peer: AuthorityIndex,
96 block: &VerifiedBlock,
97 mut excluded_ancestors: Vec<Vec<u8>>,
98 ) -> ConsensusResult<Vec<BlockRef>> {
99 let peer_hostname = &self.context.committee.authority(peer).hostname;
100
101 let excluded_ancestors_limit = self.context.committee.size() * 2;
102 if excluded_ancestors.len() > excluded_ancestors_limit {
103 debug!(
104 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
105 excluded_ancestors.len() - excluded_ancestors_limit,
106 peer,
107 peer_hostname,
108 );
109 excluded_ancestors.truncate(excluded_ancestors_limit);
110 }
111
112 let excluded_ancestors = excluded_ancestors
113 .into_iter()
114 .map(|serialized| {
115 let block_ref: BlockRef =
116 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
117 if !self.context.committee.is_valid_index(block_ref.author) {
118 return Err(ConsensusError::InvalidAuthorityIndex {
119 index: block_ref.author,
120 max: self.context.committee.size(),
121 });
122 }
123 if block_ref.round >= block.round() {
124 return Err(ConsensusError::InvalidAncestorRound {
125 ancestor: block_ref.round,
126 block: block.round(),
127 });
128 }
129 Ok(block_ref)
130 })
131 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
132
133 for excluded_ancestor in &excluded_ancestors {
134 let excluded_ancestor_hostname = &self
135 .context
136 .committee
137 .authority(excluded_ancestor.author)
138 .hostname;
139 self.context
140 .metrics
141 .node_metrics
142 .network_excluded_ancestors_count_by_authority
143 .with_label_values(&[excluded_ancestor_hostname])
144 .inc();
145 }
146 self.context
147 .metrics
148 .node_metrics
149 .network_received_excluded_ancestors_from_authority
150 .with_label_values(&[peer_hostname])
151 .inc_by(excluded_ancestors.len() as u64);
152
153 Ok(excluded_ancestors)
154 }
155}
156
157#[async_trait]
158impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
    /// Handles a block pushed by `peer`.
    ///
    /// The block is deserialized, checked to be authored by `peer`, verified, and its excluded
    /// ancestors are parsed. It is then reported to the commit vote monitor and round tracker,
    /// and, unless the local commit index lags the quorum too much, handed to the transaction
    /// certifier (when transaction voting is enabled) and to the core dispatcher. Blocks that
    /// the dispatcher reports as missing are fetched from `peer` in background tasks.
    ///
    /// # Errors
    ///
    /// Returns `MalformedBlock`, `UnexpectedAuthority`, any verification or excluded-ancestor
    /// error, `BlockRejected` when commits are lagging, or `Shutdown` when the core dispatcher
    /// call fails.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // Deserialization only; the block is verified further below.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // The sender must be the author of the block it sends.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[
                    peer_hostname.as_str(),
                    "handle_send_block",
                    "UnexpectedAuthority",
                ])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Verify the block and obtain the reject votes for its transactions.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
                    .inc();
            })?;

        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // How far the block timestamp is ahead of the local clock; zero when it is not ahead.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // The monitor and the round tracker see the block before the lag check below, so they
        // are updated even when the block ends up being rejected.
        self.commit_vote_monitor.observe_block(&verified_block);

        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block.clone(),
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Reject the block when the local commit index trails the quorum commit index by more
        // than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        // Hand the block and its reject votes to the certifier before the core sees the block.
        if self.context.protocol_config.transaction_voting_enabled() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // A failed dispatcher call is reported to the caller as `Shutdown`.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Fetch the missing ancestors from the sending peer without blocking this handler.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // A failed fetch is only logged; it does not fail the request.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // Same treatment for the excluded ancestors that the dispatcher reports back.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }
343
344 async fn handle_subscribe_blocks(
345 &self,
346 peer: AuthorityIndex,
347 last_received: Round,
348 ) -> ConsensusResult<BlockStream> {
349 fail_point_async!("consensus-rpc-response");
350
351 let past_proposed_blocks = {
360 let dag_state = self.dag_state.read();
361
362 let mut proposed_blocks =
363 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
364 if proposed_blocks.is_empty() {
365 let last_proposed_block = dag_state.get_last_proposed_block();
366 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
367 vec![last_proposed_block]
368 } else {
369 vec![]
370 };
371 }
372 stream::iter(
373 proposed_blocks
374 .into_iter()
375 .map(|block| ExtendedSerializedBlock {
376 block: block.serialized().clone(),
377 excluded_ancestors: vec![],
378 }),
379 )
380 };
381
382 let broadcasted_blocks = BroadcastedBlockStream::new(
383 peer,
384 self.rx_block_broadcast.resubscribe(),
385 self.subscription_counter.clone(),
386 );
387
388 Ok(Box::pin(past_proposed_blocks.chain(
390 broadcasted_blocks.map(ExtendedSerializedBlock::from),
391 )))
392 }
393
394 async fn handle_fetch_blocks(
402 &self,
403 _peer: AuthorityIndex,
404 mut block_refs: Vec<BlockRef>,
405 highest_accepted_rounds: Vec<Round>,
406 breadth_first: bool,
407 ) -> ConsensusResult<Vec<Bytes>> {
408 fail_point_async!("consensus-rpc-response");
409
410 if !highest_accepted_rounds.is_empty()
411 && highest_accepted_rounds.len() != self.context.committee.size()
412 {
413 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
414 highest_accepted_rounds.len(),
415 self.context.committee.size(),
416 ));
417 }
418
419 let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
421 self.context.parameters.max_blocks_per_sync
422 } else {
423 self.context.parameters.max_blocks_per_fetch
424 };
425 if block_refs.len() > max_response_num_blocks {
426 block_refs.truncate(max_response_num_blocks);
427 }
428
429 for block in &block_refs {
431 if !self.context.committee.is_valid_index(block.author) {
432 return Err(ConsensusError::InvalidAuthorityIndex {
433 index: block.author,
434 max: self.context.committee.size(),
435 });
436 }
437 if block.round == GENESIS_ROUND {
438 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
439 }
440 }
441
442 let blocks = if !highest_accepted_rounds.is_empty() {
444 block_refs.sort();
445 block_refs.dedup();
446 let mut blocks = self
447 .dag_state
448 .read()
449 .get_blocks(&block_refs)
450 .into_iter()
451 .flatten()
452 .collect::<Vec<_>>();
453
454 if breadth_first {
455 let mut missing_ancestors = blocks
457 .iter()
458 .flat_map(|block| block.ancestors().to_vec())
459 .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
460 .collect::<BTreeSet<_>>()
461 .into_iter()
462 .collect::<Vec<_>>();
463
464 let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
467 if selected_num_blocks < missing_ancestors.len() {
468 missing_ancestors = missing_ancestors
469 .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
470 .copied()
471 .collect::<Vec<_>>();
472 }
473 let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
474 blocks.extend(ancestor_blocks.into_iter().flatten());
475 } else {
476 let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
479 for block_ref in blocks.iter().map(|b| b.reference()) {
480 let entry = lowest_missing_rounds
481 .entry(block_ref.author)
482 .or_insert(block_ref.round);
483 *entry = (*entry).min(block_ref.round);
484 }
485
486 let dag_state = self.dag_state.read();
492 for (authority, lowest_missing_round) in lowest_missing_rounds {
493 let highest_accepted_round = highest_accepted_rounds[authority];
494 if highest_accepted_round >= lowest_missing_round {
495 continue;
496 }
497 let missing_blocks = dag_state.get_cached_blocks_in_range(
498 authority,
499 highest_accepted_round + 1,
500 lowest_missing_round,
501 self.context
502 .parameters
503 .max_blocks_per_sync
504 .saturating_sub(blocks.len()),
505 );
506 blocks.extend(missing_blocks);
507 if blocks.len() >= self.context.parameters.max_blocks_per_sync {
508 blocks.truncate(self.context.parameters.max_blocks_per_sync);
509 break;
510 }
511 }
512 }
513
514 blocks
515 } else {
516 self.dag_state
517 .read()
518 .get_blocks(&block_refs)
519 .into_iter()
520 .flatten()
521 .collect()
522 };
523
524 let bytes = blocks
526 .into_iter()
527 .map(|block| block.serialized().clone())
528 .collect::<Vec<_>>();
529 Ok(bytes)
530 }
531
532 async fn handle_fetch_commits(
533 &self,
534 _peer: AuthorityIndex,
535 commit_range: CommitRange,
536 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
537 fail_point_async!("consensus-rpc-response");
538
539 let inclusive_end = commit_range.end().min(
541 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
542 - 1,
543 );
544 let mut commits = self
545 .store
546 .scan_commits((commit_range.start()..=inclusive_end).into())?;
547 let mut certifier_block_refs = vec![];
548 'commit: while let Some(c) = commits.last() {
549 let index = c.index();
550 let votes = self.store.read_commit_votes(index)?;
551 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
552 for v in &votes {
553 stake_aggregator.add(v.author, &self.context.committee);
554 }
555 if stake_aggregator.reached_threshold(&self.context.committee) {
556 certifier_block_refs = votes;
557 break 'commit;
558 } else {
559 debug!(
560 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
561 index,
562 stake_aggregator.stake(),
563 stake_aggregator.threshold(&self.context.committee)
564 );
565 self.context
566 .metrics
567 .node_metrics
568 .commit_sync_fetch_commits_handler_uncertified_skipped
569 .inc();
570 commits.pop();
571 }
572 }
573 let certifier_blocks = self
574 .store
575 .read_blocks(&certifier_block_refs)?
576 .into_iter()
577 .flatten()
578 .collect();
579 Ok((commits, certifier_blocks))
580 }
581
582 async fn handle_fetch_latest_blocks(
583 &self,
584 peer: AuthorityIndex,
585 authorities: Vec<AuthorityIndex>,
586 ) -> ConsensusResult<Vec<Bytes>> {
587 fail_point_async!("consensus-rpc-response");
588
589 if authorities.len() > self.context.committee.size() {
590 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
591 }
592
593 for authority in &authorities {
595 if !self.context.committee.is_valid_index(*authority) {
596 return Err(ConsensusError::InvalidAuthorityIndex {
597 index: *authority,
598 max: self.context.committee.size(),
599 });
600 }
601 }
602
603 let mut blocks = vec![];
607 let dag_state = self.dag_state.read();
608 for authority in authorities {
609 let block = dag_state.get_last_block_for_authority(authority);
610
611 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
612
613 if block.round() != GENESIS_ROUND {
615 blocks.push(block);
616 }
617 }
618
619 let result = blocks
621 .into_iter()
622 .map(|block| block.serialized().clone())
623 .collect::<Vec<_>>();
624
625 Ok(result)
626 }
627
628 async fn handle_get_latest_rounds(
629 &self,
630 _peer: AuthorityIndex,
631 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
632 fail_point_async!("consensus-rpc-response");
633
634 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
635
636 let blocks = self
637 .dag_state
638 .read()
639 .get_last_cached_block_per_authority(Round::MAX);
640 let highest_accepted_rounds = blocks
641 .into_iter()
642 .map(|(block, _)| block.round())
643 .collect::<Vec<_>>();
644
645 Ok((highest_received_rounds, highest_accepted_rounds))
646 }
647}
648
649#[async_trait]
650impl<C: CoreThreadDispatcher> ObserverNetworkService for AuthorityService<C> {
651 async fn handle_stream_blocks(
652 &self,
653 _peer: NodeId,
654 _request_stream: BlockRequestStream,
655 ) -> ConsensusResult<ObserverBlockStream> {
656 todo!("Observer block streaming not yet implemented")
658 }
659
660 async fn handle_fetch_blocks(
661 &self,
662 _peer: NodeId,
663 _block_refs: Vec<BlockRef>,
664 ) -> ConsensusResult<Vec<Bytes>> {
665 Err(ConsensusError::NetworkRequest(
668 "Observer fetch blocks not yet implemented".to_string(),
669 ))
670 }
671
672 async fn handle_fetch_commits(
673 &self,
674 _peer: NodeId,
675 _commit_range: CommitRange,
676 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
677 Err(ConsensusError::NetworkRequest(
679 "Observer fetch commits not yet implemented".to_string(),
680 ))
681 }
682}
683
/// Subscription bookkeeping guarded by the mutex in `SubscriptionCounter`.
struct Counter {
    /// Total number of active subscriptions across all peers.
    count: usize,
    /// Number of active subscriptions per peer; sized to the committee and indexed by the
    /// peer's `AuthorityIndex`.
    subscriptions_by_authority: Vec<usize>,
}
688
/// Tracks the block subscriptions opened by peers and mirrors, per peer, whether at least
/// one subscription is active in the `subscribed_by` gauge.
struct SubscriptionCounter {
    /// Provides the committee (for hostnames) and the metrics.
    context: Arc<Context>,
    /// Counts, protected by a mutex because streams are created and dropped concurrently.
    counter: parking_lot::Mutex<Counter>,
}
694
695impl SubscriptionCounter {
696 fn new(context: Arc<Context>) -> Self {
697 for (_, authority) in context.committee.authorities() {
699 context
700 .metrics
701 .node_metrics
702 .subscribed_by
703 .with_label_values(&[authority.hostname.as_str()])
704 .set(0);
705 }
706
707 Self {
708 counter: parking_lot::Mutex::new(Counter {
709 count: 0,
710 subscriptions_by_authority: vec![0; context.committee.size()],
711 }),
712 context,
713 }
714 }
715
716 fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
717 let mut counter = self.counter.lock();
718 counter.count += 1;
719 counter.subscriptions_by_authority[peer] += 1;
720
721 let peer_hostname = &self.context.committee.authority(peer).hostname;
722 self.context
723 .metrics
724 .node_metrics
725 .subscribed_by
726 .with_label_values(&[peer_hostname])
727 .set(1);
728
729 Ok(())
730 }
731
732 fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
733 let mut counter = self.counter.lock();
734 counter.count -= 1;
735 counter.subscriptions_by_authority[peer] -= 1;
736
737 if counter.subscriptions_by_authority[peer] == 0 {
738 let peer_hostname = &self.context.committee.authority(peer).hostname;
739 self.context
740 .metrics
741 .node_metrics
742 .subscribed_by
743 .with_label_values(&[peer_hostname])
744 .set(0);
745 }
746
747 Ok(())
748 }
749}
750
/// `BroadcastStream` specialized to `ExtendedBlock`; this is the stream that
/// `handle_subscribe_blocks` appends after the past proposed blocks.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
754
/// Adapts a `tokio::sync::broadcast::Receiver` into a `Stream` and keeps the subscription
/// counter up to date: the subscription is registered on creation and unregistered on drop.
struct BroadcastStream<T> {
    /// Peer that opened the subscription.
    peer: AuthorityIndex,
    /// Pending receive. The future resolves to the receive result together with the
    /// receiver itself, so the receiver can be moved into the next receive future.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    /// Counter shared with the service that created the stream.
    subscription_counter: Arc<SubscriptionCounter>,
}
770
771impl<T: 'static + Clone + Send> BroadcastStream<T> {
772 pub fn new(
773 peer: AuthorityIndex,
774 rx: broadcast::Receiver<T>,
775 subscription_counter: Arc<SubscriptionCounter>,
776 ) -> Self {
777 if let Err(err) = subscription_counter.increment(peer) {
778 match err {
779 ConsensusError::Shutdown => {}
780 _ => panic!("Unexpected error: {err}"),
781 }
782 }
783 Self {
784 peer,
785 inner: ReusableBoxFuture::new(make_recv_future(rx)),
786 subscription_counter,
787 }
788 }
789}
790
791impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
792 type Item = T;
793
794 fn poll_next(
795 mut self: Pin<&mut Self>,
796 cx: &mut task::Context<'_>,
797 ) -> task::Poll<Option<Self::Item>> {
798 let peer = self.peer;
799 let maybe_item = loop {
800 let (result, rx) = ready!(self.inner.poll(cx));
801 self.inner.set(make_recv_future(rx));
802
803 match result {
804 Ok(item) => break Some(item),
805 Err(broadcast::error::RecvError::Closed) => {
806 info!("Block BroadcastedBlockStream {} closed", peer);
807 break None;
808 }
809 Err(broadcast::error::RecvError::Lagged(n)) => {
810 warn!(
811 "Block BroadcastedBlockStream {} lagged by {} messages",
812 peer, n
813 );
814 continue;
815 }
816 }
817 };
818 task::Poll::Ready(maybe_item)
819 }
820}
821
822impl<T> Drop for BroadcastStream<T> {
823 fn drop(&mut self) {
824 if let Err(err) = self.subscription_counter.decrement(self.peer) {
825 match err {
826 ConsensusError::Shutdown => {}
827 _ => panic!("Unexpected error: {err}"),
828 }
829 }
830 }
831}
832
833async fn make_recv_future<T: Clone>(
834 mut rx: broadcast::Receiver<T>,
835) -> (
836 Result<T, broadcast::error::RecvError>,
837 broadcast::Receiver<T>,
838) {
839 let result = rx.recv().await;
840 (result, rx)
841}
842
843#[cfg(test)]
846mod tests {
847 use std::{
848 collections::{BTreeMap, BTreeSet},
849 sync::Arc,
850 time::Duration,
851 };
852
853 use async_trait::async_trait;
854 use bytes::Bytes;
855 use consensus_config::AuthorityIndex;
856 use consensus_types::block::{BlockDigest, BlockRef, Round};
857 use mysten_metrics::monitored_mpsc;
858 use parking_lot::{Mutex, RwLock};
859 use tokio::{sync::broadcast, time::sleep};
860
861 use futures::StreamExt as _;
862
863 use crate::{
864 authority_service::AuthorityService,
865 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
866 commit::{CertifiedCommits, CommitRange},
867 commit_vote_monitor::CommitVoteMonitor,
868 context::Context,
869 core_thread::{CoreError, CoreThreadDispatcher},
870 dag_state::DagState,
871 error::ConsensusResult,
872 network::{
873 BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
874 ValidatorNetworkClient, ValidatorNetworkService,
875 },
876 round_tracker::RoundTracker,
877 storage::mem_store::MemStore,
878 synchronizer::Synchronizer,
879 test_dag_builder::DagBuilder,
880 transaction_certifier::TransactionCertifier,
881 };
    /// Test double for the core thread dispatcher that records the blocks it is given.
    struct FakeCoreThreadDispatcher {
        /// Every block passed to `add_blocks`, in arrival order.
        blocks: Mutex<Vec<VerifiedBlock>>,
    }
885
886 impl FakeCoreThreadDispatcher {
887 fn new() -> Self {
888 Self {
889 blocks: Mutex::new(vec![]),
890 }
891 }
892
893 fn get_blocks(&self) -> Vec<VerifiedBlock> {
894 self.blocks.lock().clone()
895 }
896 }
897
898 #[async_trait]
899 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
900 async fn add_blocks(
901 &self,
902 blocks: Vec<VerifiedBlock>,
903 ) -> Result<BTreeSet<BlockRef>, CoreError> {
904 let block_refs = blocks.iter().map(|b| b.reference()).collect();
905 self.blocks.lock().extend(blocks);
906 Ok(block_refs)
907 }
908
909 async fn check_block_refs(
910 &self,
911 _block_refs: Vec<BlockRef>,
912 ) -> Result<BTreeSet<BlockRef>, CoreError> {
913 Ok(BTreeSet::new())
914 }
915
916 async fn add_certified_commits(
917 &self,
918 _commits: CertifiedCommits,
919 ) -> Result<BTreeSet<BlockRef>, CoreError> {
920 todo!()
921 }
922
923 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
924 Ok(())
925 }
926
927 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
928 Ok(Default::default())
929 }
930
931 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
932 todo!()
933 }
934
935 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
936 todo!()
937 }
938 }
939
    /// Network client stub; every method of its client trait implementations is
    /// `unimplemented!`, so any call made through it panics.
    #[derive(Default)]
    struct FakeNetworkClient {}
942
    // Validator client stub: every method panics with "Unimplemented" when called.
    #[async_trait]
    impl ValidatorNetworkClient for FakeNetworkClient {
        /// Unimplemented stub.
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
1000
    // Observer client stub: every method panics with "Unimplemented" when called.
    #[async_trait]
    impl ObserverNetworkClient for FakeNetworkClient {
        /// Unimplemented stub.
        async fn stream_blocks(
            &self,
            _peer: crate::network::PeerId,
            _request_stream: crate::network::BlockRequestStream,
            _timeout: Duration,
        ) -> ConsensusResult<crate::network::ObserverBlockStream> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_blocks(
            &self,
            _peer: crate::network::PeerId,
            _block_refs: Vec<BlockRef>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_commits(
            &self,
            _peer: crate::network::PeerId,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }
    }
1030
    /// Exercises `handle_send_block` with one acceptable block, one block that must be
    /// rejected, and one request whose excluded ancestors must be rejected.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // Wire up an `AuthorityService` for a 4-authority committee, backed by the fake
        // dispatcher, the fake network client and an in-memory store.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let fake_client = Arc::new(FakeNetworkClient::default());
        let network_client = Arc::new(SynchronizerClient::new(
            context.clone(),
            Some(fake_client.clone()),
            Some(fake_client.clone()),
        ));
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            round_tracker.clone(),
            dag_state.clone(),
            false,
        );
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // A block whose timestamp is ahead of the local clock by exactly the maximum forward
        // time drift. The `TestBlock::new` arguments are presumably (round, author) - TODO confirm.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // Send the block as authority 0 from a separate task.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        // Give the spawned task time to run (the test starts with a paused clock).
        sleep(max_drift / 2).await;

        // The block must have reached the core dispatcher.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // A block built with 1000 as its second argument, sent as authority 0, must be rejected.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // The valid block again, this time with excluded ancestors that must not be accepted:
        // a reference authored by index 1000, 40 arbitrary bytes, and the reference of the
        // invalid block above.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1150
1151 #[tokio::test(flavor = "current_thread", start_paused = true)]
1152 async fn test_handle_fetch_blocks() {
1153 const NUM_AUTHORITIES: usize = 40;
1156 const NUM_ROUNDS: usize = 40;
1157 let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1158 let context = Arc::new(context);
1159 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1160 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1161 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1162 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1163 let fake_client = Arc::new(FakeNetworkClient::default());
1164 let network_client = Arc::new(SynchronizerClient::new(
1165 context.clone(),
1166 Some(fake_client.clone()),
1167 Some(fake_client.clone()),
1168 ));
1169 let (blocks_sender, _blocks_receiver) =
1170 monitored_mpsc::unbounded_channel("consensus_block_output");
1171 let store = Arc::new(MemStore::new());
1172 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1173 let transaction_certifier = TransactionCertifier::new(
1174 context.clone(),
1175 block_verifier.clone(),
1176 dag_state.clone(),
1177 blocks_sender,
1178 );
1179 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1180 let synchronizer = Synchronizer::start(
1181 network_client,
1182 context.clone(),
1183 core_dispatcher.clone(),
1184 commit_vote_monitor.clone(),
1185 block_verifier.clone(),
1186 transaction_certifier.clone(),
1187 round_tracker.clone(),
1188 dag_state.clone(),
1189 false,
1190 );
1191 let authority_service = Arc::new(AuthorityService::new(
1192 context.clone(),
1193 block_verifier,
1194 commit_vote_monitor,
1195 round_tracker,
1196 synchronizer,
1197 core_dispatcher.clone(),
1198 rx_block_broadcast,
1199 transaction_certifier,
1200 dag_state.clone(),
1201 store,
1202 ));
1203
1204 let mut dag_builder = DagBuilder::new(context.clone());
1206 dag_builder
1207 .layers(1..=(NUM_ROUNDS as u32))
1208 .build()
1209 .persist_layers(dag_state.clone());
1210 let all_blocks = dag_builder.all_blocks();
1211
1212 let missing_block_refs: Vec<BlockRef> = all_blocks
1214 .iter()
1215 .rev()
1216 .take(2)
1217 .map(|b| b.reference())
1218 .collect();
1219 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1220 let results = authority_service
1221 .handle_fetch_blocks(
1222 AuthorityIndex::new_for_test(0),
1223 missing_block_refs.clone(),
1224 highest_accepted_rounds,
1225 true,
1226 )
1227 .await
1228 .unwrap();
1229
1230 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1232 .iter()
1233 .map(|b| {
1234 let signed = bcs::from_bytes(b).unwrap();
1235 let block = VerifiedBlock::new_verified(signed, b.clone());
1236 (block.reference(), block)
1237 })
1238 .collect();
1239 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1240 for b in &missing_block_refs {
1242 assert!(blocks.contains_key(b));
1243 }
1244 let num_missing_ancestors = blocks
1245 .keys()
1246 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1247 .count();
1248 assert_eq!(
1249 num_missing_ancestors,
1250 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1251 );
1252
1253 let missing_round = NUM_ROUNDS as Round - 3;
1255 let missing_block_refs: Vec<BlockRef> = all_blocks
1256 .iter()
1257 .filter(|b| b.reference().round == missing_round)
1258 .map(|b| b.reference())
1259 .take(2)
1260 .collect();
1261 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1262 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1264 let results = authority_service
1265 .handle_fetch_blocks(
1266 AuthorityIndex::new_for_test(0),
1267 missing_block_refs.clone(),
1268 highest_accepted_rounds,
1269 false,
1270 )
1271 .await
1272 .unwrap();
1273
1274 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1276 .iter()
1277 .map(|b| {
1278 let signed = bcs::from_bytes(b).unwrap();
1279 let block = VerifiedBlock::new_verified(signed, b.clone());
1280 (block.reference(), block)
1281 })
1282 .collect();
1283 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1284 for b in &missing_block_refs {
1286 assert!(blocks.contains_key(b));
1287 }
1288 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1290 for b in blocks.keys() {
1291 assert!(b.round <= missing_round);
1292 assert!(expected_authors.contains(&b.author));
1293 }
1294
1295 let missing_block_refs: Vec<BlockRef> = all_blocks
1297 .iter()
1298 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1299 .map(|b| b.reference())
1300 .take(5)
1301 .collect();
1302 let results = authority_service
1303 .handle_fetch_blocks(
1304 AuthorityIndex::new_for_test(0),
1305 missing_block_refs.clone(),
1306 vec![],
1307 false,
1308 )
1309 .await
1310 .unwrap();
1311
1312 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1314 .iter()
1315 .map(|b| {
1316 let signed = bcs::from_bytes(b).unwrap();
1317 let block = VerifiedBlock::new_verified(signed, b.clone());
1318 (block.reference(), block)
1319 })
1320 .collect();
1321 assert_eq!(blocks.len(), 5);
1322 for b in &missing_block_refs {
1323 assert!(blocks.contains_key(b));
1324 }
1325 }
1326
1327 #[tokio::test(flavor = "current_thread", start_paused = true)]
1328 async fn test_handle_fetch_latest_blocks() {
1329 let (context, _keys) = Context::new_for_test(4);
1331 let context = Arc::new(context);
1332 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1333 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1334 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1335 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1336 let fake_client = Arc::new(FakeNetworkClient::default());
1337 let network_client = Arc::new(SynchronizerClient::new(
1338 context.clone(),
1339 Some(fake_client.clone()),
1340 Some(fake_client.clone()),
1341 ));
1342 let (blocks_sender, _blocks_receiver) =
1343 monitored_mpsc::unbounded_channel("consensus_block_output");
1344 let store = Arc::new(MemStore::new());
1345 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1346 let transaction_certifier = TransactionCertifier::new(
1347 context.clone(),
1348 block_verifier.clone(),
1349 dag_state.clone(),
1350 blocks_sender,
1351 );
1352 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1353 let synchronizer = Synchronizer::start(
1354 network_client,
1355 context.clone(),
1356 core_dispatcher.clone(),
1357 commit_vote_monitor.clone(),
1358 block_verifier.clone(),
1359 transaction_certifier.clone(),
1360 round_tracker.clone(),
1361 dag_state.clone(),
1362 true,
1363 );
1364 let authority_service = Arc::new(AuthorityService::new(
1365 context.clone(),
1366 block_verifier,
1367 commit_vote_monitor,
1368 round_tracker,
1369 synchronizer,
1370 core_dispatcher.clone(),
1371 rx_block_broadcast,
1372 transaction_certifier,
1373 dag_state.clone(),
1374 store,
1375 ));
1376
1377 let mut dag_builder = DagBuilder::new(context.clone());
1379 dag_builder
1380 .layers(1..=10)
1381 .authorities(vec![AuthorityIndex::new_for_test(2)])
1382 .equivocate(1)
1383 .build()
1384 .persist_layers(dag_state);
1385
1386 let authorities_to_request = vec![
1388 AuthorityIndex::new_for_test(1),
1389 AuthorityIndex::new_for_test(2),
1390 ];
1391 let results = authority_service
1392 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1393 .await;
1394
1395 let serialised_blocks = results.unwrap();
1397 for serialised_block in serialised_blocks {
1398 let signed_block: SignedBlock =
1399 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1400 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1401
1402 assert_eq!(verified_block.round(), 10);
1403 }
1404 }
1405
1406 #[tokio::test(flavor = "current_thread", start_paused = true)]
1407 async fn test_handle_subscribe_blocks() {
1408 let (context, _keys) = Context::new_for_test(4);
1409 let context = Arc::new(context);
1410 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1411 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1412 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1413 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1414 let fake_client = Arc::new(FakeNetworkClient::default());
1415 let network_client = Arc::new(SynchronizerClient::new(
1416 context.clone(),
1417 Some(fake_client.clone()),
1418 Some(fake_client.clone()),
1419 ));
1420 let (blocks_sender, _blocks_receiver) =
1421 monitored_mpsc::unbounded_channel("consensus_block_output");
1422 let store = Arc::new(MemStore::new());
1423 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1424 let transaction_certifier = TransactionCertifier::new(
1425 context.clone(),
1426 block_verifier.clone(),
1427 dag_state.clone(),
1428 blocks_sender,
1429 );
1430 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1431 let synchronizer = Synchronizer::start(
1432 network_client,
1433 context.clone(),
1434 core_dispatcher.clone(),
1435 commit_vote_monitor.clone(),
1436 block_verifier.clone(),
1437 transaction_certifier.clone(),
1438 round_tracker.clone(),
1439 dag_state.clone(),
1440 false,
1441 );
1442
1443 dag_state
1445 .write()
1446 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1447 dag_state
1448 .write()
1449 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1450 dag_state
1451 .write()
1452 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1453
1454 let authority_service = Arc::new(AuthorityService::new(
1455 context.clone(),
1456 block_verifier,
1457 commit_vote_monitor,
1458 round_tracker,
1459 synchronizer,
1460 core_dispatcher.clone(),
1461 rx_block_broadcast,
1462 transaction_certifier,
1463 dag_state.clone(),
1464 store,
1465 ));
1466
1467 let peer = context.committee.to_authority_index(1).unwrap();
1468
1469 {
1472 let mut stream = authority_service
1473 .handle_subscribe_blocks(peer, 100)
1474 .await
1475 .unwrap();
1476 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1477 assert_eq!(
1478 block.round(),
1479 15,
1480 "Should return last proposed block as fallback"
1481 );
1482 assert_eq!(block.author().value(), 0);
1483 }
1484
1485 {
1488 let mut stream = authority_service
1489 .handle_subscribe_blocks(peer, 7)
1490 .await
1491 .unwrap();
1492
1493 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1494 assert_eq!(block1.round(), 10, "Should return block at round 10");
1495
1496 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1497 assert_eq!(block2.round(), 15, "Should return block at round 15");
1498 }
1499 }
1500
1501 #[tokio::test(flavor = "current_thread", start_paused = true)]
1502 async fn test_handle_subscribe_blocks_not_proposed() {
1503 let (context, _keys) = Context::new_for_test(4);
1504 let context = Arc::new(context);
1505 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1506 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1507 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1508 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1509 let fake_client = Arc::new(FakeNetworkClient::default());
1510 let network_client = Arc::new(SynchronizerClient::new(
1511 context.clone(),
1512 Some(fake_client.clone()),
1513 Some(fake_client.clone()),
1514 ));
1515 let (blocks_sender, _blocks_receiver) =
1516 monitored_mpsc::unbounded_channel("consensus_block_output");
1517 let store = Arc::new(MemStore::new());
1518 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1519 let transaction_certifier = TransactionCertifier::new(
1520 context.clone(),
1521 block_verifier.clone(),
1522 dag_state.clone(),
1523 blocks_sender,
1524 );
1525 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1526 let synchronizer = Synchronizer::start(
1527 network_client,
1528 context.clone(),
1529 core_dispatcher.clone(),
1530 commit_vote_monitor.clone(),
1531 block_verifier.clone(),
1532 transaction_certifier.clone(),
1533 round_tracker.clone(),
1534 dag_state.clone(),
1535 false,
1536 );
1537
1538 let authority_service = Arc::new(AuthorityService::new(
1541 context.clone(),
1542 block_verifier,
1543 commit_vote_monitor,
1544 round_tracker,
1545 synchronizer,
1546 core_dispatcher.clone(),
1547 rx_block_broadcast,
1548 transaction_certifier,
1549 dag_state.clone(),
1550 store,
1551 ));
1552
1553 let peer = context.committee.to_authority_index(1).unwrap();
1554
1555 let mut stream = authority_service
1557 .handle_subscribe_blocks(peer, 0)
1558 .await
1559 .unwrap();
1560
1561 use futures::poll;
1563 use std::task::Poll;
1564 let poll_result = poll!(stream.next());
1565 assert!(
1566 matches!(poll_result, Poll::Pending),
1567 "Should not receive genesis block on subscription stream"
1568 );
1569 }
1570}