1use std::{
5 collections::{BTreeMap, BTreeSet},
6 pin::Pin,
7 sync::Arc,
8 time::Duration,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use consensus_types::block::{BlockRef, Round};
15use futures::{Stream, StreamExt, ready, stream, task};
16use mysten_metrics::spawn_monitored_task;
17use parking_lot::RwLock;
18use rand::seq::SliceRandom as _;
19use sui_macros::fail_point_async;
20use tap::TapFallible;
21use tokio::sync::broadcast;
22use tokio_util::sync::ReusableBoxFuture;
23use tracing::{debug, info, warn};
24
25use crate::{
26 CommitIndex,
27 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
28 block_verifier::BlockVerifier,
29 commit::{CommitAPI as _, CommitRange, TrustedCommit},
30 commit_vote_monitor::CommitVoteMonitor,
31 context::Context,
32 core_thread::CoreThreadDispatcher,
33 dag_state::DagState,
34 error::{ConsensusError, ConsensusResult},
35 network::{BlockStream, ExtendedSerializedBlock, NetworkService},
36 round_tracker::RoundTracker,
37 stake_aggregator::{QuorumThreshold, StakeAggregator},
38 storage::Store,
39 synchronizer::SynchronizerHandle,
40 transaction_certifier::TransactionCertifier,
41};
42
43pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
44
45pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
47 context: Arc<Context>,
48 commit_vote_monitor: Arc<CommitVoteMonitor>,
49 block_verifier: Arc<dyn BlockVerifier>,
50 synchronizer: Arc<SynchronizerHandle>,
51 core_dispatcher: Arc<C>,
52 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
53 subscription_counter: Arc<SubscriptionCounter>,
54 transaction_certifier: TransactionCertifier,
55 dag_state: Arc<RwLock<DagState>>,
56 store: Arc<dyn Store>,
57 round_tracker: Arc<RwLock<RoundTracker>>,
58}
59
60impl<C: CoreThreadDispatcher> AuthorityService<C> {
61 pub(crate) fn new(
62 context: Arc<Context>,
63 block_verifier: Arc<dyn BlockVerifier>,
64 commit_vote_monitor: Arc<CommitVoteMonitor>,
65 round_tracker: Arc<RwLock<RoundTracker>>,
66 synchronizer: Arc<SynchronizerHandle>,
67 core_dispatcher: Arc<C>,
68 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
69 transaction_certifier: TransactionCertifier,
70 dag_state: Arc<RwLock<DagState>>,
71 store: Arc<dyn Store>,
72 ) -> Self {
73 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
74 Self {
75 context,
76 block_verifier,
77 commit_vote_monitor,
78 synchronizer,
79 core_dispatcher,
80 rx_block_broadcast,
81 subscription_counter,
82 transaction_certifier,
83 dag_state,
84 store,
85 round_tracker,
86 }
87 }
88
89 fn parse_excluded_ancestors(
91 &self,
92 peer: AuthorityIndex,
93 block: &VerifiedBlock,
94 mut excluded_ancestors: Vec<Vec<u8>>,
95 ) -> ConsensusResult<Vec<BlockRef>> {
96 let peer_hostname = &self.context.committee.authority(peer).hostname;
97
98 let excluded_ancestors_limit = self.context.committee.size() * 2;
99 if excluded_ancestors.len() > excluded_ancestors_limit {
100 debug!(
101 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
102 excluded_ancestors.len() - excluded_ancestors_limit,
103 peer,
104 peer_hostname,
105 );
106 excluded_ancestors.truncate(excluded_ancestors_limit);
107 }
108
109 let excluded_ancestors = excluded_ancestors
110 .into_iter()
111 .map(|serialized| {
112 let block_ref: BlockRef =
113 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
114 if !self.context.committee.is_valid_index(block_ref.author) {
115 return Err(ConsensusError::InvalidAuthorityIndex {
116 index: block_ref.author,
117 max: self.context.committee.size(),
118 });
119 }
120 if block_ref.round >= block.round() {
121 return Err(ConsensusError::InvalidAncestorRound {
122 ancestor: block_ref.round,
123 block: block.round(),
124 });
125 }
126 Ok(block_ref)
127 })
128 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
129
130 for excluded_ancestor in &excluded_ancestors {
131 let excluded_ancestor_hostname = &self
132 .context
133 .committee
134 .authority(excluded_ancestor.author)
135 .hostname;
136 self.context
137 .metrics
138 .node_metrics
139 .network_excluded_ancestors_count_by_authority
140 .with_label_values(&[excluded_ancestor_hostname])
141 .inc();
142 }
143 self.context
144 .metrics
145 .node_metrics
146 .network_received_excluded_ancestors_from_authority
147 .with_label_values(&[peer_hostname])
148 .inc_by(excluded_ancestors.len() as u64);
149
150 Ok(excluded_ancestors)
151 }
152}
153
154#[async_trait]
155impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
156 async fn handle_send_block(
157 &self,
158 peer: AuthorityIndex,
159 serialized_block: ExtendedSerializedBlock,
160 ) -> ConsensusResult<()> {
161 fail_point_async!("consensus-rpc-response");
162
163 let peer_hostname = &self.context.committee.authority(peer).hostname;
164
165 let signed_block: SignedBlock =
167 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
168
169 if peer != signed_block.author() {
171 self.context
172 .metrics
173 .node_metrics
174 .invalid_blocks
175 .with_label_values(&[peer_hostname, "handle_send_block", "UnexpectedAuthority"])
176 .inc();
177 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
178 info!("Block with wrong authority from {}: {}", peer, e);
179 return Err(e);
180 }
181
182 let (verified_block, reject_txn_votes) = self
184 .block_verifier
185 .verify_and_vote(signed_block, serialized_block.block)
186 .tap_err(|e| {
187 self.context
188 .metrics
189 .node_metrics
190 .invalid_blocks
191 .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
192 .inc();
193 info!("Invalid block from {}: {}", peer, e);
194 })?;
195 let excluded_ancestors = self
196 .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
197 .tap_err(|e| {
198 debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
199 self.context
200 .metrics
201 .node_metrics
202 .invalid_blocks
203 .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
204 .inc();
205 })?;
206
207 let block_ref = verified_block.reference();
208 debug!("Received block {} via send block.", block_ref);
209
210 self.context
211 .metrics
212 .node_metrics
213 .verified_blocks
214 .with_label_values(&[peer_hostname])
215 .inc();
216
217 let now = self.context.clock.timestamp_utc_ms();
218 let forward_time_drift =
219 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
220
221 self.context
222 .metrics
223 .node_metrics
224 .block_timestamp_drift_ms
225 .with_label_values(&[peer_hostname, "handle_send_block"])
226 .inc_by(forward_time_drift.as_millis() as u64);
227
228 self.commit_vote_monitor.observe_block(&verified_block);
231
232 self.round_tracker
234 .write()
235 .update_from_verified_block(&ExtendedBlock {
236 block: verified_block.clone(),
237 excluded_ancestors: excluded_ancestors.clone(),
238 });
239
240 let last_commit_index = self.dag_state.read().last_commit_index();
249 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
250 if last_commit_index
253 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
254 < quorum_commit_index
255 {
256 self.context
257 .metrics
258 .node_metrics
259 .rejected_blocks
260 .with_label_values(&["commit_lagging"])
261 .inc();
262 debug!(
263 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
264 block_ref, last_commit_index, quorum_commit_index,
265 );
266 return Err(ConsensusError::BlockRejected {
267 block_ref,
268 reason: format!(
269 "Last commit index is lagging quorum commit index too much ({} < {})",
270 last_commit_index, quorum_commit_index,
271 ),
272 });
273 }
274
275 if self.context.protocol_config.mysticeti_fastpath() {
278 self.transaction_certifier
279 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
280 }
281
282 let missing_ancestors = self
284 .core_dispatcher
285 .add_blocks(vec![verified_block.clone()])
286 .await
287 .map_err(|_| ConsensusError::Shutdown)?;
288
289 if !missing_ancestors.is_empty() {
291 self.context
292 .metrics
293 .node_metrics
294 .handler_received_block_missing_ancestors
295 .with_label_values(&[peer_hostname])
296 .inc_by(missing_ancestors.len() as u64);
297 let synchronizer = self.synchronizer.clone();
298 spawn_monitored_task!(async move {
299 if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
304 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
305 }
306 });
307 }
308
309 let missing_excluded_ancestors = self
311 .core_dispatcher
312 .check_block_refs(excluded_ancestors)
313 .await
314 .map_err(|_| ConsensusError::Shutdown)?;
315 if !missing_excluded_ancestors.is_empty() {
316 self.context
317 .metrics
318 .node_metrics
319 .network_excluded_ancestors_sent_to_fetch
320 .with_label_values(&[peer_hostname])
321 .inc_by(missing_excluded_ancestors.len() as u64);
322
323 let synchronizer = self.synchronizer.clone();
324 spawn_monitored_task!(async move {
325 if let Err(err) = synchronizer
326 .fetch_blocks(missing_excluded_ancestors, peer)
327 .await
328 {
329 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
330 }
331 });
332 }
333
334 Ok(())
335 }
336
337 async fn handle_subscribe_blocks(
338 &self,
339 peer: AuthorityIndex,
340 last_received: Round,
341 ) -> ConsensusResult<BlockStream> {
342 fail_point_async!("consensus-rpc-response");
343
344 let past_proposed_blocks = {
353 let dag_state = self.dag_state.read();
354
355 let mut proposed_blocks =
356 dag_state.get_cached_blocks(self.context.own_index, last_received + 1);
357 if proposed_blocks.is_empty() {
358 let last_proposed_block = dag_state.get_last_proposed_block();
359 proposed_blocks = if last_proposed_block.round() > GENESIS_ROUND {
360 vec![last_proposed_block]
361 } else {
362 vec![]
363 };
364 }
365 stream::iter(
366 proposed_blocks
367 .into_iter()
368 .map(|block| ExtendedSerializedBlock {
369 block: block.serialized().clone(),
370 excluded_ancestors: vec![],
371 }),
372 )
373 };
374
375 let broadcasted_blocks = BroadcastedBlockStream::new(
376 peer,
377 self.rx_block_broadcast.resubscribe(),
378 self.subscription_counter.clone(),
379 );
380
381 Ok(Box::pin(past_proposed_blocks.chain(
383 broadcasted_blocks.map(ExtendedSerializedBlock::from),
384 )))
385 }
386
387 async fn handle_fetch_blocks(
395 &self,
396 _peer: AuthorityIndex,
397 mut block_refs: Vec<BlockRef>,
398 highest_accepted_rounds: Vec<Round>,
399 breadth_first: bool,
400 ) -> ConsensusResult<Vec<Bytes>> {
401 fail_point_async!("consensus-rpc-response");
402
403 if !highest_accepted_rounds.is_empty()
404 && highest_accepted_rounds.len() != self.context.committee.size()
405 {
406 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
407 highest_accepted_rounds.len(),
408 self.context.committee.size(),
409 ));
410 }
411
412 let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
414 self.context.parameters.max_blocks_per_sync
415 } else {
416 self.context.parameters.max_blocks_per_fetch
417 };
418 if block_refs.len() > max_response_num_blocks {
419 block_refs.truncate(max_response_num_blocks);
420 }
421
422 for block in &block_refs {
424 if !self.context.committee.is_valid_index(block.author) {
425 return Err(ConsensusError::InvalidAuthorityIndex {
426 index: block.author,
427 max: self.context.committee.size(),
428 });
429 }
430 if block.round == GENESIS_ROUND {
431 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
432 }
433 }
434
435 let blocks = if !highest_accepted_rounds.is_empty() {
437 block_refs.sort();
438 block_refs.dedup();
439 let mut blocks = self
440 .dag_state
441 .read()
442 .get_blocks(&block_refs)
443 .into_iter()
444 .flatten()
445 .collect::<Vec<_>>();
446
447 if breadth_first {
448 let mut missing_ancestors = blocks
450 .iter()
451 .flat_map(|block| block.ancestors().to_vec())
452 .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
453 .collect::<BTreeSet<_>>()
454 .into_iter()
455 .collect::<Vec<_>>();
456
457 let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
460 if selected_num_blocks < missing_ancestors.len() {
461 missing_ancestors = missing_ancestors
462 .choose_multiple(&mut mysten_common::random::get_rng(), selected_num_blocks)
463 .copied()
464 .collect::<Vec<_>>();
465 }
466 let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
467 blocks.extend(ancestor_blocks.into_iter().flatten());
468 } else {
469 let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
472 for block_ref in blocks.iter().map(|b| b.reference()) {
473 let entry = lowest_missing_rounds
474 .entry(block_ref.author)
475 .or_insert(block_ref.round);
476 *entry = (*entry).min(block_ref.round);
477 }
478
479 let dag_state = self.dag_state.read();
485 for (authority, lowest_missing_round) in lowest_missing_rounds {
486 let highest_accepted_round = highest_accepted_rounds[authority];
487 if highest_accepted_round >= lowest_missing_round {
488 continue;
489 }
490 let missing_blocks = dag_state.get_cached_blocks_in_range(
491 authority,
492 highest_accepted_round + 1,
493 lowest_missing_round,
494 self.context
495 .parameters
496 .max_blocks_per_sync
497 .saturating_sub(blocks.len()),
498 );
499 blocks.extend(missing_blocks);
500 if blocks.len() >= self.context.parameters.max_blocks_per_sync {
501 blocks.truncate(self.context.parameters.max_blocks_per_sync);
502 break;
503 }
504 }
505 }
506
507 blocks
508 } else {
509 self.dag_state
510 .read()
511 .get_blocks(&block_refs)
512 .into_iter()
513 .flatten()
514 .collect()
515 };
516
517 let bytes = blocks
519 .into_iter()
520 .map(|block| block.serialized().clone())
521 .collect::<Vec<_>>();
522 Ok(bytes)
523 }
524
525 async fn handle_fetch_commits(
526 &self,
527 _peer: AuthorityIndex,
528 commit_range: CommitRange,
529 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
530 fail_point_async!("consensus-rpc-response");
531
532 let inclusive_end = commit_range.end().min(
534 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
535 - 1,
536 );
537 let mut commits = self
538 .store
539 .scan_commits((commit_range.start()..=inclusive_end).into())?;
540 let mut certifier_block_refs = vec![];
541 'commit: while let Some(c) = commits.last() {
542 let index = c.index();
543 let votes = self.store.read_commit_votes(index)?;
544 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
545 for v in &votes {
546 stake_aggregator.add(v.author, &self.context.committee);
547 }
548 if stake_aggregator.reached_threshold(&self.context.committee) {
549 certifier_block_refs = votes;
550 break 'commit;
551 } else {
552 debug!(
553 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
554 index,
555 stake_aggregator.stake(),
556 stake_aggregator.threshold(&self.context.committee)
557 );
558 self.context
559 .metrics
560 .node_metrics
561 .commit_sync_fetch_commits_handler_uncertified_skipped
562 .inc();
563 commits.pop();
564 }
565 }
566 let certifier_blocks = self
567 .store
568 .read_blocks(&certifier_block_refs)?
569 .into_iter()
570 .flatten()
571 .collect();
572 Ok((commits, certifier_blocks))
573 }
574
575 async fn handle_fetch_latest_blocks(
576 &self,
577 peer: AuthorityIndex,
578 authorities: Vec<AuthorityIndex>,
579 ) -> ConsensusResult<Vec<Bytes>> {
580 fail_point_async!("consensus-rpc-response");
581
582 if authorities.len() > self.context.committee.size() {
583 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
584 }
585
586 for authority in &authorities {
588 if !self.context.committee.is_valid_index(*authority) {
589 return Err(ConsensusError::InvalidAuthorityIndex {
590 index: *authority,
591 max: self.context.committee.size(),
592 });
593 }
594 }
595
596 let mut blocks = vec![];
600 let dag_state = self.dag_state.read();
601 for authority in authorities {
602 let block = dag_state.get_last_block_for_authority(authority);
603
604 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
605
606 if block.round() != GENESIS_ROUND {
608 blocks.push(block);
609 }
610 }
611
612 let result = blocks
614 .into_iter()
615 .map(|block| block.serialized().clone())
616 .collect::<Vec<_>>();
617
618 Ok(result)
619 }
620
621 async fn handle_get_latest_rounds(
622 &self,
623 _peer: AuthorityIndex,
624 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
625 fail_point_async!("consensus-rpc-response");
626
627 let highest_received_rounds = self.round_tracker.read().local_highest_received_rounds();
628
629 let blocks = self
630 .dag_state
631 .read()
632 .get_last_cached_block_per_authority(Round::MAX);
633 let highest_accepted_rounds = blocks
634 .into_iter()
635 .map(|(block, _)| block.round())
636 .collect::<Vec<_>>();
637
638 Ok((highest_received_rounds, highest_accepted_rounds))
639 }
640}
641
642struct Counter {
643 count: usize,
644 subscriptions_by_authority: Vec<usize>,
645}
646
647struct SubscriptionCounter {
649 context: Arc<Context>,
650 counter: parking_lot::Mutex<Counter>,
651}
652
653impl SubscriptionCounter {
654 fn new(context: Arc<Context>) -> Self {
655 for (_, authority) in context.committee.authorities() {
657 context
658 .metrics
659 .node_metrics
660 .subscribed_by
661 .with_label_values(&[authority.hostname.as_str()])
662 .set(0);
663 }
664
665 Self {
666 counter: parking_lot::Mutex::new(Counter {
667 count: 0,
668 subscriptions_by_authority: vec![0; context.committee.size()],
669 }),
670 context,
671 }
672 }
673
674 fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
675 let mut counter = self.counter.lock();
676 counter.count += 1;
677 counter.subscriptions_by_authority[peer] += 1;
678
679 let peer_hostname = &self.context.committee.authority(peer).hostname;
680 self.context
681 .metrics
682 .node_metrics
683 .subscribed_by
684 .with_label_values(&[peer_hostname])
685 .set(1);
686
687 Ok(())
688 }
689
690 fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
691 let mut counter = self.counter.lock();
692 counter.count -= 1;
693 counter.subscriptions_by_authority[peer] -= 1;
694
695 if counter.subscriptions_by_authority[peer] == 0 {
696 let peer_hostname = &self.context.committee.authority(peer).hostname;
697 self.context
698 .metrics
699 .node_metrics
700 .subscribed_by
701 .with_label_values(&[peer_hostname])
702 .set(0);
703 }
704
705 Ok(())
706 }
707}
708
709type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
712
713struct BroadcastStream<T> {
716 peer: AuthorityIndex,
717 inner: ReusableBoxFuture<
719 'static,
720 (
721 Result<T, broadcast::error::RecvError>,
722 broadcast::Receiver<T>,
723 ),
724 >,
725 subscription_counter: Arc<SubscriptionCounter>,
727}
728
729impl<T: 'static + Clone + Send> BroadcastStream<T> {
730 pub fn new(
731 peer: AuthorityIndex,
732 rx: broadcast::Receiver<T>,
733 subscription_counter: Arc<SubscriptionCounter>,
734 ) -> Self {
735 if let Err(err) = subscription_counter.increment(peer) {
736 match err {
737 ConsensusError::Shutdown => {}
738 _ => panic!("Unexpected error: {err}"),
739 }
740 }
741 Self {
742 peer,
743 inner: ReusableBoxFuture::new(make_recv_future(rx)),
744 subscription_counter,
745 }
746 }
747}
748
749impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
750 type Item = T;
751
752 fn poll_next(
753 mut self: Pin<&mut Self>,
754 cx: &mut task::Context<'_>,
755 ) -> task::Poll<Option<Self::Item>> {
756 let peer = self.peer;
757 let maybe_item = loop {
758 let (result, rx) = ready!(self.inner.poll(cx));
759 self.inner.set(make_recv_future(rx));
760
761 match result {
762 Ok(item) => break Some(item),
763 Err(broadcast::error::RecvError::Closed) => {
764 info!("Block BroadcastedBlockStream {} closed", peer);
765 break None;
766 }
767 Err(broadcast::error::RecvError::Lagged(n)) => {
768 warn!(
769 "Block BroadcastedBlockStream {} lagged by {} messages",
770 peer, n
771 );
772 continue;
773 }
774 }
775 };
776 task::Poll::Ready(maybe_item)
777 }
778}
779
780impl<T> Drop for BroadcastStream<T> {
781 fn drop(&mut self) {
782 if let Err(err) = self.subscription_counter.decrement(self.peer) {
783 match err {
784 ConsensusError::Shutdown => {}
785 _ => panic!("Unexpected error: {err}"),
786 }
787 }
788 }
789}
790
791async fn make_recv_future<T: Clone>(
792 mut rx: broadcast::Receiver<T>,
793) -> (
794 Result<T, broadcast::error::RecvError>,
795 broadcast::Receiver<T>,
796) {
797 let result = rx.recv().await;
798 (result, rx)
799}
800
801#[cfg(test)]
804mod tests {
805 use std::{
806 collections::{BTreeMap, BTreeSet},
807 sync::Arc,
808 time::Duration,
809 };
810
811 use async_trait::async_trait;
812 use bytes::Bytes;
813 use consensus_config::AuthorityIndex;
814 use consensus_types::block::{BlockDigest, BlockRef, Round};
815 use mysten_metrics::monitored_mpsc;
816 use parking_lot::{Mutex, RwLock};
817 use tokio::{sync::broadcast, time::sleep};
818
819 use futures::StreamExt as _;
820
821 use crate::{
822 authority_service::AuthorityService,
823 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
824 commit::{CertifiedCommits, CommitRange},
825 commit_vote_monitor::CommitVoteMonitor,
826 context::Context,
827 core_thread::{CoreError, CoreThreadDispatcher},
828 dag_state::DagState,
829 error::ConsensusResult,
830 network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
831 round_tracker::RoundTracker,
832 storage::mem_store::MemStore,
833 synchronizer::Synchronizer,
834 test_dag_builder::DagBuilder,
835 transaction_certifier::TransactionCertifier,
836 };
837 struct FakeCoreThreadDispatcher {
838 blocks: Mutex<Vec<VerifiedBlock>>,
839 }
840
841 impl FakeCoreThreadDispatcher {
842 fn new() -> Self {
843 Self {
844 blocks: Mutex::new(vec![]),
845 }
846 }
847
848 fn get_blocks(&self) -> Vec<VerifiedBlock> {
849 self.blocks.lock().clone()
850 }
851 }
852
853 #[async_trait]
854 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
855 async fn add_blocks(
856 &self,
857 blocks: Vec<VerifiedBlock>,
858 ) -> Result<BTreeSet<BlockRef>, CoreError> {
859 let block_refs = blocks.iter().map(|b| b.reference()).collect();
860 self.blocks.lock().extend(blocks);
861 Ok(block_refs)
862 }
863
864 async fn check_block_refs(
865 &self,
866 _block_refs: Vec<BlockRef>,
867 ) -> Result<BTreeSet<BlockRef>, CoreError> {
868 Ok(BTreeSet::new())
869 }
870
871 async fn add_certified_commits(
872 &self,
873 _commits: CertifiedCommits,
874 ) -> Result<BTreeSet<BlockRef>, CoreError> {
875 todo!()
876 }
877
878 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
879 Ok(())
880 }
881
882 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
883 Ok(Default::default())
884 }
885
886 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
887 todo!()
888 }
889
890 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
891 todo!()
892 }
893 }
894
895 #[derive(Default)]
896 struct FakeNetworkClient {}
897
898 #[async_trait]
899 impl NetworkClient for FakeNetworkClient {
900 async fn send_block(
901 &self,
902 _peer: AuthorityIndex,
903 _block: &VerifiedBlock,
904 _timeout: Duration,
905 ) -> ConsensusResult<()> {
906 unimplemented!("Unimplemented")
907 }
908
909 async fn subscribe_blocks(
910 &self,
911 _peer: AuthorityIndex,
912 _last_received: Round,
913 _timeout: Duration,
914 ) -> ConsensusResult<BlockStream> {
915 unimplemented!("Unimplemented")
916 }
917
918 async fn fetch_blocks(
919 &self,
920 _peer: AuthorityIndex,
921 _block_refs: Vec<BlockRef>,
922 _highest_accepted_rounds: Vec<Round>,
923 _breadth_first: bool,
924 _timeout: Duration,
925 ) -> ConsensusResult<Vec<Bytes>> {
926 unimplemented!("Unimplemented")
927 }
928
929 async fn fetch_commits(
930 &self,
931 _peer: AuthorityIndex,
932 _commit_range: CommitRange,
933 _timeout: Duration,
934 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
935 unimplemented!("Unimplemented")
936 }
937
938 async fn fetch_latest_blocks(
939 &self,
940 _peer: AuthorityIndex,
941 _authorities: Vec<AuthorityIndex>,
942 _timeout: Duration,
943 ) -> ConsensusResult<Vec<Bytes>> {
944 unimplemented!("Unimplemented")
945 }
946
947 async fn get_latest_rounds(
948 &self,
949 _peer: AuthorityIndex,
950 _timeout: Duration,
951 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
952 unimplemented!("Unimplemented")
953 }
954 }
955
956 #[tokio::test(flavor = "current_thread", start_paused = true)]
957 async fn test_handle_send_block() {
958 let (context, _keys) = Context::new_for_test(4);
959 let context = Arc::new(context);
960 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
961 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
962 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
963 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
964 let network_client = Arc::new(FakeNetworkClient::default());
965 let (blocks_sender, _blocks_receiver) =
966 monitored_mpsc::unbounded_channel("consensus_block_output");
967 let store = Arc::new(MemStore::new());
968 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
969 let transaction_certifier = TransactionCertifier::new(
970 context.clone(),
971 block_verifier.clone(),
972 dag_state.clone(),
973 blocks_sender,
974 );
975 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
976 let synchronizer = Synchronizer::start(
977 network_client,
978 context.clone(),
979 core_dispatcher.clone(),
980 commit_vote_monitor.clone(),
981 block_verifier.clone(),
982 transaction_certifier.clone(),
983 round_tracker.clone(),
984 dag_state.clone(),
985 false,
986 );
987 let authority_service = Arc::new(AuthorityService::new(
988 context.clone(),
989 block_verifier,
990 commit_vote_monitor,
991 round_tracker,
992 synchronizer,
993 core_dispatcher.clone(),
994 rx_block_broadcast,
995 transaction_certifier,
996 dag_state,
997 store,
998 ));
999
1000 let now = context.clock.timestamp_utc_ms();
1002 let max_drift = context.parameters.max_forward_time_drift;
1003 let input_block = VerifiedBlock::new_for_test(
1004 TestBlock::new(9, 0)
1005 .set_timestamp_ms(now + max_drift.as_millis() as u64)
1006 .build(),
1007 );
1008
1009 let service = authority_service.clone();
1010 let serialized = ExtendedSerializedBlock {
1011 block: input_block.serialized().clone(),
1012 excluded_ancestors: vec![],
1013 };
1014
1015 tokio::spawn({
1016 let service = service.clone();
1017 let context = context.clone();
1018 async move {
1019 service
1020 .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
1021 .await
1022 .unwrap();
1023 }
1024 });
1025
1026 sleep(max_drift / 2).await;
1027
1028 let blocks = core_dispatcher.get_blocks();
1029 assert_eq!(blocks.len(), 1);
1030 assert_eq!(blocks[0], input_block);
1031
1032 let invalid_block =
1034 VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
1035 let extended_block = ExtendedSerializedBlock {
1036 block: invalid_block.serialized().clone(),
1037 excluded_ancestors: vec![],
1038 };
1039 service
1040 .handle_send_block(
1041 context.committee.to_authority_index(0).unwrap(),
1042 extended_block,
1043 )
1044 .await
1045 .unwrap_err();
1046
1047 let invalid_excluded_ancestors = vec![
1049 bcs::to_bytes(&BlockRef::new(
1050 10,
1051 AuthorityIndex::new_for_test(1000),
1052 BlockDigest::MIN,
1053 ))
1054 .unwrap(),
1055 vec![3u8; 40],
1056 bcs::to_bytes(&invalid_block.reference()).unwrap(),
1057 ];
1058 let extended_block = ExtendedSerializedBlock {
1059 block: input_block.serialized().clone(),
1060 excluded_ancestors: invalid_excluded_ancestors,
1061 };
1062 service
1063 .handle_send_block(
1064 context.committee.to_authority_index(0).unwrap(),
1065 extended_block,
1066 )
1067 .await
1068 .unwrap_err();
1069 }
1070
1071 #[tokio::test(flavor = "current_thread", start_paused = true)]
1072 async fn test_handle_fetch_blocks() {
1073 const NUM_AUTHORITIES: usize = 40;
1076 const NUM_ROUNDS: usize = 40;
1077 let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1078 let context = Arc::new(context);
1079 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1080 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1081 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1082 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1083 let network_client = Arc::new(FakeNetworkClient::default());
1084 let (blocks_sender, _blocks_receiver) =
1085 monitored_mpsc::unbounded_channel("consensus_block_output");
1086 let store = Arc::new(MemStore::new());
1087 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1088 let transaction_certifier = TransactionCertifier::new(
1089 context.clone(),
1090 block_verifier.clone(),
1091 dag_state.clone(),
1092 blocks_sender,
1093 );
1094 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1095 let synchronizer = Synchronizer::start(
1096 network_client,
1097 context.clone(),
1098 core_dispatcher.clone(),
1099 commit_vote_monitor.clone(),
1100 block_verifier.clone(),
1101 transaction_certifier.clone(),
1102 round_tracker.clone(),
1103 dag_state.clone(),
1104 false,
1105 );
1106 let authority_service = Arc::new(AuthorityService::new(
1107 context.clone(),
1108 block_verifier,
1109 commit_vote_monitor,
1110 round_tracker,
1111 synchronizer,
1112 core_dispatcher.clone(),
1113 rx_block_broadcast,
1114 transaction_certifier,
1115 dag_state.clone(),
1116 store,
1117 ));
1118
1119 let mut dag_builder = DagBuilder::new(context.clone());
1121 dag_builder
1122 .layers(1..=(NUM_ROUNDS as u32))
1123 .build()
1124 .persist_layers(dag_state.clone());
1125 let all_blocks = dag_builder.all_blocks();
1126
1127 let missing_block_refs: Vec<BlockRef> = all_blocks
1129 .iter()
1130 .rev()
1131 .take(2)
1132 .map(|b| b.reference())
1133 .collect();
1134 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1135 let results = authority_service
1136 .handle_fetch_blocks(
1137 AuthorityIndex::new_for_test(0),
1138 missing_block_refs.clone(),
1139 highest_accepted_rounds,
1140 true,
1141 )
1142 .await
1143 .unwrap();
1144
1145 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1147 .iter()
1148 .map(|b| {
1149 let signed = bcs::from_bytes(b).unwrap();
1150 let block = VerifiedBlock::new_verified(signed, b.clone());
1151 (block.reference(), block)
1152 })
1153 .collect();
1154 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1155 for b in &missing_block_refs {
1157 assert!(blocks.contains_key(b));
1158 }
1159 let num_missing_ancestors = blocks
1160 .keys()
1161 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1162 .count();
1163 assert_eq!(
1164 num_missing_ancestors,
1165 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1166 );
1167
1168 let missing_round = NUM_ROUNDS as Round - 3;
1170 let missing_block_refs: Vec<BlockRef> = all_blocks
1171 .iter()
1172 .filter(|b| b.reference().round == missing_round)
1173 .map(|b| b.reference())
1174 .take(2)
1175 .collect();
1176 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1177 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1179 let results = authority_service
1180 .handle_fetch_blocks(
1181 AuthorityIndex::new_for_test(0),
1182 missing_block_refs.clone(),
1183 highest_accepted_rounds,
1184 false,
1185 )
1186 .await
1187 .unwrap();
1188
1189 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1191 .iter()
1192 .map(|b| {
1193 let signed = bcs::from_bytes(b).unwrap();
1194 let block = VerifiedBlock::new_verified(signed, b.clone());
1195 (block.reference(), block)
1196 })
1197 .collect();
1198 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1199 for b in &missing_block_refs {
1201 assert!(blocks.contains_key(b));
1202 }
1203 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1205 for b in blocks.keys() {
1206 assert!(b.round <= missing_round);
1207 assert!(expected_authors.contains(&b.author));
1208 }
1209
1210 let missing_block_refs: Vec<BlockRef> = all_blocks
1212 .iter()
1213 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1214 .map(|b| b.reference())
1215 .take(5)
1216 .collect();
1217 let results = authority_service
1218 .handle_fetch_blocks(
1219 AuthorityIndex::new_for_test(0),
1220 missing_block_refs.clone(),
1221 vec![],
1222 false,
1223 )
1224 .await
1225 .unwrap();
1226
1227 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1229 .iter()
1230 .map(|b| {
1231 let signed = bcs::from_bytes(b).unwrap();
1232 let block = VerifiedBlock::new_verified(signed, b.clone());
1233 (block.reference(), block)
1234 })
1235 .collect();
1236 assert_eq!(blocks.len(), 5);
1237 for b in &missing_block_refs {
1238 assert!(blocks.contains_key(b));
1239 }
1240 }
1241
1242 #[tokio::test(flavor = "current_thread", start_paused = true)]
1243 async fn test_handle_fetch_latest_blocks() {
1244 let (context, _keys) = Context::new_for_test(4);
1246 let context = Arc::new(context);
1247 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1248 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1249 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1250 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1251 let network_client = Arc::new(FakeNetworkClient::default());
1252 let (blocks_sender, _blocks_receiver) =
1253 monitored_mpsc::unbounded_channel("consensus_block_output");
1254 let store = Arc::new(MemStore::new());
1255 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1256 let transaction_certifier = TransactionCertifier::new(
1257 context.clone(),
1258 block_verifier.clone(),
1259 dag_state.clone(),
1260 blocks_sender,
1261 );
1262 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1263 let synchronizer = Synchronizer::start(
1264 network_client,
1265 context.clone(),
1266 core_dispatcher.clone(),
1267 commit_vote_monitor.clone(),
1268 block_verifier.clone(),
1269 transaction_certifier.clone(),
1270 round_tracker.clone(),
1271 dag_state.clone(),
1272 true,
1273 );
1274 let authority_service = Arc::new(AuthorityService::new(
1275 context.clone(),
1276 block_verifier,
1277 commit_vote_monitor,
1278 round_tracker,
1279 synchronizer,
1280 core_dispatcher.clone(),
1281 rx_block_broadcast,
1282 transaction_certifier,
1283 dag_state.clone(),
1284 store,
1285 ));
1286
1287 let mut dag_builder = DagBuilder::new(context.clone());
1289 dag_builder
1290 .layers(1..=10)
1291 .authorities(vec![AuthorityIndex::new_for_test(2)])
1292 .equivocate(1)
1293 .build()
1294 .persist_layers(dag_state);
1295
1296 let authorities_to_request = vec![
1298 AuthorityIndex::new_for_test(1),
1299 AuthorityIndex::new_for_test(2),
1300 ];
1301 let results = authority_service
1302 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1303 .await;
1304
1305 let serialised_blocks = results.unwrap();
1307 for serialised_block in serialised_blocks {
1308 let signed_block: SignedBlock =
1309 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1310 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1311
1312 assert_eq!(verified_block.round(), 10);
1313 }
1314 }
1315
1316 #[tokio::test(flavor = "current_thread", start_paused = true)]
1317 async fn test_handle_subscribe_blocks() {
1318 let (context, _keys) = Context::new_for_test(4);
1319 let context = Arc::new(context);
1320 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1321 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1322 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1323 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1324 let network_client = Arc::new(FakeNetworkClient::default());
1325 let (blocks_sender, _blocks_receiver) =
1326 monitored_mpsc::unbounded_channel("consensus_block_output");
1327 let store = Arc::new(MemStore::new());
1328 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1329 let transaction_certifier = TransactionCertifier::new(
1330 context.clone(),
1331 block_verifier.clone(),
1332 dag_state.clone(),
1333 blocks_sender,
1334 );
1335 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1336 let synchronizer = Synchronizer::start(
1337 network_client,
1338 context.clone(),
1339 core_dispatcher.clone(),
1340 commit_vote_monitor.clone(),
1341 block_verifier.clone(),
1342 transaction_certifier.clone(),
1343 round_tracker.clone(),
1344 dag_state.clone(),
1345 false,
1346 );
1347
1348 dag_state
1350 .write()
1351 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(5, 0).build()));
1352 dag_state
1353 .write()
1354 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(10, 0).build()));
1355 dag_state
1356 .write()
1357 .accept_block(VerifiedBlock::new_for_test(TestBlock::new(15, 0).build()));
1358
1359 let authority_service = Arc::new(AuthorityService::new(
1360 context.clone(),
1361 block_verifier,
1362 commit_vote_monitor,
1363 round_tracker,
1364 synchronizer,
1365 core_dispatcher.clone(),
1366 rx_block_broadcast,
1367 transaction_certifier,
1368 dag_state.clone(),
1369 store,
1370 ));
1371
1372 let peer = context.committee.to_authority_index(1).unwrap();
1373
1374 {
1377 let mut stream = authority_service
1378 .handle_subscribe_blocks(peer, 100)
1379 .await
1380 .unwrap();
1381 let block: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1382 assert_eq!(
1383 block.round(),
1384 15,
1385 "Should return last proposed block as fallback"
1386 );
1387 assert_eq!(block.author().value(), 0);
1388 }
1389
1390 {
1393 let mut stream = authority_service
1394 .handle_subscribe_blocks(peer, 7)
1395 .await
1396 .unwrap();
1397
1398 let block1: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1399 assert_eq!(block1.round(), 10, "Should return block at round 10");
1400
1401 let block2: SignedBlock = bcs::from_bytes(&stream.next().await.unwrap().block).unwrap();
1402 assert_eq!(block2.round(), 15, "Should return block at round 15");
1403 }
1404 }
1405
1406 #[tokio::test(flavor = "current_thread", start_paused = true)]
1407 async fn test_handle_subscribe_blocks_not_proposed() {
1408 let (context, _keys) = Context::new_for_test(4);
1409 let context = Arc::new(context);
1410 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1411 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1412 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1413 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1414 let network_client = Arc::new(FakeNetworkClient::default());
1415 let (blocks_sender, _blocks_receiver) =
1416 monitored_mpsc::unbounded_channel("consensus_block_output");
1417 let store = Arc::new(MemStore::new());
1418 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1419 let transaction_certifier = TransactionCertifier::new(
1420 context.clone(),
1421 block_verifier.clone(),
1422 dag_state.clone(),
1423 blocks_sender,
1424 );
1425 let round_tracker = Arc::new(RwLock::new(RoundTracker::new(context.clone(), vec![])));
1426 let synchronizer = Synchronizer::start(
1427 network_client,
1428 context.clone(),
1429 core_dispatcher.clone(),
1430 commit_vote_monitor.clone(),
1431 block_verifier.clone(),
1432 transaction_certifier.clone(),
1433 round_tracker.clone(),
1434 dag_state.clone(),
1435 false,
1436 );
1437
1438 let authority_service = Arc::new(AuthorityService::new(
1441 context.clone(),
1442 block_verifier,
1443 commit_vote_monitor,
1444 round_tracker,
1445 synchronizer,
1446 core_dispatcher.clone(),
1447 rx_block_broadcast,
1448 transaction_certifier,
1449 dag_state.clone(),
1450 store,
1451 ));
1452
1453 let peer = context.committee.to_authority_index(1).unwrap();
1454
1455 let mut stream = authority_service
1457 .handle_subscribe_blocks(peer, 0)
1458 .await
1459 .unwrap();
1460
1461 use futures::poll;
1463 use std::task::Poll;
1464 let poll_result = poll!(stream.next());
1465 assert!(
1466 matches!(poll_result, Poll::Pending),
1467 "Should not receive genesis block on subscription stream"
1468 );
1469 }
1470}