1use std::{collections::BTreeMap, pin::Pin, sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use consensus_config::AuthorityIndex;
9use consensus_types::block::{BlockRef, Round};
10use futures::{Stream, StreamExt, ready, stream, task};
11use mysten_metrics::spawn_monitored_task;
12use parking_lot::RwLock;
13use sui_macros::fail_point_async;
14use tap::TapFallible;
15use tokio::sync::broadcast;
16use tokio_util::sync::ReusableBoxFuture;
17use tracing::{debug, info, warn};
18
19use crate::{
20 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
21 block_sync_service::BlockSyncService,
22 block_verifier::BlockVerifier,
23 commit::{CommitRange, TrustedCommit},
24 commit_vote_monitor::CommitVoteMonitor,
25 context::Context,
26 core_thread::CoreThreadDispatcher,
27 dag_state::DagState,
28 error::{ConsensusError, ConsensusResult},
29 network::{BlockStream, ExtendedSerializedBlock, PeerId, ValidatorNetworkService},
30 round_tracker::RoundTracker,
31 synchronizer::SynchronizerHandle,
32 transaction_vote_tracker::TransactionVoteTracker,
33};
34
/// Multiplier applied to `commit_sync_batch_size` in `handle_send_block`: a pushed
/// block is rejected when the local last commit index plus
/// `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER` is still below the quorum commit
/// index.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
36
/// Server-side handler of the validator network RPCs; implements
/// `ValidatorNetworkService` for this authority.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    // Shared node context: committee, metrics, parameters, protocol config and clock.
    context: Arc<Context>,
    // Observes every verified pushed block and supplies the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    // Verifies pushed blocks and produces the transaction reject votes for them.
    block_verifier: Arc<dyn BlockVerifier>,
    // Used to fetch missing ancestors and missing excluded ancestors from the sender.
    synchronizer: Arc<SynchronizerHandle>,
    // Receives verified blocks (`add_blocks`) and block ref checks (`check_block_refs`).
    core_dispatcher: Arc<C>,
    // Template receiver; each subscription gets its own receiver via `resubscribe()`.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    // Shared with every block subscription stream to account for active subscriptions.
    subscription_counter: Arc<SubscriptionCounter>,
    // Receives voted blocks when transaction voting is enabled in the protocol config.
    transaction_vote_tracker: TransactionVoteTracker,
    // Source of the last commit index and of cached / last proposed blocks.
    dag_state: Arc<RwLock<DagState>>,
    // Updated from every verified pushed block; queried for highest received rounds.
    round_tracker: Arc<RwLock<RoundTracker>>,
    // Backs the fetch_blocks / fetch_commits / fetch_latest_blocks handlers.
    block_sync_service: Arc<BlockSyncService>,
}
51
52impl<C: CoreThreadDispatcher> AuthorityService<C> {
53 pub(crate) fn new(
54 context: Arc<Context>,
55 block_verifier: Arc<dyn BlockVerifier>,
56 commit_vote_monitor: Arc<CommitVoteMonitor>,
57 round_tracker: Arc<RwLock<RoundTracker>>,
58 synchronizer: Arc<SynchronizerHandle>,
59 core_dispatcher: Arc<C>,
60 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
61 transaction_vote_tracker: TransactionVoteTracker,
62 dag_state: Arc<RwLock<DagState>>,
63 block_sync_service: Arc<BlockSyncService>,
64 ) -> Self {
65 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
66 Self {
67 context,
68 block_verifier,
69 commit_vote_monitor,
70 synchronizer,
71 core_dispatcher,
72 rx_block_broadcast,
73 subscription_counter,
74 transaction_vote_tracker,
75 dag_state,
76 round_tracker,
77 block_sync_service,
78 }
79 }
80
81 fn parse_excluded_ancestors(
83 &self,
84 peer: AuthorityIndex,
85 block: &VerifiedBlock,
86 mut excluded_ancestors: Vec<Vec<u8>>,
87 ) -> ConsensusResult<Vec<BlockRef>> {
88 let peer_hostname = &self.context.committee.authority(peer).hostname;
89
90 let excluded_ancestors_limit = self.context.committee.size() * 2;
91 if excluded_ancestors.len() > excluded_ancestors_limit {
92 debug!(
93 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
94 excluded_ancestors.len() - excluded_ancestors_limit,
95 peer,
96 peer_hostname,
97 );
98 excluded_ancestors.truncate(excluded_ancestors_limit);
99 }
100
101 let excluded_ancestors = excluded_ancestors
102 .into_iter()
103 .map(|serialized| {
104 let block_ref: BlockRef =
105 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
106 if !self.context.committee.is_valid_index(block_ref.author) {
107 return Err(ConsensusError::InvalidAuthorityIndex {
108 index: block_ref.author,
109 max: self.context.committee.size(),
110 });
111 }
112 if block_ref.round >= block.round() {
113 return Err(ConsensusError::InvalidAncestorRound {
114 ancestor: block_ref.round,
115 block: block.round(),
116 });
117 }
118 Ok(block_ref)
119 })
120 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
121
122 for excluded_ancestor in &excluded_ancestors {
123 let excluded_ancestor_hostname = &self
124 .context
125 .committee
126 .authority(excluded_ancestor.author)
127 .hostname;
128 self.context
129 .metrics
130 .node_metrics
131 .network_excluded_ancestors_count_by_authority
132 .with_label_values(&[excluded_ancestor_hostname])
133 .inc();
134 }
135 self.context
136 .metrics
137 .node_metrics
138 .network_received_excluded_ancestors_from_authority
139 .with_label_values(&[peer_hostname])
140 .inc_by(excluded_ancestors.len() as u64);
141
142 Ok(excluded_ancestors)
143 }
144}
145
146#[async_trait]
147impl<C: CoreThreadDispatcher> ValidatorNetworkService for AuthorityService<C> {
148 async fn handle_send_block(
149 &self,
150 peer: AuthorityIndex,
151 serialized_block: ExtendedSerializedBlock,
152 ) -> ConsensusResult<()> {
153 fail_point_async!("consensus-rpc-response");
154
155 let peer_hostname = &self.context.committee.authority(peer).hostname;
156
157 let signed_block: SignedBlock =
159 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
160
161 if peer != signed_block.author() {
163 self.context
164 .metrics
165 .node_metrics
166 .invalid_blocks
167 .with_label_values(&[
168 peer_hostname.as_str(),
169 "handle_send_block",
170 "UnexpectedAuthority",
171 ])
172 .inc();
173 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
174 info!("Block with wrong authority from {}: {}", peer, e);
175 return Err(e);
176 }
177
178 let (verified_block, reject_txn_votes) = self
180 .block_verifier
181 .verify_and_vote(signed_block, serialized_block.block)
182 .tap_err(|e| {
183 self.context
184 .metrics
185 .node_metrics
186 .invalid_blocks
187 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
188 .inc();
189 info!("Invalid block from {}: {}", peer, e);
190 })?;
191 let excluded_ancestors = self
192 .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
193 .tap_err(|e| {
194 debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
195 self.context
196 .metrics
197 .node_metrics
198 .invalid_blocks
199 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
200 .inc();
201 })?;
202
203 let block_ref = verified_block.reference();
204 debug!("Received block {} via send block.", block_ref);
205
206 self.context
207 .metrics
208 .node_metrics
209 .verified_blocks
210 .with_label_values(&[peer_hostname])
211 .inc();
212
213 let now = self.context.clock.timestamp_utc_ms();
214 let forward_time_drift =
215 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
216
217 self.context
218 .metrics
219 .node_metrics
220 .block_timestamp_drift_ms
221 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
222 .inc_by(forward_time_drift.as_millis() as u64);
223
224 self.commit_vote_monitor.observe_block(&verified_block);
227
228 self.round_tracker
230 .write()
231 .update_from_verified_block(&ExtendedBlock {
232 block: verified_block.clone(),
233 excluded_ancestors: excluded_ancestors.clone(),
234 });
235
236 let last_commit_index = self.dag_state.read().last_commit_index();
245 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
246 if last_commit_index
249 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
250 < quorum_commit_index
251 {
252 self.context
253 .metrics
254 .node_metrics
255 .rejected_blocks
256 .with_label_values(&["commit_lagging"])
257 .inc();
258 debug!(
259 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
260 block_ref, last_commit_index, quorum_commit_index,
261 );
262 return Err(ConsensusError::BlockRejected {
263 block_ref,
264 reason: format!(
265 "Last commit index is lagging quorum commit index too much ({} < {})",
266 last_commit_index, quorum_commit_index,
267 ),
268 });
269 }
270
271 if self.context.protocol_config.transaction_voting_enabled() {
274 self.transaction_vote_tracker
275 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
276 }
277
278 let missing_ancestors = self
280 .core_dispatcher
281 .add_blocks(vec![verified_block.clone()])
282 .await
283 .map_err(|_| ConsensusError::Shutdown)?;
284
285 if !missing_ancestors.is_empty() {
287 self.context
288 .metrics
289 .node_metrics
290 .handler_received_block_missing_ancestors
291 .with_label_values(&[peer_hostname])
292 .inc_by(missing_ancestors.len() as u64);
293 let synchronizer = self.synchronizer.clone();
294 spawn_monitored_task!(async move {
295 if let Err(err) = synchronizer
300 .fetch_blocks(missing_ancestors, PeerId::Validator(peer))
301 .await
302 {
303 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
304 }
305 });
306 }
307
308 let missing_excluded_ancestors = self
310 .core_dispatcher
311 .check_block_refs(excluded_ancestors)
312 .await
313 .map_err(|_| ConsensusError::Shutdown)?;
314 if !missing_excluded_ancestors.is_empty() {
315 self.context
316 .metrics
317 .node_metrics
318 .network_excluded_ancestors_sent_to_fetch
319 .with_label_values(&[peer_hostname])
320 .inc_by(missing_excluded_ancestors.len() as u64);
321
322 let synchronizer = self.synchronizer.clone();
323 spawn_monitored_task!(async move {
324 if let Err(err) = synchronizer
325 .fetch_blocks(missing_excluded_ancestors, PeerId::Validator(peer))
326 .await
327 {
328 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
329 }
330 });
331 }
332
333 Ok(())
334 }
335
336 async fn handle_subscribe_blocks(
337 &self,
338 peer: AuthorityIndex,
339 last_received: Round,
340 ) -> ConsensusResult<BlockStream> {
341 fail_point_async!("consensus-rpc-response");
342
343 let past_proposed_blocks = {
352 let dag_state = self.dag_state.read();
353
354 let mut proposed_blocks =
355 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
356 if proposed_blocks.is_empty() {
357 let last_proposed_block = dag_state
358 .get_last_proposed_block()
359 .expect("Last proposed block should be returned on validators");
360 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
361 vec![last_proposed_block]
362 } else {
363 vec![]
364 };
365 }
366 stream::iter(
367 proposed_blocks
368 .into_iter()
369 .map(|block| ExtendedSerializedBlock {
370 block: block.serialized().clone(),
371 excluded_ancestors: vec![],
372 }),
373 )
374 };
375
376 const MAX_BLOCKS_PER_POLL: usize = 1;
377 let broadcasted_blocks = BroadcastedBlockStream::new(
378 PeerId::Validator(peer),
379 self.rx_block_broadcast.resubscribe(),
380 MAX_BLOCKS_PER_POLL,
381 self.subscription_counter.clone(),
382 );
383
384 Ok(Box::pin(past_proposed_blocks.chain(
386 broadcasted_blocks.flat_map(|items| {
387 debug_assert!(
388 items.len() <= MAX_BLOCKS_PER_POLL,
389 "Too many blocks received from broadcast"
390 );
391 stream::iter(items.into_iter().map(ExtendedSerializedBlock::from))
392 }),
393 )))
394 }
395
396 async fn handle_fetch_blocks(
412 &self,
413 _peer: AuthorityIndex,
414 block_refs: Vec<BlockRef>,
415 fetch_after_rounds: Vec<Round>,
416 fetch_missing_ancestors: bool,
417 ) -> ConsensusResult<Vec<Bytes>> {
418 fail_point_async!("consensus-rpc-response");
419
420 self.block_sync_service
422 .fetch_blocks(block_refs, fetch_after_rounds, fetch_missing_ancestors)
423 .await
424 }
425
426 async fn handle_fetch_commits(
427 &self,
428 _peer: AuthorityIndex,
429 commit_range: CommitRange,
430 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
431 fail_point_async!("consensus-rpc-response");
432
433 self.block_sync_service.fetch_commits(commit_range).await
435 }
436
437 async fn handle_fetch_latest_blocks(
438 &self,
439 peer: AuthorityIndex,
440 authorities: Vec<AuthorityIndex>,
441 ) -> ConsensusResult<Vec<Bytes>> {
442 fail_point_async!("consensus-rpc-response");
443
444 self.block_sync_service
446 .fetch_latest_blocks(peer, authorities)
447 .await
448 }
449
450 async fn handle_get_latest_rounds(
451 &self,
452 _peer: AuthorityIndex,
453 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
454 fail_point_async!("consensus-rpc-response");
455
456 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
457
458 let blocks = self
459 .dag_state
460 .read()
461 .get_last_cached_block_per_authority(Round::MAX);
462 let highest_accepted_rounds = blocks
463 .into_iter()
464 .map(|(block, _)| block.round())
465 .collect::<Vec<_>>();
466
467 Ok((highest_received_rounds, highest_accepted_rounds))
468 }
469}
470
/// Mutable subscription bookkeeping kept behind the `SubscriptionCounter` mutex.
struct Counter {
    // Total number of active subscriptions across all peers.
    count: usize,
    // Number of active subscriptions per subscribing peer.
    subscriptions_by_peer: BTreeMap<PeerId, usize>,
}
475
/// Tracks active block subscriptions and mirrors them into the `subscribed_by`
/// metric.
pub(crate) struct SubscriptionCounter {
    // Gives access to the committee (peer hostnames) and to the metrics.
    context: Arc<Context>,
    // Subscription counts, guarded by a mutex so `increment` / `decrement` can
    // take `&self`.
    counter: parking_lot::Mutex<Counter>,
}
481
482impl SubscriptionCounter {
483 pub(crate) fn new(context: Arc<Context>) -> Self {
484 for (_, authority) in context.committee.authorities() {
486 context
487 .metrics
488 .node_metrics
489 .subscribed_by
490 .with_label_values(&[authority.hostname.as_str()])
491 .set(0);
492 }
493
494 Self {
495 counter: parking_lot::Mutex::new(Counter {
496 count: 0,
497 subscriptions_by_peer: BTreeMap::new(),
498 }),
499 context,
500 }
501 }
502
503 fn increment(&self, peer: &PeerId) -> Result<(), ConsensusError> {
504 let mut counter = self.counter.lock();
505 counter.count += 1;
506 *counter
507 .subscriptions_by_peer
508 .entry(peer.clone())
509 .or_insert(0) += 1;
510
511 match peer {
512 PeerId::Validator(authority) => {
513 let peer_hostname = &self.context.committee.authority(*authority).hostname;
514 self.context
515 .metrics
516 .node_metrics
517 .subscribed_by
518 .with_label_values(&[peer_hostname])
519 .set(1);
520 }
521 PeerId::Observer(_) => {
522 self.context
523 .metrics
524 .node_metrics
525 .subscribed_by
526 .with_label_values(&["observer"])
527 .inc();
528 }
529 }
530
531 Ok(())
532 }
533
534 fn decrement(&self, peer: &PeerId) -> Result<(), ConsensusError> {
535 let mut counter = self.counter.lock();
536 counter.count -= 1;
537 *counter
538 .subscriptions_by_peer
539 .entry(peer.clone())
540 .or_insert(0) -= 1;
541
542 if counter.subscriptions_by_peer[peer] == 0 {
543 match peer {
544 PeerId::Validator(authority) => {
545 let peer_hostname = &self.context.committee.authority(*authority).hostname;
546 self.context
547 .metrics
548 .node_metrics
549 .subscribed_by
550 .with_label_values(&[peer_hostname])
551 .set(0);
552 }
553 PeerId::Observer(_) => {
554 self.context
555 .metrics
556 .node_metrics
557 .subscribed_by
558 .with_label_values(&["observer"])
559 .dec();
560 }
561 }
562 }
563
564 Ok(())
565 }
566}
567
/// Stream of broadcast `ExtendedBlock`s handed to block subscribers.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapts a `tokio::sync::broadcast::Receiver<T>` into a `Stream` that yields
/// batches (`Vec<T>`) of received items, and accounts for the subscription in a
/// `SubscriptionCounter` for as long as the stream exists.
pub(crate) struct BroadcastStream<T> {
    // Peer this stream serves; used for logging and subscription accounting.
    peer: PeerId,
    // Pending `recv()` future. It resolves to the receive result together with the
    // receiver itself, so the receiver can be reused for the next `recv()`.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Upper bound on the number of items yielded by a single poll.
    max_items_per_poll: usize,
    // Incremented when the stream is created, decremented when it is dropped.
    subscription_counter: Arc<SubscriptionCounter>,
}
589
590impl<T: 'static + Clone + Send> BroadcastStream<T> {
591 pub fn new(
592 peer: PeerId,
593 rx: broadcast::Receiver<T>,
594 max_items_per_poll: usize,
595 subscription_counter: Arc<SubscriptionCounter>,
596 ) -> Self {
597 assert!(max_items_per_poll > 0, "max_items_per_poll must be > 0");
598 if let Err(err) = subscription_counter.increment(&peer) {
599 match err {
600 ConsensusError::Shutdown => {}
601 _ => panic!("Unexpected error: {err}"),
602 }
603 }
604 Self {
605 peer,
606 inner: ReusableBoxFuture::new(make_recv_future(rx)),
607 max_items_per_poll,
608 subscription_counter,
609 }
610 }
611}
612
613impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
614 type Item = Vec<T>;
615
616 fn poll_next(
617 mut self: Pin<&mut Self>,
618 cx: &mut task::Context<'_>,
619 ) -> task::Poll<Option<Self::Item>> {
620 loop {
621 let (result, mut rx) = ready!(self.inner.poll(cx));
622
623 match result {
624 Ok(item) => {
625 let mut items = Vec::new();
626 items.push(item);
627
628 while items.len() < self.max_items_per_poll {
630 match rx.try_recv() {
631 Ok(item) => items.push(item),
632 Err(broadcast::error::TryRecvError::Empty) => break,
633 Err(broadcast::error::TryRecvError::Closed) => break,
634 Err(broadcast::error::TryRecvError::Lagged(n)) => {
635 warn!(
636 "Block BroadcastedBlockStream {} lagged by {} messages",
637 self.peer, n
638 );
639 break;
640 }
641 }
642 }
643
644 self.inner.set(make_recv_future(rx));
645 return task::Poll::Ready(Some(items));
646 }
647 Err(broadcast::error::RecvError::Closed) => {
648 info!("Block BroadcastedBlockStream {} closed", self.peer);
649 return task::Poll::Ready(None);
650 }
651 Err(broadcast::error::RecvError::Lagged(n)) => {
652 warn!(
653 "Block BroadcastedBlockStream {} lagged by {} messages",
654 self.peer, n
655 );
656 self.inner.set(make_recv_future(rx));
658 continue;
659 }
660 }
661 }
662 }
663}
664
665impl<T> Drop for BroadcastStream<T> {
666 fn drop(&mut self) {
667 if let Err(err) = self.subscription_counter.decrement(&self.peer) {
668 match err {
669 ConsensusError::Shutdown => {}
670 _ => panic!("Unexpected error: {err}"),
671 }
672 }
673 }
674}
675
676async fn make_recv_future<T: Clone>(
677 mut rx: broadcast::Receiver<T>,
678) -> (
679 Result<T, broadcast::error::RecvError>,
680 broadcast::Receiver<T>,
681) {
682 let result = rx.recv().await;
683 (result, rx)
684}
685
686#[cfg(test)]
689mod tests {
690 use std::{
691 collections::{BTreeMap, BTreeSet},
692 sync::Arc,
693 time::Duration,
694 };
695
696 use async_trait::async_trait;
697 use bytes::Bytes;
698 use consensus_config::AuthorityIndex;
699 use consensus_types::block::{BlockDigest, BlockRef, Round};
700 use parking_lot::{Mutex, RwLock};
701 use tokio::{sync::broadcast, time::sleep};
702
703 use futures::StreamExt as _;
704
705 use crate::{
706 authority_service::AuthorityService,
707 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
708 block_sync_service::BlockSyncService,
709 commit::{CertifiedCommits, CommitRange},
710 commit_vote_monitor::CommitVoteMonitor,
711 context::Context,
712 core_thread::{CoreError, CoreThreadDispatcher},
713 dag_state::DagState,
714 error::ConsensusResult,
715 network::{
716 BlockStream, ExtendedSerializedBlock, ObserverNetworkClient, SynchronizerClient,
717 ValidatorNetworkClient, ValidatorNetworkService,
718 },
719 peers_pool::PeersPool,
720 round_tracker::RoundTracker,
721 storage::mem_store::MemStore,
722 synchronizer::Synchronizer,
723 test_dag_builder::DagBuilder,
724 transaction_vote_tracker::TransactionVoteTracker,
725 };
726 struct FakeCoreThreadDispatcher {
727 blocks: Mutex<Vec<VerifiedBlock>>,
728 }
729
730 impl FakeCoreThreadDispatcher {
731 fn new() -> Self {
732 Self {
733 blocks: Mutex::new(vec![]),
734 }
735 }
736
737 fn get_blocks(&self) -> Vec<VerifiedBlock> {
738 self.blocks.lock().clone()
739 }
740 }
741
742 #[async_trait]
743 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
744 async fn add_blocks(
745 &self,
746 blocks: Vec<VerifiedBlock>,
747 ) -> Result<BTreeSet<BlockRef>, CoreError> {
748 let block_refs = blocks.iter().map(|b| b.reference()).collect();
749 self.blocks.lock().extend(blocks);
750 Ok(block_refs)
751 }
752
753 async fn check_block_refs(
754 &self,
755 _block_refs: Vec<BlockRef>,
756 ) -> Result<BTreeSet<BlockRef>, CoreError> {
757 Ok(BTreeSet::new())
758 }
759
760 async fn add_certified_commits(
761 &self,
762 _commits: CertifiedCommits,
763 ) -> Result<BTreeSet<BlockRef>, CoreError> {
764 todo!()
765 }
766
767 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
768 Ok(())
769 }
770
771 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
772 Ok(Default::default())
773 }
774
775 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
776 todo!()
777 }
778
779 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
780 todo!()
781 }
782 }
783
/// Placeholder network client used only to construct a `SynchronizerClient` in
/// tests. Every method panics with `unimplemented!` when called.
#[derive(Default)]
struct FakeNetworkClient {}

