1use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use consensus_config::AuthorityIndex;
9use consensus_types::block::{BlockRef, Round};
10use futures::{Stream, StreamExt, ready, stream, task};
11use mysten_metrics::spawn_monitored_task;
12use parking_lot::RwLock;
13use sui_macros::fail_point_async;
14use tap::TapFallible;
15use tokio::sync::broadcast;
16use tokio_util::sync::ReusableBoxFuture;
17use tracing::{debug, info, warn};
18
19use crate::{
20 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
21 block_sync_service::BlockSyncService,
22 block_verifier::BlockVerifier,
23 commit::{CommitRange, TrustedCommit},
24 commit_vote_monitor::{CommitVoteMonitor, is_commit_lagging},
25 context::Context,
26 core_thread::CoreThreadDispatcher,
27 dag_state::DagState,
28 error::{ConsensusError, ConsensusResult},
29 network::{BlockStream, ExtendedSerializedBlock, PeerId, ValidatorNetworkService},
30 round_tracker::RoundTracker,
31 synchronizer::SynchronizerHandle,
32 transaction_vote_tracker::TransactionVoteTracker,
33};
34
35pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
37 context: Arc<Context>,
38 commit_vote_monitor: Arc<CommitVoteMonitor>,
39 block_verifier: Arc<dyn BlockVerifier>,
40 synchronizer: Arc<SynchronizerHandle>,
41 core_dispatcher: Arc<C>,
42 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
43 subscription_counter: Arc<SubscriptionCounter>,
44 transaction_vote_tracker: TransactionVoteTracker,
45 dag_state: Arc<RwLock<DagState>>,
46 round_tracker: Arc<RwLock<RoundTracker>>,
47 block_sync_service: Arc<BlockSyncService>,
48}
49
50impl<C: CoreThreadDispatcher> AuthorityService<C> {
51 pub(crate) fn new(
52 context: Arc<Context>,
53 block_verifier: Arc<dyn BlockVerifier>,
54 commit_vote_monitor: Arc<CommitVoteMonitor>,
55 round_tracker: Arc<RwLock<RoundTracker>>,
56 synchronizer: Arc<SynchronizerHandle>,
57 core_dispatcher: Arc<C>,
58 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
59 transaction_vote_tracker: TransactionVoteTracker,
60 dag_state: Arc<RwLock<DagState>>,
61 block_sync_service: Arc<BlockSyncService>,
62 ) -> Self {
63 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
64 Self {
65 context,
66 block_verifier,
67 commit_vote_monitor,
68 synchronizer,
69 core_dispatcher,
70 rx_block_broadcast,
71 subscription_counter,
72 transaction_vote_tracker,
73 dag_state,
74 round_tracker,
75 block_sync_service,
76 }
77 }
78
79 fn parse_excluded_ancestors(
81 &self,
82 peer: AuthorityIndex,
83 block: &VerifiedBlock,
84 mut excluded_ancestors: Vec<Vec<u8>>,
85 ) -> ConsensusResult<Vec<BlockRef>> {
86 let peer_hostname = &self.context.committee.authority(peer).hostname;
87
88 let excluded_ancestors_limit = self.context.committee.size() * 2;
89 if excluded_ancestors.len() > excluded_ancestors_limit {
90 debug!(
91 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
92 excluded_ancestors.len() - excluded_ancestors_limit,
93 peer,
94 peer_hostname,
95 );
96 excluded_ancestors.truncate(excluded_ancestors_limit);
97 }
98
99 let excluded_ancestors = excluded_ancestors
100 .into_iter()
101 .map(|serialized| {
102 let block_ref: BlockRef =
103 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
104 if !self.context.committee.is_valid_index(block_ref.author) {
105 return Err(ConsensusError::InvalidAuthorityIndex {
106 index: block_ref.author,
107 max: self.context.committee.size(),
108 });
109 }
110 if block_ref.round >= block.round() {
111 return Err(ConsensusError::InvalidAncestorRound {
112 ancestor: block_ref.round,
113 block: block.round(),
114 });
115 }
116 Ok(block_ref)
117 })
118 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
119
120 for excluded_ancestor in &excluded_ancestors {
121 let excluded_ancestor_hostname = &self
122 .context
123 .committee
124 .authority(excluded_ancestor.author)
125 .hostname;
126 self.context
127 .metrics
128 .node_metrics
129 .network_excluded_ancestors_count_by_authority
130 .with_label_values(&[excluded_ancestor_hostname])
131 .inc();
132 }
133 self.context
134 .metrics
135 .node_metrics
136 .network_received_excluded_ancestors_from_authority
137 .with_label_values(&[peer_hostname])
138 .inc_by(excluded_ancestors.len() as u64);
139
140 Ok(excluded_ancestors)
141 }
142}
143
144#[async_trait]
145impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
146 async fn handle_send_block(
147 &self,
148 peer: AuthorityIndex,
149 serialized_block: ExtendedSerializedBlock,
150 ) -> ConsensusResult<()> {
151 fail_point_async!("consensus-rpc-response");
152
153 let peer_hostname = &self.context.committee.authority(peer).hostname;
154
155 let signed_block: SignedBlock =
157 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
158
159 if peer != signed_block.author() {
161 self.context
162 .metrics
163 .node_metrics
164 .invalid_blocks
165 .with_label_values(&[
166 peer_hostname.as_str(),
167 "handle_send_block",
168 "UnexpectedAuthority",
169 ])
170 .inc();
171 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
172 info!("Block with wrong authority from {}: {}", peer, e);
173 return Err(e);
174 }
175
176 let (verified_block, reject_txn_votes) = self
178 .block_verifier
179 .verify_and_vote(signed_block, serialized_block.block)
180 .tap_err(|e| {
181 self.context
182 .metrics
183 .node_metrics
184 .invalid_blocks
185 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
186 .inc();
187 info!("Invalid block from {}: {}", peer, e);
188 })?;
189 let excluded_ancestors = self
190 .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
191 .tap_err(|e| {
192 debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
193 self.context
194 .metrics
195 .node_metrics
196 .invalid_blocks
197 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
198 .inc();
199 })?;
200
201 let block_ref = verified_block.reference();
202 debug!("Received block {} via send block.", block_ref);
203
204 self.context
205 .metrics
206 .node_metrics
207 .verified_blocks
208 .with_label_values(&[peer_hostname])
209 .inc();
210
211 let now = self.context.clock.timestamp_utc_ms();
212 let forward_time_drift =
213 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
214
215 self.context
216 .metrics
217 .node_metrics
218 .block_timestamp_drift_ms
219 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
220 .inc_by(forward_time_drift.as_millis() as u64);
221
222 self.commit_vote_monitor.observe_block(&verified_block);
225
226 self.round_tracker
228 .write()
229 .update_from_verified_block(&ExtendedBlock {
230 block: verified_block.clone(),
231 excluded_ancestors: excluded_ancestors.clone(),
232 });
233
234 let last_commit_index = self.dag_state.read().last_commit_index();
243 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
244 if is_commit_lagging(
245 self.context.as_ref(),
246 last_commit_index,
247 quorum_commit_index,
248 ) {
249 self.context
250 .metrics
251 .node_metrics
252 .rejected_blocks
253 .with_label_values(&["commit_lagging"])
254 .inc();
255 debug!(
256 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
257 block_ref, last_commit_index, quorum_commit_index,
258 );
259 return Err(ConsensusError::BlockRejected {
260 block_ref,
261 reason: format!(
262 "Last commit index is lagging quorum commit index too much ({} < {})",
263 last_commit_index, quorum_commit_index,
264 ),
265 });
266 }
267
268 if self.context.protocol_config.transaction_voting_enabled() {
271 self.transaction_vote_tracker
272 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
273 }
274
275 let missing_ancestors = self
277 .core_dispatcher
278 .add_blocks(vec![verified_block.clone()])
279 .await
280 .map_err(|_| ConsensusError::Shutdown)?;
281
282 if !missing_ancestors.is_empty() {
284 self.context
285 .metrics
286 .node_metrics
287 .handler_received_block_missing_ancestors
288 .with_label_values(&[peer_hostname])
289 .inc_by(missing_ancestors.len() as u64);
290 let synchronizer = self.synchronizer.clone();
291 spawn_monitored_task!(async move {
292 if let Err(err) = synchronizer
297 .fetch_blocks(missing_ancestors, PeerId::Validator(peer))
298 .await
299 {
300 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
301 }
302 });
303 }
304
305 let missing_excluded_ancestors = self
307 .core_dispatcher
308 .check_block_refs(excluded_ancestors)
309 .await
310 .map_err(|_| ConsensusError::Shutdown)?;
311 if !missing_excluded_ancestors.is_empty() {
312 self.context
313 .metrics
314 .node_metrics
315 .network_excluded_ancestors_sent_to_fetch
316 .with_label_values(&[peer_hostname])
317 .inc_by(missing_excluded_ancestors.len() as u64);
318
319 let synchronizer = self.synchronizer.clone();
320 spawn_monitored_task!(async move {
321 if let Err(err) = synchronizer
322 .fetch_blocks(missing_excluded_ancestors, PeerId::Validator(peer))
323 .await
324 {
325 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
326 }
327 });
328 }
329
330 Ok(())
331 }
332
333 async fn handle_subscribe_blocks(
334 &self,
335 peer: AuthorityIndex,
336 last_received: Round,
337 ) -> ConsensusResult<BlockStream> {
338 fail_point_async!("consensus-rpc-response");
339
340 let past_proposed_blocks = {
349 let dag_state = self.dag_state.read();
350
351 let mut proposed_blocks =
352 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
353 if proposed_blocks.is_empty() {
354 let last_proposed_block = dag_state
355 .get_last_proposed_block()
356 .expect("Last proposed block should be returned on validators");
357 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
358 vec![last_proposed_block]
359 } else {
360 vec![]
361 };
362 }
363 stream::iter(
364 proposed_blocks
365 .into_iter()
366 .map(|block| ExtendedSerializedBlock {
367 block: block.serialized().clone(),
368 excluded_ancestors: vec![],
369 }),
370 )
371 };
372
373 const MAX_BLOCKS_PER_POLL: usize = 1;
374 let broadcasted_blocks = BroadcastedBlockStream::new(
375 PeerId::Validator(peer),
376 self.rx_block_broadcast.resubscribe(),
377 MAX_BLOCKS_PER_POLL,
378 self.subscription_counter.clone(),
379 );
380
381 Ok(Box::pin(past_proposed_blocks.chain(
383 broadcasted_blocks.flat_map(|items| {
384 debug_assert!(
385 items.len() <= MAX_BLOCKS_PER_POLL,
386 "Too many blocks received from broadcast"
387 );
388 stream::iter(items.into_iter().map(ExtendedSerializedBlock::from))
389 }),
390 )))
391 }
392
393 async fn handle_fetch_blocks(
409 &self,
410 _peer: AuthorityIndex,
411 block_refs: Vec<BlockRef>,
412 fetch_after_rounds: Vec<Round>,
413 fetch_missing_ancestors: bool,
414 ) -> ConsensusResult<Vec<Bytes>> {
415 fail_point_async!("consensus-rpc-response");
416
417 self.block_sync_service
419 .fetch_blocks(block_refs, fetch_after_rounds, fetch_missing_ancestors)
420 .await
421 }
422
423 async fn handle_fetch_commits(
424 &self,
425 _peer: AuthorityIndex,
426 commit_range: CommitRange,
427 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
428 fail_point_async!("consensus-rpc-response");
429
430 self.block_sync_service.fetch_commits(commit_range).await
432 }
433
434 async fn handle_fetch_latest_blocks(
435 &self,
436 peer: AuthorityIndex,
437 authorities: Vec<AuthorityIndex>,
438 ) -> ConsensusResult<Vec<Bytes>> {
439 fail_point_async!("consensus-rpc-response");
440
441 self.block_sync_service
443 .fetch_latest_blocks(peer, authorities)
444 .await
445 }
446
447 async fn handle_get_latest_rounds(
448 &self,
449 _peer: AuthorityIndex,
450 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
451 fail_point_async!("consensus-rpc-response");
452
453 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
454
455 let blocks = self
456 .dag_state
457 .read()
458 .get_last_cached_block_per_authority(Round::MAX);
459 let highest_accepted_rounds = blocks
460 .into_iter()
461 .map(|(block, _)| block.round())
462 .collect::<Vec<_>>();
463
464 Ok((highest_received_rounds, highest_accepted_rounds))
465 }
466}
467
468struct Counter {
469 count: usize,
470 subscriptions_by_peer: BTreeMap<PeerId, usize>,
471}
472
473pub(crate) struct SubscriptionCounter {
475 context: Arc<Context>,
476 counter: parking_lot::Mutex<Counter>,
477}
478
479impl SubscriptionCounter {
480 pub(crate) fn new(context: Arc<Context>) -> Self {
481 for (_, authority) in context.committee.authorities() {
483 context
484 .metrics
485 .node_metrics
486 .subscribed_by
487 .with_label_values(&[authority.hostname.as_str()])
488 .set(0);
489 }
490
491 Self {
492 counter: parking_lot::Mutex::new(Counter {
493 count: 0,
494 subscriptions_by_peer: BTreeMap::new(),
495 }),
496 context,
497 }
498 }
499
500 fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
501 let mut counter = self.counter.lock();
502 counter.count += 1;
503 *counter
504 .subscriptions_by_peer
505 .entry(peer.clone())
506 .or_insert(0) += 1;
507
508 match peer {
509 PeerId::Validator(authority) => {
510 let peer_hostname = &self.context.committee.authority(*authority).hostname;
511 self.context
512 .metrics
513 .node_metrics
514 .subscribed_by
515 .with_label_values(&[peer_hostname])
516 .set(1);
517 }
518 PeerId::Observer(_) => {
519 self.context
520 .metrics
521 .node_metrics
522 .subscribed_by
523 .with_label_values(&["observer"])
524 .inc();
525 }
526 }
527
528 Ok(())
529 }
530
531 fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
532 let mut counter = self.counter.lock();
533 counter.count -= 1;
534 *counter
535 .subscriptions_by_peer
536 .entry(peer.clone())
537 .or_insert(0) -= 1;
538
539 if counter.subscriptions_by_peer[peer] == 0 {
540 match peer {
541 PeerId::Validator(authority) => {
542 let peer_hostname = &self.context.committee.authority(*authority).hostname;
543 self.context
544 .metrics
545 .node_metrics
546 .subscribed_by
547 .with_label_values(&[peer_hostname])
548 .set(0);
549 }
550 PeerId::Observer(_) => {
551 self.context
552 .metrics
553 .node_metrics
554 .subscribed_by
555 .with_label_values(&["observer"])
556 .dec();
557 }
558 }
559 }
560
561 Ok(())
562 }
563}
564
565type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
568
569pub(crate) struct BroadcastStream<T> {
572 peer: Option<PeerId>,
573 inner: ReusableBoxFuture<
575 'static,
576 (
577 Result<T, broadcast::error::RecvError>,
578 broadcast::Receiver<T>,
579 ),
580 >,
581 max_items_per_poll: usize,
583 subscription_counter: Option<Arc<SubscriptionCounter>>,
585}
586
587impl<T: 'static + Clone + Send> BroadcastStream<T> {
588 pub fn new(
589 peer: PeerId,
590 rx: broadcast::Receiver<T>,
591 max_items_per_poll: usize,
592 subscription_counter: Arc<SubscriptionCounter>,
593 ) -> Self {
594 assert!(max_items_per_poll > 0, "max_items_per_poll must be > 0");
595 if let Err(err) = subscription_counter.increment(&peer) {
596 match err {
597 ConsensusError::Shutdown => {}
598 _ => panic!("Unexpected error: {err}"),
599 }
600 }
601 Self {
602 peer: Some(peer),
603 inner: ReusableBoxFuture::new(make_recv_future(rx)),
604 max_items_per_poll,
605 subscription_counter: Some(subscription_counter),
606 }
607 }
608
609 pub fn new_untracked(rx: broadcast::Receiver<T>, max_items_per_poll: usize) -> Self {
611 assert!(max_items_per_poll > 0, "max_items_per_poll must be > 0");
612 Self {
613 peer: None,
614 inner: ReusableBoxFuture::new(make_recv_future(rx)),
615 max_items_per_poll,
616 subscription_counter: None,
617 }
618 }
619}
620
621impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
622 type Item = Vec<T>;
623
624 fn poll_next(
625 mut self: Pin<&mut Self>,
626 cx: &mut task::Context<'_>,
627 ) -> task::Poll<Option<Self::Item>> {
628 loop {
629 let (result, mut rx) = ready!(self.inner.poll(cx));
630
631 match result {
632 Ok(item) => {
633 let mut items = Vec::new();
634 items.push(item);
635
636 while items.len() < self.max_items_per_poll {
638 match rx.try_recv() {
639 Ok(item) => items.push(item),
640 Err(broadcast::error::TryRecvError::Empty) => break,
641 Err(broadcast::error::TryRecvError::Closed) => break,
642 Err(broadcast::error::TryRecvError::Lagged(n)) => {
643 warn!("BroadcastStream {:?} lagged by {} messages", self.peer, n);
644 break;
645 }
646 }
647 }
648
649 self.inner.set(make_recv_future(rx));
650 return task::Poll::Ready(Some(items));
651 }
652 Err(broadcast::error::RecvError::Closed) => {
653 info!("BroadcastStream {:?} closed", self.peer);
654 return task::Poll::Ready(None);
655 }
656 Err(broadcast::error::RecvError::Lagged(n)) => {
657 warn!("BroadcastStream {:?} lagged by {} messages", self.peer, n);
658 self.inner.set(make_recv_future(rx));
660 continue;
661 }
662 }
663 }
664 }
665}
666
667impl<T> Drop for BroadcastStream<T> {
668 fn drop(&mut self) {
669 if let (Some(counter), Some(peer)) = (&self.subscription_counter, &self.peer)
670 && let Err(err) = counter.decrement(peer)
671 {
672 match err {
673 ConsensusError::Shutdown => {}
674 _ => panic!("Unexpected error: {err}"),
675 }
676 }
677 }
678}
679
680async fn make_recv_future<T: Clone>(
681 mut rx: broadcast::Receiver<T>,
682) -> (
683 Result<T, broadcast::error::RecvError>,
684 broadcast::Receiver<T>,
685) {
686 let result = rx.recv().await;
687 (result, rx)
688}
689
690#[cfg(test)]
691mod tests {
692 use std::{
693 collections::{BTreeMap, BTreeSet},
694 sync::Arc,
695 time::Duration,
696 };
697
698 use async_trait::async_trait;
699 use bytes::Bytes;
700 use consensus_config::AuthorityIndex;
701 use consensus_types::block::{BlockDigest, BlockRef, Round};
702 use parking_lot::{Mutex, RwLock};
703 use tokio::{sync::broadcast, time::sleep};
704
705 use futures::StreamExt as _;
706
707 use crate::{
708 authority_service::AuthorityService,
709 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
710 block_sync_service::BlockSyncService,
711 commit::{CertifiedCommits, CommitRange},
712 commit_vote_monitor::CommitVoteMonitor,
713 context::Context,
714 core_thread::{CoreError, CoreThreadDispatcher},
715 dag_state::DagState,
716 error::ConsensusResult,
717 network::{
718 BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
719 ValidatorNetworkClient, ValidatorNetworkService,
720 },
721 peers_pool::PeersPool,
722 round_tracker::RoundTracker,
723 storage::mem_store::MemStore,
724 synchronizer::Synchronizer,
725 test_dag_builder::DagBuilder,
726 transaction_vote_tracker::TransactionVoteTracker,
727 };
728 struct FakeCoreThreadDispatcher {
729 blocks: Mutex<Vec<VerifiedBlock>>,
730 }
731
732 impl FakeCoreThreadDispatcher {
733 fn new() -> Self {
734 Self {
735 blocks: Mutex::new(vec![]),
736 }
737 }
738
739 fn get_blocks(&self) -> Vec<VerifiedBlock> {
740 self.blocks.lock().clone()
741 }
742 }
743
744 #[async_trait]
745 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
746 async fn add_blocks(
747 &self,
748 blocks: Vec<VerifiedBlock>,
749 ) -> Result<BTreeSet<BlockRef>, CoreError> {
750 let block_refs = blocks.iter().map(|b| b.reference()).collect();
751 self.blocks.lock().extend(blocks);
752 Ok(block_refs)
753 }
754
755 async fn check_block_refs(
756 &self,
757 _block_refs: Vec<BlockRef>,
758 ) -> Result<BTreeSet<BlockRef>, CoreError> {
759 Ok(BTreeSet::new())
760 }
761
762 async fn add_certified_commits(
763 &self,
764 _commits: CertifiedCommits,
765 ) -> Result<BTreeSet<BlockRef>, CoreError> {
766 todo!()
767 }
768
769 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
770 Ok(())
771 }
772
773 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
774 Ok(Default::default())
775 }
776
777 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
778 todo!()
779 }
780
781 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
782 todo!()
783 }
784 }
785
786 #[derive(Default)]
787 struct FakeNetworkClient {}
788
789 #[async_trait]
790 impl ValidatorNetworkClient for FakeNetworkClient {
791 async fn send_block(
792 &self,
793 _peer: AuthorityIndex,
794 _block: &VerifiedBlock,
795 _timeout: Duration,
796 ) -> ConsensusResult<()> {
797 unimplemented!("Unimplemented")
798 }
799
800 async fn subscribe_blocks(
801 &self,
802 _peer: AuthorityIndex,
803 _last_received: Round,
804 _timeout: Duration,
805 ) -> ConsensusResult<BlockStream> {
806 unimplemented!("Unimplemented")
807 }
808
809 async fn fetch_blocks(
810 &self,
811 _peer: AuthorityIndex,
812 _block_refs: Vec<BlockRef>,
813 _fetch_after_rounds: Vec<Round>,
814 _fetch_missing_ancestors: bool,
815 _timeout: Duration,
816 ) -> ConsensusResult<Vec<Bytes>> {
817 unimplemented!("Unimplemented")
818 }
819
820 async fn fetch_commits(
821 &self,
822 _peer: AuthorityIndex,
823 _commit_range: CommitRange,
824 _timeout: Duration,
825 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
826 unimplemented!("Unimplemented")
827 }
828
829 async fn fetch_latest_blocks(
830 &self,
831 _peer: AuthorityIndex,
832 _authorities: Vec<AuthorityIndex>,
833 _timeout: Duration,
834 ) -> ConsensusResult<Vec<Bytes>> {
835 unimplemented!("Unimplemented")
836 }
837
838 async fn get_latest_rounds(
839 &self,
840 _peer: AuthorityIndex,
841 _timeout: Duration,
842 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
843 unimplemented!("Unimplemented")
844 }
845 }
846
847 #[async_trait]
848 impl ObserverNetworkClient for FakeNetworkClient {
849 async fn stream_blocks(
850 &self,
851 _peer: crate::network::PeerId,
852 _highest_round_per_authority: Vec<u64>,
853 _timeout: Duration,
854 ) -> ConsensusResult<crate::network::ObserverBlockStream> {
855 unimplemented!("Unimplemented")
856 }
857
858 async fn fetch_blocks(
859 &self,
860 _peer: crate::network::PeerId,
861 _block_refs: Vec<BlockRef>,
862 _fetch_after_rounds: Vec<Round>,
863 _fetch_missing_ancestors: bool,
864 _timeout: Duration,
865 ) -> ConsensusResult<Vec<Bytes>> {
866 unimplemented!("Unimplemented")
867 }
868
869 async fn fetch_commits(
870 &self,
871 _peer: crate::network::PeerId,
872 _commit_range: CommitRange,
873 _timeout: Duration,
874 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
875 unimplemented!("Unimplemented")
876 }
877 }
878
879 #[tokio::test(flavor = "current_thread", start_paused = true)]
880 async fn test_handle_send_block() {
881 let (context, _keys) = Context::new_for_test(4);
882 let context = Arc::new(context);
883 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
884 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
885 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
886 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
887 let fake_client = Arc::new(FakeNetworkClient::default());
888 let network_client = Arc::new(SynchronizerClient::new(
889 context.clone(),
890 Some(fake_client.clone()),
891 Some(fake_client.clone()),
892 ));
893 let store = Arc::new(MemStore::new());
894 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
895 let transaction_vote_tracker =
896 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
897 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
898 let peers_pool = Arc::new(PeersPool::new(context.clone()));
899 let synchronizer = Synchronizer::start(
900 network_client,
901 context.clone(),
902 core_dispatcher.clone(),
903 commit_vote_monitor.clone(),
904 block_verifier.clone(),
905 transaction_vote_tracker.clone(),
906 round_tracker.clone(),
907 dag_state.clone(),
908 peers_pool.clone(),
909 false,
910 );
911 let block_sync_service = Arc::new(BlockSyncService::new(
912 context.clone(),
913 dag_state.clone(),
914 store.clone(),
915 ));
916 let authority_service = Arc::new(AuthorityService::new(
917 context.clone(),
918 block_verifier,
919 commit_vote_monitor,
920 round_tracker,
921 synchronizer,
922 core_dispatcher.clone(),
923 rx_block_broadcast,
924 transaction_vote_tracker,
925 dag_state,
926 block_sync_service,
927 ));
928
929 let now = context.clock.timestamp_utc_ms();
931 let max_drift = context.parameters.max_forward_time_drift;
932 let input_block = VerifiedBlock::new_for_test(
933 TestBlock::new(9, 0)
934 .set_timestamp_ms(now + max_drift.as_millis() as u64)
935 .build(),
936 );
937
938 let service = authority_service.clone();
939 let serialized = ExtendedSerializedBlock {
940 block: input_block.serialized().clone(),
941 excluded_ancestors: vec![],
942 };
943
944 tokio::spawn({
945 let service = service.clone();
946 let context = context.clone();
947 async move {
948 service
949 .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
950 .await
951 .unwrap();
952 }
953 });
954
955 sleep(max_drift / 2).await;
956
957 let blocks = core_dispatcher.get_blocks();
958 assert_eq!(blocks.len(), 1);
959 assert_eq!(blocks[0], input_block);
960
961 let invalid_block =
963 VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
964 let extended_block = ExtendedSerializedBlock {
965 block: invalid_block.serialized().clone(),
966 excluded_ancestors: vec![],
967 };
968 service
969 .handle_send_block(
970 context.committee.to_authority_index(0).unwrap(),
971 extended_block,
972 )
973 .await
974 .unwrap_err();
975
976 let invalid_excluded_ancestors = vec![
978 bcs::to_bytes(&BlockRef::new(
979 10,
980 AuthorityIndex::new_for_test(1000),
981 BlockDigest::MIN,
982 ))
983 .unwrap(),
984 vec![3u8; 40],
985 bcs::to_bytes(&invalid_block.reference()).unwrap(),
986 ];
987 let extended_block = ExtendedSerializedBlock {
988 block: input_block.serialized().clone(),
989 excluded_ancestors: invalid_excluded_ancestors,
990 };
991 service
992 .handle_send_block(
993 context.committee.to_authority_index(0).unwrap(),
994 extended_block,
995 )
996 .await
997 .unwrap_err();
998 }
999
1000 #[tokio::test(flavor = "current_thread", start_paused = true)]
1001 async fn test_handle_fetch_blocks() {
1002 const NUM_AUTHORITIES: usize = 40;
1005 const NUM_ROUNDS: usize = 40;
1006 let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1007 context.parameters.max_blocks_per_fetch = 50;
1008 let context = Arc::new(context);
1009 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1010 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1011 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1012 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1013 let fake_client = Arc::new(FakeNetworkClient::default());
1014 let network_client = Arc::new(SynchronizerClient::new(
1015 context.clone(),
1016 Some(fake_client.clone()),
1017 Some(fake_client.clone()),
1018 ));
1019 let store = Arc::new(MemStore::new());
1020 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1021 let transaction_vote_tracker =
1022 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1023 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1024 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1025 let synchronizer = Synchronizer::start(
1026 network_client,
1027 context.clone(),
1028 core_dispatcher.clone(),
1029 commit_vote_monitor.clone(),
1030 block_verifier.clone(),
1031 transaction_vote_tracker.clone(),
1032 round_tracker.clone(),
1033 dag_state.clone(),
1034 peers_pool.clone(),
1035 false,
1036 );
1037 let block_sync_service = Arc::new(BlockSyncService::new(
1038 context.clone(),
1039 dag_state.clone(),
1040 store.clone(),
1041 ));
1042 let authority_service = Arc::new(AuthorityService::new(
1043 context.clone(),
1044 block_verifier,
1045 commit_vote_monitor,
1046 round_tracker,
1047 synchronizer,
1048 core_dispatcher.clone(),
1049 rx_block_broadcast,
1050 transaction_vote_tracker,
1051 dag_state.clone(),
1052 block_sync_service,
1053 ));
1054
1055 let mut dag_builder = DagBuilder::new(context.clone());
1057 dag_builder
1058 .layers(1..=(NUM_ROUNDS as u32))
1059 .build()
1060 .persist_layers(dag_state.clone());
1061 dag_state.write().flush();
1062 let all_blocks = dag_builder.all_blocks();
1063
1064 let missing_block_refs: Vec<BlockRef> = all_blocks
1066 .iter()
1067 .rev()
1068 .take(2)
1069 .map(|b| b.reference())
1070 .collect();
1071 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1072 let results = authority_service
1073 .handle_fetch_blocks(
1074 AuthorityIndex::new_for_test(0),
1075 missing_block_refs.clone(),
1076 highest_accepted_rounds,
1077 true,
1078 )
1079 .await
1080 .unwrap();
1081
1082 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1084 .iter()
1085 .map(|b| {
1086 let signed = bcs::from_bytes(b).unwrap();
1087 let block = VerifiedBlock::new_verified(signed, b.clone());
1088 (block.reference(), block)
1089 })
1090 .collect();
1091 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1092 for b in &missing_block_refs {
1094 assert!(blocks.contains_key(b));
1095 }
1096 let num_missing_ancestors = blocks
1097 .keys()
1098 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1099 .count();
1100 assert_eq!(
1101 num_missing_ancestors,
1102 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1103 );
1104
1105 let missing_round = NUM_ROUNDS as Round - 3;
1107 let missing_block_refs: Vec<BlockRef> = all_blocks
1108 .iter()
1109 .filter(|b| b.reference().round == missing_round)
1110 .map(|b| b.reference())
1111 .take(2)
1112 .collect();
1113 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1114 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1116 let results = authority_service
1117 .handle_fetch_blocks(
1118 AuthorityIndex::new_for_test(0),
1119 missing_block_refs.clone(),
1120 highest_accepted_rounds,
1121 false,
1122 )
1123 .await
1124 .unwrap();
1125
1126 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1128 .iter()
1129 .map(|b| {
1130 let signed = bcs::from_bytes(b).unwrap();
1131 let block = VerifiedBlock::new_verified(signed, b.clone());
1132 (block.reference(), block)
1133 })
1134 .collect();
1135 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1136 for b in &missing_block_refs {
1138 assert!(blocks.contains_key(b));
1139 }
1140 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1142 for b in blocks.keys() {
1143 assert!(b.round <= missing_round);
1144 assert!(expected_authors.contains(&b.author));
1145 }
1146
1147 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1149 highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
1151 highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
1152 let results = authority_service
1153 .handle_fetch_blocks(
1154 AuthorityIndex::new_for_test(0),
1155 vec![],
1156 highest_accepted_rounds.clone(),
1157 false,
1158 )
1159 .await
1160 .unwrap();
1161
1162 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1164 .iter()
1165 .map(|b| {
1166 let signed = bcs::from_bytes(b).unwrap();
1167 let block = VerifiedBlock::new_verified(signed, b.clone());
1168 (block.reference(), block)
1169 })
1170 .collect();
1171 assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
1172 for block_ref in blocks.keys() {
1174 let accepted = highest_accepted_rounds[block_ref.author];
1175 assert!(block_ref.round > accepted);
1176 }
1177 let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
1180 assert!(
1183 max_round_in_result <= 4,
1184 "Expected low rounds from fair round-order fetching, got max round {}",
1185 max_round_in_result
1186 );
1187
1188 let missing_block_refs: Vec<BlockRef> = all_blocks
1190 .iter()
1191 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1192 .map(|b| b.reference())
1193 .take(5)
1194 .collect();
1195 let results = authority_service
1196 .handle_fetch_blocks(
1197 AuthorityIndex::new_for_test(0),
1198 missing_block_refs.clone(),
1199 vec![],
1200 false,
1201 )
1202 .await
1203 .unwrap();
1204
1205 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1207 .iter()
1208 .map(|b| {
1209 let signed = bcs::from_bytes(b).unwrap();
1210 let block = VerifiedBlock::new_verified(signed, b.clone());
1211 (block.reference(), block)
1212 })
1213 .collect();
1214 assert_eq!(blocks.len(), 5);
1215 for b in &missing_block_refs {
1216 assert!(blocks.contains_key(b));
1217 }
1218 }
1219
1220 #[tokio::test(flavor = "current_thread", start_paused = true)]
1221 async fn test_handle_fetch_latest_blocks() {
1222 let (context, _keys) = Context::new_for_test(4);
1224 let context = Arc::new(context);
1225 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1226 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1227 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1228 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1229 let fake_client = Arc::new(FakeNetworkClient::default());
1230 let network_client = Arc::new(SynchronizerClient::new(
1231 context.clone(),
1232 Some(fake_client.clone()),
1233 Some(fake_client.clone()),
1234 ));
1235 let store = Arc::new(MemStore::new());
1236 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1237 let transaction_vote_tracker =
1238 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1239 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1240 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1241 let synchronizer = Synchronizer::start(
1242 network_client,
1243 context.clone(),
1244 core_dispatcher.clone(),
1245 commit_vote_monitor.clone(),
1246 block_verifier.clone(),
1247 transaction_vote_tracker.clone(),
1248 round_tracker.clone(),
1249 dag_state.clone(),
1250 peers_pool.clone(),
1251 true,
1252 );
1253 let block_sync_service = Arc::new(BlockSyncService::new(
1254 context.clone(),
1255 dag_state.clone(),
1256 store.clone(),
1257 ));
1258 let authority_service = Arc::new(AuthorityService::new(
1259 context.clone(),
1260 block_verifier,
1261 commit_vote_monitor,
1262 round_tracker,
1263 synchronizer,
1264 core_dispatcher.clone(),
1265 rx_block_broadcast,
1266 transaction_vote_tracker,
1267 dag_state.clone(),
1268 block_sync_service,
1269 ));
1270
1271 let mut dag_builder = DagBuilder::new(context.clone());
1273 dag_builder
1274 .layers(1..=10)
1275 .authorities(vec![AuthorityIndex::new_for_test(2)])
1276 .equivocate(1)
1277 .build()
1278 .persist_layers(dag_state);
1279
1280 let authorities_to_request = vec![
1282 AuthorityIndex::new_for_test(1),
1283 AuthorityIndex::new_for_test(2),
1284 ];
1285 let results = authority_service
1286 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1287 .await;
1288
1289 let serialised_blocks = results.unwrap();
1291 for serialised_block in serialised_blocks {
1292 let signed_block: SignedBlock =
1293 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1294 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1295
1296 assert_eq!(verified_block.round(), 10);
1297 }
1298 }
1299
1300 #[tokio::test(flavor = "current_thread", start_paused = true)]
1301 async fn test_handle_subscribe_blocks() {
1302 let (context, _keys) = Context::new_for_test(4);
1303 let context = Arc::new(context);
1304 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1305 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1306 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1307 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1308 let fake_client = Arc::new(FakeNetworkClient::default());
1309 let network_client = Arc::new(SynchronizerClient::new(
1310 context.clone(),
1311 Some(fake_client.clone()),
1312 Some(fake_client.clone()),
1313 ));
1314 let store = Arc::new(MemStore::new());
1315 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1316 let transaction_vote_tracker =
1317 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1318 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1319 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1320 let synchronizer = Synchronizer::start(
1321 network_client,
1322 context.clone(),
1323 core_dispatcher.clone(),
1324 commit_vote_monitor.clone(),
1325 block_verifier.clone(),
1326 transaction_vote_tracker.clone(),
1327 round_tracker.clone(),
1328 dag_state.clone(),
1329 peers_pool.clone(),
1330 false,
1331 );
1332 let block_sync_service = Arc::new(BlockSyncService::new(
1333 context.clone(),
1334 dag_state.clone(),
1335 store.clone(),
1336 ));
1337
1338 dag_state
1340 .write()
1341 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1342 dag_state
1343 .write()
1344 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1345 dag_state
1346 .write()
1347 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1348
1349 let authority_service = Arc::new(AuthorityService::new(
1350 context.clone(),
1351 block_verifier,
1352 commit_vote_monitor,
1353 round_tracker,
1354 synchronizer,
1355 core_dispatcher.clone(),
1356 rx_block_broadcast,
1357 transaction_vote_tracker,
1358 dag_state.clone(),
1359 block_sync_service,
1360 ));
1361
1362 let peer = context.committee.to_authority_index(1).unwrap();
1363
1364 {
1367 let mut stream = authority_service
1368 .handle_subscribe_blocks(peer, 100)
1369 .await
1370 .unwrap();
1371 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1372 assert_eq!(
1373 block.round(),
1374 15,
1375 "Should return last proposed block as fallback"
1376 );
1377 assert_eq!(block.author().value(), 0);
1378 }
1379
1380 {
1383 let mut stream = authority_service
1384 .handle_subscribe_blocks(peer, 7)
1385 .await
1386 .unwrap();
1387
1388 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1389 assert_eq!(block1.round(), 10, "Should return block at round 10");
1390
1391 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1392 assert_eq!(block2.round(), 15, "Should return block at round 15");
1393 }
1394 }
1395
1396 #[tokio::test(flavor = "current_thread", start_paused = true)]
1397 async fn test_handle_subscribe_blocks_not_proposed() {
1398 let (context, _keys) = Context::new_for_test(4);
1399 let context = Arc::new(context);
1400 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1401 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1402 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1403 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1404 let fake_client = Arc::new(FakeNetworkClient::default());
1405 let network_client = Arc::new(SynchronizerClient::new(
1406 context.clone(),
1407 Some(fake_client.clone()),
1408 Some(fake_client.clone()),
1409 ));
1410 let store = Arc::new(MemStore::new());
1411 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1412 let transaction_vote_tracker =
1413 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1414 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1415 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1416 let synchronizer = Synchronizer::start(
1417 network_client,
1418 context.clone(),
1419 core_dispatcher.clone(),
1420 commit_vote_monitor.clone(),
1421 block_verifier.clone(),
1422 transaction_vote_tracker.clone(),
1423 round_tracker.clone(),
1424 dag_state.clone(),
1425 peers_pool.clone(),
1426 false,
1427 );
1428 let block_sync_service = Arc::new(BlockSyncService::new(
1429 context.clone(),
1430 dag_state.clone(),
1431 store.clone(),
1432 ));
1433
1434 let authority_service = Arc::new(AuthorityService::new(
1437 context.clone(),
1438 block_verifier,
1439 commit_vote_monitor,
1440 round_tracker,
1441 synchronizer,
1442 core_dispatcher.clone(),
1443 rx_block_broadcast,
1444 transaction_vote_tracker,
1445 dag_state.clone(),
1446 block_sync_service,
1447 ));
1448
1449 let peer = context.committee.to_authority_index(1).unwrap();
1450
1451 let mut stream = authority_service
1453 .handle_subscribe_blocks(peer, 0)
1454 .await
1455 .unwrap();
1456
1457 use futures::poll;
1459 use std::task::Poll;
1460 let poll_result = poll!(stream.next());
1461 assert!(
1462 matches!(poll_result, Poll::Pending),
1463 "Should not receive genesis block on subscription stream"
1464 );
1465 }
1466}