1use std::{
5 collections::{BTreeMap, BTreeSet},
6 pin::Pin,
7 sync::Arc,
8 time::Duration,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use consensus_types::block::{BlockRef, Round};
15use futures::{Stream, StreamExt, ready, stream, task};
16use mysten_metrics::spawn_monitored_task;
17use parking_lot::RwLock;
18use rand::seq::SliceRandom as _;
19use sui_macros::fail_point_async;
20use tap::TapFallible;
21use tokio::sync::broadcast;
22use tokio_util::sync::ReusableBoxFuture;
23use tracing::{debug, info, warn};
24
25use crate::{
26 CommitIndex,
27 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
28 block_verifier::BlockVerifier,
29 commit::{CommitAPI as _, CommitRange, TrustedCommit},
30 commit_vote_monitor::CommitVoteMonitor,
31 context::Context,
32 core_thread::CoreThreadDispatcher,
33 dag_state::DagState,
34 error::{ConsensusError, ConsensusResult},
35 network::{BlockStream, ExtendedSerializedBlock, NetworkService},
36 round_tracker::RoundTracker,
37 stake_aggregator::{QuorumThreshold, StakeAggregator},
38 storage::Store,
39 synchronizer::SynchronizerHandle,
40 transaction_certifier::TransactionCertifier,
41};
42
/// Multiplier applied to `commit_sync_batch_size` to derive the commit-lag rejection threshold.
///
/// `handle_send_block` rejects an incoming block when the local last commit index plus
/// `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER` is still below the quorum commit index.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
44
/// Server-side implementation of `NetworkService`: handles the RPCs that peer authorities
/// send to this node (block pushes, block subscriptions, block/commit fetches, round queries).
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    /// Shared node context: committee, parameters, protocol config, clock and metrics.
    context: Arc<Context>,
    /// Observes received blocks and provides the quorum commit index.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Verifies blocks received from peers and produces the reject votes for them.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Used to fetch missing (excluded) ancestors of received blocks in the background.
    synchronizer: Arc<SynchronizerHandle>,
    /// Entry point for adding verified blocks to core and checking block refs.
    core_dispatcher: Arc<C>,
    /// Template receiver; each block subscription gets its own receiver via `resubscribe()`.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    /// Tracks the active block subscriptions per peer and updates the related metric.
    subscription_counter: Arc<SubscriptionCounter>,
    /// Receives voted blocks when the `mysticeti_fastpath` protocol flag is enabled.
    transaction_certifier: TransactionCertifier,
    /// In-memory DAG state used to serve blocks and read the last commit index.
    dag_state: Arc<RwLock<DagState>>,
    /// Persistent store used to serve commits, commit votes and certifier blocks.
    store: Arc<dyn Store>,
    /// Updated with every verified block received; serves the highest received rounds.
    round_tracker: Arc<RwLock<RoundTracker>>,
}
59
60impl<C: CoreThreadDispatcher> AuthorityService<C> {
61 pub(crate) fn new(
62 context: Arc<Context>,
63 block_verifier: Arc<dyn BlockVerifier>,
64 commit_vote_monitor: Arc<CommitVoteMonitor>,
65 round_tracker: Arc<RwLock<RoundTracker>>,
66 synchronizer: Arc<SynchronizerHandle>,
67 core_dispatcher: Arc<C>,
68 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
69 transaction_certifier: TransactionCertifier,
70 dag_state: Arc<RwLock<DagState>>,
71 store: Arc<dyn Store>,
72 ) -> Self {
73 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
74 Self {
75 context,
76 block_verifier,
77 commit_vote_monitor,
78 synchronizer,
79 core_dispatcher,
80 rx_block_broadcast,
81 subscription_counter,
82 transaction_certifier,
83 dag_state,
84 store,
85 round_tracker,
86 }
87 }
88
89 fn parse_excluded_ancestors(
91 &self,
92 peer: AuthorityIndex,
93 block: &VerifiedBlock,
94 mut excluded_ancestors: Vec<Vec<u8>>,
95 ) -> ConsensusResult<Vec<BlockRef>> {
96 let peer_hostname = &self.context.committee.authority(peer).hostname;
97
98 let excluded_ancestors_limit = self.context.committee.size() * 2;
99 if excluded_ancestors.len() > excluded_ancestors_limit {
100 debug!(
101 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
102 excluded_ancestors.len() - excluded_ancestors_limit,
103 peer,
104 peer_hostname,
105 );
106 excluded_ancestors.truncate(excluded_ancestors_limit);
107 }
108
109 let excluded_ancestors = excluded_ancestors
110 .into_iter()
111 .map(|serialized| {
112 let block_ref: BlockRef =
113 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
114 if !self.context.committee.is_valid_index(block_ref.author) {
115 return Err(ConsensusError::InvalidAuthorityIndex {
116 index: block_ref.author,
117 max: self.context.committee.size(),
118 });
119 }
120 if block_ref.round >= block.round() {
121 return Err(ConsensusError::InvalidAncestorRound {
122 ancestor: block_ref.round,
123 block: block.round(),
124 });
125 }
126 Ok(block_ref)
127 })
128 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
129
130 for excluded_ancestor in &excluded_ancestors {
131 let excluded_ancestor_hostname = &self
132 .context
133 .committee
134 .authority(excluded_ancestor.author)
135 .hostname;
136 self.context
137 .metrics
138 .node_metrics
139 .network_excluded_ancestors_count_by_authority
140 .with_label_values(&[excluded_ancestor_hostname])
141 .inc();
142 }
143 self.context
144 .metrics
145 .node_metrics
146 .network_received_excluded_ancestors_from_authority
147 .with_label_values(&[peer_hostname])
148 .inc_by(excluded_ancestors.len() as u64);
149
150 Ok(excluded_ancestors)
151 }
152}
153
154#[async_trait]
155impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
156 async fn handle_send_block(
157 &self,
158 peer: AuthorityIndex,
159 serialized_block: ExtendedSerializedBlock,
160 ) -> ConsensusResult<()> {
161 fail_point_async!("consensus-rpc-response");
162
163 let peer_hostname = &self.context.committee.authority(peer).hostname;
164
165 let signed_block: SignedBlock =
167 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
168
169 if peer != signed_block.author() {
171 self.context
172 .metrics
173 .node_metrics
174 .invalid_blocks
175 .with_label_values(&[
176 peer_hostname.as_str(),
177 "handle_send_block",
178 "UnexpectedAuthority",
179 ])
180 .inc();
181 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
182 info!("Block with wrong authority from {}: {}", peer, e);
183 return Err(e);
184 }
185
186 let (verified_block, reject_txn_votes) = self
188 .block_verifier
189 .verify_and_vote(signed_block, serialized_block.block)
190 .tap_err(|e| {
191 self.context
192 .metrics
193 .node_metrics
194 .invalid_blocks
195 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
196 .inc();
197 info!("Invalid block from {}: {}", peer, e);
198 })?;
199 let excluded_ancestors = self
200 .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
201 .tap_err(|e| {
202 debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
203 self.context
204 .metrics
205 .node_metrics
206 .invalid_blocks
207 .with_label_values(&[peer_hostname.as_str(), "handle_send_block", e.name()])
208 .inc();
209 })?;
210
211 let block_ref = verified_block.reference();
212 debug!("Received block {} via send block.", block_ref);
213
214 self.context
215 .metrics
216 .node_metrics
217 .verified_blocks
218 .with_label_values(&[peer_hostname])
219 .inc();
220
221 let now = self.context.clock.timestamp_utc_ms();
222 let forward_time_drift =
223 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
224
225 self.context
226 .metrics
227 .node_metrics
228 .block_timestamp_drift_ms
229 .with_label_values(&[peer_hostname.as_str(), "handle_send_block"])
230 .inc_by(forward_time_drift.as_millis() as u64);
231
232 self.commit_vote_monitor.observe_block(&verified_block);
235
236 self.round_tracker
238 .write()
239 .update_from_verified_block(&ExtendedBlock {
240 block: verified_block.clone(),
241 excluded_ancestors: excluded_ancestors.clone(),
242 });
243
244 let last_commit_index = self.dag_state.read().last_commit_index();
253 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
254 if last_commit_index
257 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
258 < quorum_commit_index
259 {
260 self.context
261 .metrics
262 .node_metrics
263 .rejected_blocks
264 .with_label_values(&["commit_lagging"])
265 .inc();
266 debug!(
267 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
268 block_ref, last_commit_index, quorum_commit_index,
269 );
270 return Err(ConsensusError::BlockRejected {
271 block_ref,
272 reason: format!(
273 "Last commit index is lagging quorum commit index too much ({} < {})",
274 last_commit_index, quorum_commit_index,
275 ),
276 });
277 }
278
279 if self.context.protocol_config.mysticeti_fastpath() {
282 self.transaction_certifier
283 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
284 }
285
286 let missing_ancestors = self
288 .core_dispatcher
289 .add_blocks(vec![verified_block.clone()])
290 .await
291 .map_err(|_| ConsensusError::Shutdown)?;
292
293 if !missing_ancestors.is_empty() {
295 self.context
296 .metrics
297 .node_metrics
298 .handler_received_block_missing_ancestors
299 .with_label_values(&[peer_hostname])
300 .inc_by(missing_ancestors.len() as u64);
301 let synchronizer = self.synchronizer.clone();
302 spawn_monitored_task!(async move {
303 if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
308 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
309 }
310 });
311 }
312
313 let missing_excluded_ancestors = self
315 .core_dispatcher
316 .check_block_refs(excluded_ancestors)
317 .await
318 .map_err(|_| ConsensusError::Shutdown)?;
319 if !missing_excluded_ancestors.is_empty() {
320 self.context
321 .metrics
322 .node_metrics
323 .network_excluded_ancestors_sent_to_fetch
324 .with_label_values(&[peer_hostname])
325 .inc_by(missing_excluded_ancestors.len() as u64);
326
327 let synchronizer = self.synchronizer.clone();
328 spawn_monitored_task!(async move {
329 if let Err(err) = synchronizer
330 .fetch_blocks(missing_excluded_ancestors, peer)
331 .await
332 {
333 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
334 }
335 });
336 }
337
338 Ok(())
339 }
340
341 async fn handle_subscribe_blocks(
342 &self,
343 peer: AuthorityIndex,
344 last_received: Round,
345 ) -> ConsensusResult<BlockStream> {
346 fail_point_async!("consensus-rpc-response");
347
348 let past_proposed_blocks = {
357 let dag_state = self.dag_state.read();
358
359 let mut proposed_blocks =
360 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
361 if proposed_blocks.is_empty() {
362 let last_proposed_block = dag_state.get_last_proposed_block();
363 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
364 vec![last_proposed_block]
365 } else {
366 vec![]
367 };
368 }
369 stream::iter(
370 proposed_blocks
371 .into_iter()
372 .map(|block| ExtendedSerializedBlock {
373 block: block.serialized().clone(),
374 excluded_ancestors: vec![],
375 }),
376 )
377 };
378
379 let broadcasted_blocks = BroadcastedBlockStream::new(
380 peer,
381 self.rx_block_broadcast.resubscribe(),
382 self.subscription_counter.clone(),
383 );
384
385 Ok(Box::pin(past_proposed_blocks.chain(
387 broadcasted_blocks.map(ExtendedSerializedBlock::from),
388 )))
389 }
390
391 async fn handle_fetch_blocks(
399 &self,
400 _peer: AuthorityIndex,
401 mut block_refs: Vec<BlockRef>,
402 highest_accepted_rounds: Vec<Round>,
403 breadth_first: bool,
404 ) -> ConsensusResult<Vec<Bytes>> {
405 fail_point_async!("consensus-rpc-response");
406
407 if !highest_accepted_rounds.is_empty()
408 && highest_accepted_rounds.len() != self.context.committee.size()
409 {
410 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
411 highest_accepted_rounds.len(),
412 self.context.committee.size(),
413 ));
414 }
415
416 let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
418 self.context.parameters.max_blocks_per_sync
419 } else {
420 self.context.parameters.max_blocks_per_fetch
421 };
422 if block_refs.len() > max_response_num_blocks {
423 block_refs.truncate(max_response_num_blocks);
424 }
425
426 for block in &block_refs {
428 if !self.context.committee.is_valid_index(block.author) {
429 return Err(ConsensusError::InvalidAuthorityIndex {
430 index: block.author,
431 max: self.context.committee.size(),
432 });
433 }
434 if block.round == GENESIS_ROUND {
435 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
436 }
437 }
438
439 let blocks = if !highest_accepted_rounds.is_empty() {
441 block_refs.sort();
442 block_refs.dedup();
443 let mut blocks = self
444 .dag_state
445 .read()
446 .get_blocks(&block_refs)
447 .into_iter()
448 .flatten()
449 .collect::<Vec<_>>();
450
451 if breadth_first {
452 let mut missing_ancestors = blocks
454 .iter()
455 .flat_map(|block| block.ancestors().to_vec())
456 .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
457 .collect::<BTreeSet<_>>()
458 .into_iter()
459 .collect::<Vec<_>>();
460
461 let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
464 if selected_num_blocks < missing_ancestors.len() {
465 missing_ancestors = missing_ancestors
466 .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
467 .copied()
468 .collect::<Vec<_>>();
469 }
470 let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
471 blocks.extend(ancestor_blocks.into_iter().flatten());
472 } else {
473 let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
476 for block_ref in blocks.iter().map(|b| b.reference()) {
477 let entry = lowest_missing_rounds
478 .entry(block_ref.author)
479 .or_insert(block_ref.round);
480 *entry = (*entry).min(block_ref.round);
481 }
482
483 let dag_state = self.dag_state.read();
489 for (authority, lowest_missing_round) in lowest_missing_rounds {
490 let highest_accepted_round = highest_accepted_rounds[authority];
491 if highest_accepted_round >= lowest_missing_round {
492 continue;
493 }
494 let missing_blocks = dag_state.get_cached_blocks_in_range(
495 authority,
496 highest_accepted_round + 1,
497 lowest_missing_round,
498 self.context
499 .parameters
500 .max_blocks_per_sync
501 .saturating_sub(blocks.len()),
502 );
503 blocks.extend(missing_blocks);
504 if blocks.len() >= self.context.parameters.max_blocks_per_sync {
505 blocks.truncate(self.context.parameters.max_blocks_per_sync);
506 break;
507 }
508 }
509 }
510
511 blocks
512 } else {
513 self.dag_state
514 .read()
515 .get_blocks(&block_refs)
516 .into_iter()
517 .flatten()
518 .collect()
519 };
520
521 let bytes = blocks
523 .into_iter()
524 .map(|block| block.serialized().clone())
525 .collect::<Vec<_>>();
526 Ok(bytes)
527 }
528
529 async fn handle_fetch_commits(
530 &self,
531 _peer: AuthorityIndex,
532 commit_range: CommitRange,
533 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
534 fail_point_async!("consensus-rpc-response");
535
536 let inclusive_end = commit_range.end().min(
538 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
539 - 1,
540 );
541 let mut commits = self
542 .store
543 .scan_commits((commit_range.start()..=inclusive_end).into())?;
544 let mut certifier_block_refs = vec![];
545 'commit: while let Some(c) = commits.last() {
546 let index = c.index();
547 let votes = self.store.read_commit_votes(index)?;
548 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
549 for v in &votes {
550 stake_aggregator.add(v.author, &self.context.committee);
551 }
552 if stake_aggregator.reached_threshold(&self.context.committee) {
553 certifier_block_refs = votes;
554 break 'commit;
555 } else {
556 debug!(
557 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
558 index,
559 stake_aggregator.stake(),
560 stake_aggregator.threshold(&self.context.committee)
561 );
562 self.context
563 .metrics
564 .node_metrics
565 .commit_sync_fetch_commits_handler_uncertified_skipped
566 .inc();
567 commits.pop();
568 }
569 }
570 let certifier_blocks = self
571 .store
572 .read_blocks(&certifier_block_refs)?
573 .into_iter()
574 .flatten()
575 .collect();
576 Ok((commits, certifier_blocks))
577 }
578
579 async fn handle_fetch_latest_blocks(
580 &self,
581 peer: AuthorityIndex,
582 authorities: Vec<AuthorityIndex>,
583 ) -> ConsensusResult<Vec<Bytes>> {
584 fail_point_async!("consensus-rpc-response");
585
586 if authorities.len() > self.context.committee.size() {
587 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
588 }
589
590 for authority in &authorities {
592 if !self.context.committee.is_valid_index(*authority) {
593 return Err(ConsensusError::InvalidAuthorityIndex {
594 index: *authority,
595 max: self.context.committee.size(),
596 });
597 }
598 }
599
600 let mut blocks = vec![];
604 let dag_state = self.dag_state.read();
605 for authority in authorities {
606 let block = dag_state.get_last_block_for_authority(authority);
607
608 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
609
610 if block.round() != GENESIS_ROUND {
612 blocks.push(block);
613 }
614 }
615
616 let result = blocks
618 .into_iter()
619 .map(|block| block.serialized().clone())
620 .collect::<Vec<_>>();
621
622 Ok(result)
623 }
624
625 async fn handle_get_latest_rounds(
626 &self,
627 _peer: AuthorityIndex,
628 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
629 fail_point_async!("consensus-rpc-response");
630
631 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
632
633 let blocks = self
634 .dag_state
635 .read()
636 .get_last_cached_block_per_authority(Round::MAX);
637 let highest_accepted_rounds = blocks
638 .into_iter()
639 .map(|(block, _)| block.round())
640 .collect::<Vec<_>>();
641
642 Ok((highest_received_rounds, highest_accepted_rounds))
643 }
644}
645
/// Subscription bookkeeping guarded by the mutex in `SubscriptionCounter`.
struct Counter {
    /// Total number of active subscriptions across all peers.
    count: usize,
    /// Number of active subscriptions per peer, indexed by authority index.
    subscriptions_by_authority: Vec<usize>,
}
650
/// Tracks how many block subscriptions each peer currently holds and mirrors the
/// subscribed/unsubscribed state of each peer into the `subscribed_by` metric.
struct SubscriptionCounter {
    /// Provides the committee (for hostnames and size) and the metrics.
    context: Arc<Context>,
    /// Subscription counts, protected by a mutex.
    counter: parking_lot::Mutex<Counter>,
}
656
657impl SubscriptionCounter {
658 fn new(context: Arc<Context>) -> Self {
659 for (_, authority) in context.committee.authorities() {
661 context
662 .metrics
663 .node_metrics
664 .subscribed_by
665 .with_label_values(&[authority.hostname.as_str()])
666 .set(0);
667 }
668
669 Self {
670 counter: parking_lot::Mutex::new(Counter {
671 count: 0,
672 subscriptions_by_authority: vec![0; context.committee.size()],
673 }),
674 context,
675 }
676 }
677
678 fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
679 let mut counter = self.counter.lock();
680 counter.count += 1;
681 counter.subscriptions_by_authority[peer] += 1;
682
683 let peer_hostname = &self.context.committee.authority(peer).hostname;
684 self.context
685 .metrics
686 .node_metrics
687 .subscribed_by
688 .with_label_values(&[peer_hostname])
689 .set(1);
690
691 Ok(())
692 }
693
694 fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
695 let mut counter = self.counter.lock();
696 counter.count -= 1;
697 counter.subscriptions_by_authority[peer] -= 1;
698
699 if counter.subscriptions_by_authority[peer] == 0 {
700 let peer_hostname = &self.context.committee.authority(peer).hostname;
701 self.context
702 .metrics
703 .node_metrics
704 .subscribed_by
705 .with_label_values(&[peer_hostname])
706 .set(0);
707 }
708
709 Ok(())
710 }
711}
712
/// Stream of the `ExtendedBlock`s broadcast by this authority, as served to one subscriber.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// `Stream` adapter over a `tokio::sync::broadcast::Receiver`.
///
/// Construction registers the subscription with the `SubscriptionCounter`; dropping the
/// stream unregisters it.
struct BroadcastStream<T> {
    /// The peer this stream is serving; used for logging and subscription counting.
    peer: AuthorityIndex,
    /// The pending `recv()` call. The future owns the receiver and hands it back together
    /// with the result, so the next `recv()` can be set up in the same allocation.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    /// Shared counter of active subscriptions.
    subscription_counter: Arc<SubscriptionCounter>,
}
732
733impl<T: 'static + Clone + Send> BroadcastStream<T> {
734 pub fn new(
735 peer: AuthorityIndex,
736 rx: broadcast::Receiver<T>,
737 subscription_counter: Arc<SubscriptionCounter>,
738 ) -> Self {
739 if let Err(err) = subscription_counter.increment(peer) {
740 match err {
741 ConsensusError::Shutdown => {}
742 _ => panic!("Unexpected error: {err}"),
743 }
744 }
745 Self {
746 peer,
747 inner: ReusableBoxFuture::new(make_recv_future(rx)),
748 subscription_counter,
749 }
750 }
751}
752
753impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
754 type Item = T;
755
756 fn poll_next(
757 mut self: Pin<&mut Self>,
758 cx: &mut task::Context<'_>,
759 ) -> task::Poll<Option<Self::Item>> {
760 let peer = self.peer;
761 let maybe_item = loop {
762 let (result, rx) = ready!(self.inner.poll(cx));
763 self.inner.set(make_recv_future(rx));
764
765 match result {
766 Ok(item) => break Some(item),
767 Err(broadcast::error::RecvError::Closed) => {
768 info!("Block BroadcastedBlockStream {} closed", peer);
769 break None;
770 }
771 Err(broadcast::error::RecvError::Lagged(n)) => {
772 warn!(
773 "Block BroadcastedBlockStream {} lagged by {} messages",
774 peer, n
775 );
776 continue;
777 }
778 }
779 };
780 task::Poll::Ready(maybe_item)
781 }
782}
783
784impl<T> Drop for BroadcastStream<T> {
785 fn drop(&mut self) {
786 if let Err(err) = self.subscription_counter.decrement(self.peer) {
787 match err {
788 ConsensusError::Shutdown => {}
789 _ => panic!("Unexpected error: {err}"),
790 }
791 }
792 }
793}
794
795async fn make_recv_future<T: Clone>(
796 mut rx: broadcast::Receiver<T>,
797) -> (
798 Result<T, broadcast::error::RecvError>,
799 broadcast::Receiver<T>,
800) {
801 let result = rx.recv().await;
802 (result, rx)
803}
804
805#[cfg(test)]
808mod tests {
809 use std::{
810 collections::{BTreeMap, BTreeSet},
811 sync::Arc,
812 time::Duration,
813 };
814
815 use async_trait::async_trait;
816 use bytes::Bytes;
817 use consensus_config::AuthorityIndex;
818 use consensus_types::block::{BlockDigest, BlockRef, Round};
819 use mysten_metrics::monitored_mpsc;
820 use parking_lot::{Mutex, RwLock};
821 use tokio::{sync::broadcast, time::sleep};
822
823 use futures::StreamExt as _;
824
825 use crate::{
826 authority_service::AuthorityService,
827 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
828 commit::{CertifiedCommits, CommitRange},
829 commit_vote_monitor::CommitVoteMonitor,
830 context::Context,
831 core_thread::{CoreError, CoreThreadDispatcher},
832 dag_state::DagState,
833 error::ConsensusResult,
834 network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
835 round_tracker::RoundTracker,
836 storage::mem_store::MemStore,
837 synchronizer::Synchronizer,
838 test_dag_builder::DagBuilder,
839 transaction_certifier::TransactionCertifier,
840 };
841 struct FakeCoreThreadDispatcher {
842 blocks: Mutex<Vec<VerifiedBlock>>,
843 }
844
845 impl FakeCoreThreadDispatcher {
846 fn new() -> Self {
847 Self {
848 blocks: Mutex::new(vec![]),
849 }
850 }
851
852 fn get_blocks(&self) -> Vec<VerifiedBlock> {
853 self.blocks.lock().clone()
854 }
855 }
856
857 #[async_trait]
858 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
859 async fn add_blocks(
860 &self,
861 blocks: Vec<VerifiedBlock>,
862 ) -> Result<BTreeSet<BlockRef>, CoreError> {
863 let block_refs = blocks.iter().map(|b| b.reference()).collect();
864 self.blocks.lock().extend(blocks);
865 Ok(block_refs)
866 }
867
868 async fn check_block_refs(
869 &self,
870 _block_refs: Vec<BlockRef>,
871 ) -> Result<BTreeSet<BlockRef>, CoreError> {
872 Ok(BTreeSet::new())
873 }
874
875 async fn add_certified_commits(
876 &self,
877 _commits: CertifiedCommits,
878 ) -> Result<BTreeSet<BlockRef>, CoreError> {
879 todo!()
880 }
881
882 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
883 Ok(())
884 }
885
886 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
887 Ok(Default::default())
888 }
889
890 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
891 todo!()
892 }
893
894 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
895 todo!()
896 }
897 }
898
    /// Network client stub: it only exists so a `Synchronizer` can be constructed.
    /// Every method panics with `unimplemented!` if it is called.
    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        /// Unimplemented stub.
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
959
960 #[tokio::test(flavor = "current_thread", start_paused = true)]
961 async fn test_handle_send_block() {
962 let (context, _keys) = Context::new_for_test(4);
963 let context = Arc::new(context);
964 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
965 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
966 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
967 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
968 let network_client = Arc::new(FakeNetworkClient::default());
969 let (blocks_sender, _blocks_receiver) =
970 monitored_mpsc::unbounded_channel("consensus_block_output");
971 let store = Arc::new(MemStore::new());
972 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
973 let transaction_certifier = TransactionCertifier::new(
974 context.clone(),
975 block_verifier.clone(),
976 dag_state.clone(),
977 blocks_sender,
978 );
979 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
980 let synchronizer = Synchronizer::start(
981 network_client,
982 context.clone(),
983 core_dispatcher.clone(),
984 commit_vote_monitor.clone(),
985 block_verifier.clone(),
986 transaction_certifier.clone(),
987 round_tracker.clone(),
988 dag_state.clone(),
989 false,
990 );
991 let authority_service = Arc::new(AuthorityService::new(
992 context.clone(),
993 block_verifier,
994 commit_vote_monitor,
995 round_tracker,
996 synchronizer,
997 core_dispatcher.clone(),
998 rx_block_broadcast,
999 transaction_certifier,
1000 dag_state,
1001 store,
1002 ));
1003
1004 let now = context.clock.timestamp_utc_ms();
1006 let max_drift = context.parameters.max_forward_time_drift;
1007 let input_block = VerifiedBlock::new_for_test(
1008 TestBlock::new(9, 0)
1009 .set_timestamp_ms(now + max_drift.as_millis() as u64)
1010 .build(),
1011 );
1012
1013 let service = authority_service.clone();
1014 let serialized = ExtendedSerializedBlock {
1015 block: input_block.serialized().clone(),
1016 excluded_ancestors: vec![],
1017 };
1018
1019 tokio::spawn({
1020 let service = service.clone();
1021 let context = context.clone();
1022 async move {
1023 service
1024 .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
1025 .await
1026 .unwrap();
1027 }
1028 });
1029
1030 sleep(max_drift / 2).await;
1031
1032 let blocks = core_dispatcher.get_blocks();
1033 assert_eq!(blocks.len(), 1);
1034 assert_eq!(blocks[0], input_block);
1035
1036 let invalid_block =
1038 VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
1039 let extended_block = ExtendedSerializedBlock {
1040 block: invalid_block.serialized().clone(),
1041 excluded_ancestors: vec![],
1042 };
1043 service
1044 .handle_send_block(
1045 context.committee.to_authority_index(0).unwrap(),
1046 extended_block,
1047 )
1048 .await
1049 .unwrap_err();
1050
1051 let invalid_excluded_ancestors = vec![
1053 bcs::to_bytes(&BlockRef::new(
1054 10,
1055 AuthorityIndex::new_for_test(1000),
1056 BlockDigest::MIN,
1057 ))
1058 .unwrap(),
1059 vec![3u8; 40],
1060 bcs::to_bytes(&invalid_block.reference()).unwrap(),
1061 ];
1062 let extended_block = ExtendedSerializedBlock {
1063 block: input_block.serialized().clone(),
1064 excluded_ancestors: invalid_excluded_ancestors,
1065 };
1066 service
1067 .handle_send_block(
1068 context.committee.to_authority_index(0).unwrap(),
1069 extended_block,
1070 )
1071 .await
1072 .unwrap_err();
1073 }
1074
1075 #[tokio::test(flavor = "current_thread", start_paused = true)]
1076 async fn test_handle_fetch_blocks() {
1077 const NUM_AUTHORITIES: usize = 40;
1080 const NUM_ROUNDS: usize = 40;
1081 let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1082 let context = Arc::new(context);
1083 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1084 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1085 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1086 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1087 let network_client = Arc::new(FakeNetworkClient::default());
1088 let (blocks_sender, _blocks_receiver) =
1089 monitored_mpsc::unbounded_channel("consensus_block_output");
1090 let store = Arc::new(MemStore::new());
1091 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1092 let transaction_certifier = TransactionCertifier::new(
1093 context.clone(),
1094 block_verifier.clone(),
1095 dag_state.clone(),
1096 blocks_sender,
1097 );
1098 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1099 let synchronizer = Synchronizer::start(
1100 network_client,
1101 context.clone(),
1102 core_dispatcher.clone(),
1103 commit_vote_monitor.clone(),
1104 block_verifier.clone(),
1105 transaction_certifier.clone(),
1106 round_tracker.clone(),
1107 dag_state.clone(),
1108 false,
1109 );
1110 let authority_service = Arc::new(AuthorityService::new(
1111 context.clone(),
1112 block_verifier,
1113 commit_vote_monitor,
1114 round_tracker,
1115 synchronizer,
1116 core_dispatcher.clone(),
1117 rx_block_broadcast,
1118 transaction_certifier,
1119 dag_state.clone(),
1120 store,
1121 ));
1122
1123 let mut dag_builder = DagBuilder::new(context.clone());
1125 dag_builder
1126 .layers(1..=(NUM_ROUNDS as u32))
1127 .build()
1128 .persist_layers(dag_state.clone());
1129 let all_blocks = dag_builder.all_blocks();
1130
1131 let missing_block_refs: Vec<BlockRef> = all_blocks
1133 .iter()
1134 .rev()
1135 .take(2)
1136 .map(|b| b.reference())
1137 .collect();
1138 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1139 let results = authority_service
1140 .handle_fetch_blocks(
1141 AuthorityIndex::new_for_test(0),
1142 missing_block_refs.clone(),
1143 highest_accepted_rounds,
1144 true,
1145 )
1146 .await
1147 .unwrap();
1148
1149 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1151 .iter()
1152 .map(|b| {
1153 let signed = bcs::from_bytes(b).unwrap();
1154 let block = VerifiedBlock::new_verified(signed, b.clone());
1155 (block.reference(), block)
1156 })
1157 .collect();
1158 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1159 for b in &missing_block_refs {
1161 assert!(blocks.contains_key(b));
1162 }
1163 let num_missing_ancestors = blocks
1164 .keys()
1165 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1166 .count();
1167 assert_eq!(
1168 num_missing_ancestors,
1169 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1170 );
1171
1172 let missing_round = NUM_ROUNDS as Round - 3;
1174 let missing_block_refs: Vec<BlockRef> = all_blocks
1175 .iter()
1176 .filter(|b| b.reference().round == missing_round)
1177 .map(|b| b.reference())
1178 .take(2)
1179 .collect();
1180 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1181 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1183 let results = authority_service
1184 .handle_fetch_blocks(
1185 AuthorityIndex::new_for_test(0),
1186 missing_block_refs.clone(),
1187 highest_accepted_rounds,
1188 false,
1189 )
1190 .await
1191 .unwrap();
1192
1193 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1195 .iter()
1196 .map(|b| {
1197 let signed = bcs::from_bytes(b).unwrap();
1198 let block = VerifiedBlock::new_verified(signed, b.clone());
1199 (block.reference(), block)
1200 })
1201 .collect();
1202 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1203 for b in &missing_block_refs {
1205 assert!(blocks.contains_key(b));
1206 }
1207 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1209 for b in blocks.keys() {
1210 assert!(b.round <= missing_round);
1211 assert!(expected_authors.contains(&b.author));
1212 }
1213
1214 let missing_block_refs: Vec<BlockRef> = all_blocks
1216 .iter()
1217 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1218 .map(|b| b.reference())
1219 .take(5)
1220 .collect();
1221 let results = authority_service
1222 .handle_fetch_blocks(
1223 AuthorityIndex::new_for_test(0),
1224 missing_block_refs.clone(),
1225 vec![],
1226 false,
1227 )
1228 .await
1229 .unwrap();
1230
1231 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1233 .iter()
1234 .map(|b| {
1235 let signed = bcs::from_bytes(b).unwrap();
1236 let block = VerifiedBlock::new_verified(signed, b.clone());
1237 (block.reference(), block)
1238 })
1239 .collect();
1240 assert_eq!(blocks.len(), 5);
1241 for b in &missing_block_refs {
1242 assert!(blocks.contains_key(b));
1243 }
1244 }
1245
1246 #[tokio::test(flavor = "current_thread", start_paused = true)]
1247 async fn test_handle_fetch_latest_blocks() {
1248 let (context, _keys) = Context::new_for_test(4);
1250 let context = Arc::new(context);
1251 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1252 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1253 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1254 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1255 let network_client = Arc::new(FakeNetworkClient::default());
1256 let (blocks_sender, _blocks_receiver) =
1257 monitored_mpsc::unbounded_channel("consensus_block_output");
1258 let store = Arc::new(MemStore::new());
1259 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1260 let transaction_certifier = TransactionCertifier::new(
1261 context.clone(),
1262 block_verifier.clone(),
1263 dag_state.clone(),
1264 blocks_sender,
1265 );
1266 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1267 let synchronizer = Synchronizer::start(
1268 network_client,
1269 context.clone(),
1270 core_dispatcher.clone(),
1271 commit_vote_monitor.clone(),
1272 block_verifier.clone(),
1273 transaction_certifier.clone(),
1274 round_tracker.clone(),
1275 dag_state.clone(),
1276 true,
1277 );
1278 let authority_service = Arc::new(AuthorityService::new(
1279 context.clone(),
1280 block_verifier,
1281 commit_vote_monitor,
1282 round_tracker,
1283 synchronizer,
1284 core_dispatcher.clone(),
1285 rx_block_broadcast,
1286 transaction_certifier,
1287 dag_state.clone(),
1288 store,
1289 ));
1290
1291 let mut dag_builder = DagBuilder::new(context.clone());
1293 dag_builder
1294 .layers(1..=10)
1295 .authorities(vec![AuthorityIndex::new_for_test(2)])
1296 .equivocate(1)
1297 .build()
1298 .persist_layers(dag_state);
1299
1300 let authorities_to_request = vec![
1302 AuthorityIndex::new_for_test(1),
1303 AuthorityIndex::new_for_test(2),
1304 ];
1305 let results = authority_service
1306 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1307 .await;
1308
1309 let serialised_blocks = results.unwrap();
1311 for serialised_block in serialised_blocks {
1312 let signed_block: SignedBlock =
1313 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1314 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1315
1316 assert_eq!(verified_block.round(), 10);
1317 }
1318 }
1319
1320 #[tokio::test(flavor = "current_thread", start_paused = true)]
1321 async fn test_handle_subscribe_blocks() {
1322 let (context, _keys) = Context::new_for_test(4);
1323 let context = Arc::new(context);
1324 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1325 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1326 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1327 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1328 let network_client = Arc::new(FakeNetworkClient::default());
1329 let (blocks_sender, _blocks_receiver) =
1330 monitored_mpsc::unbounded_channel("consensus_block_output");
1331 let store = Arc::new(MemStore::new());
1332 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1333 let transaction_certifier = TransactionCertifier::new(
1334 context.clone(),
1335 block_verifier.clone(),
1336 dag_state.clone(),
1337 blocks_sender,
1338 );
1339 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1340 let synchronizer = Synchronizer::start(
1341 network_client,
1342 context.clone(),
1343 core_dispatcher.clone(),
1344 commit_vote_monitor.clone(),
1345 block_verifier.clone(),
1346 transaction_certifier.clone(),
1347 round_tracker.clone(),
1348 dag_state.clone(),
1349 false,
1350 );
1351
1352 dag_state
1354 .write()
1355 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1356 dag_state
1357 .write()
1358 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1359 dag_state
1360 .write()
1361 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1362
1363 let authority_service = Arc::new(AuthorityService::new(
1364 context.clone(),
1365 block_verifier,
1366 commit_vote_monitor,
1367 round_tracker,
1368 synchronizer,
1369 core_dispatcher.clone(),
1370 rx_block_broadcast,
1371 transaction_certifier,
1372 dag_state.clone(),
1373 store,
1374 ));
1375
1376 let peer = context.committee.to_authority_index(1).unwrap();
1377
1378 {
1381 let mut stream = authority_service
1382 .handle_subscribe_blocks(peer, 100)
1383 .await
1384 .unwrap();
1385 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1386 assert_eq!(
1387 block.round(),
1388 15,
1389 "Should return last proposed block as fallback"
1390 );
1391 assert_eq!(block.author().value(), 0);
1392 }
1393
1394 {
1397 let mut stream = authority_service
1398 .handle_subscribe_blocks(peer, 7)
1399 .await
1400 .unwrap();
1401
1402 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1403 assert_eq!(block1.round(), 10, "Should return block at round 10");
1404
1405 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1406 assert_eq!(block2.round(), 15, "Should return block at round 15");
1407 }
1408 }
1409
1410 #[tokio::test(flavor = "current_thread", start_paused = true)]
1411 async fn test_handle_subscribe_blocks_not_proposed() {
1412 let (context, _keys) = Context::new_for_test(4);
1413 let context = Arc::new(context);
1414 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1415 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1416 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1417 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1418 let network_client = Arc::new(FakeNetworkClient::default());
1419 let (blocks_sender, _blocks_receiver) =
1420 monitored_mpsc::unbounded_channel("consensus_block_output");
1421 let store = Arc::new(MemStore::new());
1422 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1423 let transaction_certifier = TransactionCertifier::new(
1424 context.clone(),
1425 block_verifier.clone(),
1426 dag_state.clone(),
1427 blocks_sender,
1428 );
1429 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1430 let synchronizer = Synchronizer::start(
1431 network_client,
1432 context.clone(),
1433 core_dispatcher.clone(),
1434 commit_vote_monitor.clone(),
1435 block_verifier.clone(),
1436 transaction_certifier.clone(),
1437 round_tracker.clone(),
1438 dag_state.clone(),
1439 false,
1440 );
1441
1442 let authority_service = Arc::new(AuthorityService::new(
1445 context.clone(),
1446 block_verifier,
1447 commit_vote_monitor,
1448 round_tracker,
1449 synchronizer,
1450 core_dispatcher.clone(),
1451 rx_block_broadcast,
1452 transaction_certifier,
1453 dag_state.clone(),
1454 store,
1455 ));
1456
1457 let peer = context.committee.to_authority_index(1).unwrap();
1458
1459 let mut stream = authority_service
1461 .handle_subscribe_blocks(peer, 0)
1462 .await
1463 .unwrap();
1464
1465 use futures::poll;
1467 use std::task::Poll;
1468 let poll_result = poll!(stream.next());
1469 assert!(
1470 matches!(poll_result, Poll::Pending),
1471 "Should not receive genesis block on subscription stream"
1472 );
1473 }
1474}