// Validator-facing client API: all methods are unimplemented stubs.
#[async_trait]
impl ValidatorNetworkClient for FakeNetworkClient {
    async fn send_block(
        &self,
        _peer: AuthorityIndex,
        _block: &VerifiedBlock,
        _timeout: Duration,
    ) -> ConsensusResult<()> {
        unimplemented!("Unimplemented")
    }

    async fn subscribe_blocks(
        &self,
        _peer: AuthorityIndex,
        _last_received: Round,
        _timeout: Duration,
    ) -> ConsensusResult<BlockStream> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        _block_refs: Vec<BlockRef>,
        _fetch_after_rounds: Vec<Round>,
        _fetch_missing_ancestors: bool,
        _timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_commits(
        &self,
        _peer: AuthorityIndex,
        _commit_range: CommitRange,
        _timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_latest_blocks(
        &self,
        _peer: AuthorityIndex,
        _authorities: Vec<AuthorityIndex>,
        _timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>> {
        unimplemented!("Unimplemented")
    }

    async fn get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
        _timeout: Duration,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        unimplemented!("Unimplemented")
    }
}

// Observer-facing client API: all methods are unimplemented stubs.
#[async_trait]
impl ObserverNetworkClient for FakeNetworkClient {
    async fn stream_blocks(
        &self,
        _peer: crate::network::PeerId,
        _highest_round_per_authority: Vec<u64>,
        _timeout: Duration,
    ) -> ConsensusResult<crate::network::ObserverBlockStream> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_blocks(
        &self,
        _peer: crate::network::PeerId,
        _block_refs: Vec<BlockRef>,
        _fetch_after_rounds: Vec<Round>,
        _fetch_missing_ancestors: bool,
        _timeout: Duration,
    ) -> ConsensusResult<Vec<Bytes>> {
        unimplemented!("Unimplemented")
    }

    async fn fetch_commits(
        &self,
        _peer: crate::network::PeerId,
        _commit_range: CommitRange,
        _timeout: Duration,
    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
        unimplemented!("Unimplemented")
    }
}
876
// Exercises `handle_send_block` with one acceptable block, one block that must be
// rejected, and one acceptable block carrying invalid excluded ancestors.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_handle_send_block() {
    // Wire up an AuthorityService for a 4-authority committee with fake
    // dispatcher / network client and an in-memory store.
    let (context, _keys) = Context::new_for_test(4);
    let context = Arc::new(context);
    let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
    let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
    let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
    let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
    let fake_client = Arc::new(FakeNetworkClient::default());
    let network_client = Arc::new(SynchronizerClient::new(
        context.clone(),
        Some(fake_client.clone()),
        Some(fake_client.clone()),
    ));
    let store = Arc::new(MemStore::new());
    let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
    let transaction_vote_tracker =
        TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
    let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
    let peers_pool = Arc::new(PeersPool::new(context.clone()));
    let synchronizer = Synchronizer::start(
        network_client,
        context.clone(),
        core_dispatcher.clone(),
        commit_vote_monitor.clone(),
        block_verifier.clone(),
        transaction_vote_tracker.clone(),
        round_tracker.clone(),
        dag_state.clone(),
        peers_pool.clone(),
        false,
    );
    let block_sync_service = Arc::new(BlockSyncService::new(
        context.clone(),
        dag_state.clone(),
        store.clone(),
    ));
    let authority_service = Arc::new(AuthorityService::new(
        context.clone(),
        block_verifier,
        commit_vote_monitor,
        round_tracker,
        synchronizer,
        core_dispatcher.clone(),
        rx_block_broadcast,
        transaction_vote_tracker,
        dag_state,
        block_sync_service,
    ));

    // Build a block whose timestamp is `max_forward_time_drift` ahead of the clock.
    // NOTE(review): `TestBlock::new(9, 0)` presumably means round 9 by authority 0 -
    // confirm against `TestBlock`.
    let now = context.clock.timestamp_utc_ms();
    let max_drift = context.parameters.max_forward_time_drift;
    let input_block = VerifiedBlock::new_for_test(
        TestBlock::new(9, 0)
            .set_timestamp_ms(now + max_drift.as_millis() as u64)
            .build(),
    );

    let service = authority_service.clone();
    let serialized = ExtendedSerializedBlock {
        block: input_block.serialized().clone(),
        excluded_ancestors: vec![],
    };

    // Send the block from authority 0 in a background task; it must succeed.
    tokio::spawn({
        let service = service.clone();
        let context = context.clone();
        async move {
            service
                .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                .await
                .unwrap();
        }
    });

    // After only half of the drift has elapsed the block must already have reached
    // the dispatcher.
    sleep(max_drift / 2).await;

    let blocks = core_dispatcher.get_blocks();
    assert_eq!(blocks.len(), 1);
    assert_eq!(blocks[0], input_block);

    // A block built with `TestBlock::new(10, 1000)` sent by authority 0 must be
    // rejected.
    // NOTE(review): presumably because author 1000 does not match the sending
    // peer - confirm.
    let invalid_block =
        VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
    let extended_block = ExtendedSerializedBlock {
        block: invalid_block.serialized().clone(),
        excluded_ancestors: vec![],
    };
    service
        .handle_send_block(
            context.committee.to_authority_index(0).unwrap(),
            extended_block,
        )
        .await
        .unwrap_err();

    // The previously accepted block must be rejected when it carries invalid
    // excluded ancestors: a ref with authority index 1000, 40 arbitrary bytes,
    // and the reference of `invalid_block`.
    let invalid_excluded_ancestors = vec![
        bcs::to_bytes(&BlockRef::new(
            10,
            AuthorityIndex::new_for_test(1000),
            BlockDigest::MIN,
        ))
        .unwrap(),
        vec![3u8; 40],
        bcs::to_bytes(&invalid_block.reference()).unwrap(),
    ];
    let extended_block = ExtendedSerializedBlock {
        block: input_block.serialized().clone(),
        excluded_ancestors: invalid_excluded_ancestors,
    };
    service
        .handle_send_block(
            context.committee.to_authority_index(0).unwrap(),
            extended_block,
        )
        .await
        .unwrap_err();
}
997
// Exercises `handle_fetch_blocks` in four request shapes against a fully built
// 40-authority, 40-round DAG.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_handle_fetch_blocks() {
    const NUM_AUTHORITIES: usize = 40;
    const NUM_ROUNDS: usize = 40;
    let (mut context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
    context.parameters.max_blocks_per_fetch = 50;
    let context = Arc::new(context);
    let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
    let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
    let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
    let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
    let fake_client = Arc::new(FakeNetworkClient::default());
    let network_client = Arc::new(SynchronizerClient::new(
        context.clone(),
        Some(fake_client.clone()),
        Some(fake_client.clone()),
    ));
    let store = Arc::new(MemStore::new());
    let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
    let transaction_vote_tracker =
        TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
    let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
    let peers_pool = Arc::new(PeersPool::new(context.clone()));
    let synchronizer = Synchronizer::start(
        network_client,
        context.clone(),
        core_dispatcher.clone(),
        commit_vote_monitor.clone(),
        block_verifier.clone(),
        transaction_vote_tracker.clone(),
        round_tracker.clone(),
        dag_state.clone(),
        peers_pool.clone(),
        false,
    );
    let block_sync_service = Arc::new(BlockSyncService::new(
        context.clone(),
        dag_state.clone(),
        store.clone(),
    ));
    let authority_service = Arc::new(AuthorityService::new(
        context.clone(),
        block_verifier,
        commit_vote_monitor,
        round_tracker,
        synchronizer,
        core_dispatcher.clone(),
        rx_block_broadcast,
        transaction_vote_tracker,
        dag_state.clone(),
        block_sync_service,
    ));

    // Build rounds 1..=NUM_ROUNDS, persist them into the DAG state and flush.
    let mut dag_builder = DagBuilder::new(context.clone());
    dag_builder
        .layers(1..=(NUM_ROUNDS as u32))
        .build()
        .persist_layers(dag_state.clone());
    dag_state.write().flush();
    let all_blocks = dag_builder.all_blocks();

    // Case 1: request the last two blocks of `all_blocks` with
    // `fetch_missing_ancestors = true`.
    let missing_block_refs: Vec<BlockRef> = all_blocks
        .iter()
        .rev()
        .take(2)
        .map(|b| b.reference())
        .collect();
    let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
    let results = authority_service
        .handle_fetch_blocks(
            AuthorityIndex::new_for_test(0),
            missing_block_refs.clone(),
            highest_accepted_rounds,
            true,
        )
        .await
        .unwrap();

    // Expect `max_blocks_per_sync` blocks: the requested ones plus blocks from
    // round NUM_ROUNDS - 1 filling the remainder.
    let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
        .iter()
        .map(|b| {
            let signed = bcs::from_bytes(b).unwrap();
            let block = VerifiedBlock::new_verified(signed, b.clone());
            (block.reference(), block)
        })
        .collect();
    assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
    for b in &missing_block_refs {
        assert!(blocks.contains_key(b));
    }
    let num_missing_ancestors = blocks
        .keys()
        .filter(|b| b.round == NUM_ROUNDS as Round - 1)
        .count();
    assert_eq!(
        num_missing_ancestors,
        context.parameters.max_blocks_per_sync - missing_block_refs.len()
    );

    // Case 2: request two blocks from round NUM_ROUNDS - 3 without fetching
    // missing ancestors; the first block's author has a higher accepted round.
    let missing_round = NUM_ROUNDS as Round - 3;
    let missing_block_refs: Vec<BlockRef> = all_blocks
        .iter()
        .filter(|b| b.reference().round == missing_round)
        .map(|b| b.reference())
        .take(2)
        .collect();
    let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
    highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
    let results = authority_service
        .handle_fetch_blocks(
            AuthorityIndex::new_for_test(0),
            missing_block_refs.clone(),
            highest_accepted_rounds,
            false,
        )
        .await
        .unwrap();

    // Expect `max_blocks_per_sync` blocks, all authored by the two requested
    // authors and none above the requested round.
    let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
        .iter()
        .map(|b| {
            let signed = bcs::from_bytes(b).unwrap();
            let block = VerifiedBlock::new_verified(signed, b.clone());
            (block.reference(), block)
        })
        .collect();
    assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
    for b in &missing_block_refs {
        assert!(blocks.contains_key(b));
    }
    let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
    for b in blocks.keys() {
        assert!(b.round <= missing_round);
        assert!(expected_authors.contains(&b.author));
    }

    // Case 3: no explicit block refs, only per-authority accepted rounds, with
    // authorities 0 and 1 far ahead of the others.
    let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
    highest_accepted_rounds[0] = (NUM_ROUNDS as Round) - 5;
    highest_accepted_rounds[1] = (NUM_ROUNDS as Round) - 3;
    let results = authority_service
        .handle_fetch_blocks(
            AuthorityIndex::new_for_test(0),
            vec![],
            highest_accepted_rounds.clone(),
            false,
        )
        .await
        .unwrap();

    // Expect `max_blocks_per_fetch` blocks, each above its author's accepted
    // round, and all taken from the lowest rounds.
    let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
        .iter()
        .map(|b| {
            let signed = bcs::from_bytes(b).unwrap();
            let block = VerifiedBlock::new_verified(signed, b.clone());
            (block.reference(), block)
        })
        .collect();
    assert_eq!(blocks.len(), context.parameters.max_blocks_per_fetch);
    for block_ref in blocks.keys() {
        let accepted = highest_accepted_rounds[block_ref.author];
        assert!(block_ref.round > accepted);
    }
    let max_round_in_result = blocks.keys().map(|b| b.round).max().unwrap();
    assert!(
        max_round_in_result <= 4,
        "Expected low rounds from fair round-order fetching, got max round {}",
        max_round_in_result
    );

    // Case 4: request five blocks from round NUM_ROUNDS - 10 with no accepted
    // rounds at all; exactly the requested blocks must come back.
    let missing_block_refs: Vec<BlockRef> = all_blocks
        .iter()
        .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
        .map(|b| b.reference())
        .take(5)
        .collect();
    let results = authority_service
        .handle_fetch_blocks(
            AuthorityIndex::new_for_test(0),
            missing_block_refs.clone(),
            vec![],
            false,
        )
        .await
        .unwrap();

    let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
        .iter()
        .map(|b| {
            let signed = bcs::from_bytes(b).unwrap();
            let block = VerifiedBlock::new_verified(signed, b.clone());
            (block.reference(), block)
        })
        .collect();
    assert_eq!(blocks.len(), 5);
    for b in &missing_block_refs {
        assert!(blocks.contains_key(b));
    }
}
1217
1218 #[tokio::test(flavor = "current_thread", start_paused = true)]
1219 async fn test_handle_fetch_latest_blocks() {
1220 let (context, _keys) = Context::new_for_test(4);
1222 let context = Arc::new(context);
1223 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1224 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1225 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1226 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1227 let fake_client = Arc::new(FakeNetworkClient::default());
1228 let network_client = Arc::new(SynchronizerClient::new(
1229 context.clone(),
1230 Some(fake_client.clone()),
1231 Some(fake_client.clone()),
1232 ));
1233 let store = Arc::new(MemStore::new());
1234 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1235 let transaction_vote_tracker =
1236 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1237 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1238 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1239 let synchronizer = Synchronizer::start(
1240 network_client,
1241 context.clone(),
1242 core_dispatcher.clone(),
1243 commit_vote_monitor.clone(),
1244 block_verifier.clone(),
1245 transaction_vote_tracker.clone(),
1246 round_tracker.clone(),
1247 dag_state.clone(),
1248 peers_pool.clone(),
1249 true,
1250 );
1251 let block_sync_service = Arc::new(BlockSyncService::new(
1252 context.clone(),
1253 dag_state.clone(),
1254 store.clone(),
1255 ));
1256 let authority_service = Arc::new(AuthorityService::new(
1257 context.clone(),
1258 block_verifier,
1259 commit_vote_monitor,
1260 round_tracker,
1261 synchronizer,
1262 core_dispatcher.clone(),
1263 rx_block_broadcast,
1264 transaction_vote_tracker,
1265 dag_state.clone(),
1266 block_sync_service,
1267 ));
1268
1269 let mut dag_builder = DagBuilder::new(context.clone());
1271 dag_builder
1272 .layers(1..=10)
1273 .authorities(vec![AuthorityIndex::new_for_test(2)])
1274 .equivocate(1)
1275 .build()
1276 .persist_layers(dag_state);
1277
1278 let authorities_to_request = vec![
1280 AuthorityIndex::new_for_test(1),
1281 AuthorityIndex::new_for_test(2),
1282 ];
1283 let results = authority_service
1284 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1285 .await;
1286
1287 let serialised_blocks = results.unwrap();
1289 for serialised_block in serialised_blocks {
1290 let signed_block: SignedBlock =
1291 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1292 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1293
1294 assert_eq!(verified_block.round(), 10);
1295 }
1296 }
1297
1298 #[tokio::test(flavor = "current_thread", start_paused = true)]
1299 async fn test_handle_subscribe_blocks() {
1300 let (context, _keys) = Context::new_for_test(4);
1301 let context = Arc::new(context);
1302 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1303 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1304 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1305 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1306 let fake_client = Arc::new(FakeNetworkClient::default());
1307 let network_client = Arc::new(SynchronizerClient::new(
1308 context.clone(),
1309 Some(fake_client.clone()),
1310 Some(fake_client.clone()),
1311 ));
1312 let store = Arc::new(MemStore::new());
1313 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1314 let transaction_vote_tracker =
1315 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1316 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1317 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1318 let synchronizer = Synchronizer::start(
1319 network_client,
1320 context.clone(),
1321 core_dispatcher.clone(),
1322 commit_vote_monitor.clone(),
1323 block_verifier.clone(),
1324 transaction_vote_tracker.clone(),
1325 round_tracker.clone(),
1326 dag_state.clone(),
1327 peers_pool.clone(),
1328 false,
1329 );
1330 let block_sync_service = Arc::new(BlockSyncService::new(
1331 context.clone(),
1332 dag_state.clone(),
1333 store.clone(),
1334 ));
1335
1336 dag_state
1338 .write()
1339 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1340 dag_state
1341 .write()
1342 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1343 dag_state
1344 .write()
1345 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1346
1347 let authority_service = Arc::new(AuthorityService::new(
1348 context.clone(),
1349 block_verifier,
1350 commit_vote_monitor,
1351 round_tracker,
1352 synchronizer,
1353 core_dispatcher.clone(),
1354 rx_block_broadcast,
1355 transaction_vote_tracker,
1356 dag_state.clone(),
1357 block_sync_service,
1358 ));
1359
1360 let peer = context.committee.to_authority_index(1).unwrap();
1361
1362 {
1365 let mut stream = authority_service
1366 .handle_subscribe_blocks(peer, 100)
1367 .await
1368 .unwrap();
1369 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1370 assert_eq!(
1371 block.round(),
1372 15,
1373 "Should return last proposed block as fallback"
1374 );
1375 assert_eq!(block.author().value(), 0);
1376 }
1377
1378 {
1381 let mut stream = authority_service
1382 .handle_subscribe_blocks(peer, 7)
1383 .await
1384 .unwrap();
1385
1386 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1387 assert_eq!(block1.round(), 10, "Should return block at round 10");
1388
1389 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1390 assert_eq!(block2.round(), 15, "Should return block at round 15");
1391 }
1392 }
1393
1394 #[tokio::test(flavor = "current_thread", start_paused = true)]
1395 async fn test_handle_subscribe_blocks_not_proposed() {
1396 let (context, _keys) = Context::new_for_test(4);
1397 let context = Arc::new(context);
1398 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1399 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1400 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1401 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1402 let fake_client = Arc::new(FakeNetworkClient::default());
1403 let network_client = Arc::new(SynchronizerClient::new(
1404 context.clone(),
1405 Some(fake_client.clone()),
1406 Some(fake_client.clone()),
1407 ));
1408 let store = Arc::new(MemStore::new());
1409 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1410 let transaction_vote_tracker =
1411 TransactionVoteTracker::new(context.clone(), block_verifier.clone(), dag_state.clone());
1412 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1413 let peers_pool = Arc::new(PeersPool::new(context.clone()));
1414 let synchronizer = Synchronizer::start(
1415 network_client,
1416 context.clone(),
1417 core_dispatcher.clone(),
1418 commit_vote_monitor.clone(),
1419 block_verifier.clone(),
1420 transaction_vote_tracker.clone(),
1421 round_tracker.clone(),
1422 dag_state.clone(),
1423 peers_pool.clone(),
1424 false,
1425 );
1426 let block_sync_service = Arc::new(BlockSyncService::new(
1427 context.clone(),
1428 dag_state.clone(),
1429 store.clone(),
1430 ));
1431
1432 let authority_service = Arc::new(AuthorityService::new(
1435 context.clone(),
1436 block_verifier,
1437 commit_vote_monitor,
1438 round_tracker,
1439 synchronizer,
1440 core_dispatcher.clone(),
1441 rx_block_broadcast,
1442 transaction_vote_tracker,
1443 dag_state.clone(),
1444 block_sync_service,
1445 ));
1446
1447 let peer = context.committee.to_authority_index(1).unwrap();
1448
1449 let mut stream = authority_service
1451 .handle_subscribe_blocks(peer, 0)
1452 .await
1453 .unwrap();
1454
1455 use futures::poll;
1457 use std::task::Poll;
1458 let poll_result = poll!(stream.next());
1459 assert!(
1460 matches!(poll_result, Poll::Pending),
1461 "Should not receive genesis block on subscription stream"
1462 );
1463 }
1464}