1use std::{
5 collections::{BTreeMap, BTreeSet},
6 pin::Pin,
7 sync::Arc,
8 time::Duration,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use consensus_types::block::{BlockRef, Round};
15use futures::{Stream, StreamExt, ready, stream, task};
16use mysten_metrics::spawn_monitored_task;
17use parking_lot::RwLock;
18use rand::seq::SliceRandom as _;
19use sui_macros::fail_point_async;
20use tap::TapFallible;
21use tokio::sync::broadcast;
22use tokio_util::sync::ReusableBoxFuture;
23use tracing::{debug, info, warn};
24
25use crate::{
26 CommitIndex,
27 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
28 block_verifier::BlockVerifier,
29 commit::{CommitAPI as _, CommitRange, TrustedCommit},
30 commit_vote_monitor::CommitVoteMonitor,
31 context::Context,
32 core_thread::CoreThreadDispatcher,
33 dag_state::DagState,
34 error::{ConsensusError, ConsensusResult},
35 network::{BlockStream, ExtendedSerializedBlock, NetworkService},
36 round_tracker::PeerRoundTracker,
37 stake_aggregator::{QuorumThreshold, StakeAggregator},
38 storage::Store,
39 synchronizer::SynchronizerHandle,
40 transaction_certifier::TransactionCertifier,
41};
42
/// Multiplier applied to `commit_sync_batch_size` when `handle_send_block` computes how far the
/// local commit index may trail the quorum commit index before incoming blocks are rejected.
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
44
/// Authority-side implementation of the [`NetworkService`] RPC handlers (see the trait impl in
/// this file). It holds shared handles to the components the handlers consult or forward to.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    /// Shared node context; handlers read the committee, parameters, metrics, clock and
    /// protocol config from it.
    context: Arc<Context>,
    /// Observes every verified block received via `handle_send_block` and supplies the quorum
    /// commit index used for the commit-lag check.
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    /// Verifies (and votes on) signed blocks received from peers.
    block_verifier: Arc<dyn BlockVerifier>,
    /// Used to fetch missing (excluded) ancestors from the sending peer in background tasks.
    synchronizer: Arc<SynchronizerHandle>,
    /// Dispatcher that receives verified blocks and answers block-reference checks.
    core_dispatcher: Arc<C>,
    /// Broadcast receiver that is resubscribed for every new block subscription.
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    /// Tracks the active block subscriptions per peer and exports them as a metric.
    subscription_counter: Arc<SubscriptionCounter>,
    /// Receives voted blocks when the `mysticeti_fastpath` protocol flag is enabled.
    transaction_certifier: TransactionCertifier,
    /// In-memory DAG state consulted for cached blocks and the last commit index.
    dag_state: Arc<RwLock<DagState>>,
    /// Persistent store used to serve commits, commit votes and certifier blocks.
    store: Arc<dyn Store>,
    /// Updated with every verified block (and its excluded ancestors) received from peers.
    round_tracker: Arc<RwLock<PeerRoundTracker>>,
}
59
60impl<C: CoreThreadDispatcher> AuthorityService<C> {
61 pub(crate) fn new(
62 context: Arc<Context>,
63 block_verifier: Arc<dyn BlockVerifier>,
64 commit_vote_monitor: Arc<CommitVoteMonitor>,
65 round_tracker: Arc<RwLock<PeerRoundTracker>>,
66 synchronizer: Arc<SynchronizerHandle>,
67 core_dispatcher: Arc<C>,
68 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
69 transaction_certifier: TransactionCertifier,
70 dag_state: Arc<RwLock<DagState>>,
71 store: Arc<dyn Store>,
72 ) -> Self {
73 let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
74 Self {
75 context,
76 block_verifier,
77 commit_vote_monitor,
78 synchronizer,
79 core_dispatcher,
80 rx_block_broadcast,
81 subscription_counter,
82 transaction_certifier,
83 dag_state,
84 store,
85 round_tracker,
86 }
87 }
88
89 fn parse_excluded_ancestors(
91 &self,
92 peer: AuthorityIndex,
93 block: &VerifiedBlock,
94 mut excluded_ancestors: Vec<Vec<u8>>,
95 ) -> ConsensusResult<Vec<BlockRef>> {
96 let peer_hostname = &self.context.committee.authority(peer).hostname;
97
98 let excluded_ancestors_limit = self.context.committee.size() * 2;
99 if excluded_ancestors.len() > excluded_ancestors_limit {
100 debug!(
101 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
102 excluded_ancestors.len() - excluded_ancestors_limit,
103 peer,
104 peer_hostname,
105 );
106 excluded_ancestors.truncate(excluded_ancestors_limit);
107 }
108
109 let excluded_ancestors = excluded_ancestors
110 .into_iter()
111 .map(|serialized| {
112 let block_ref: BlockRef =
113 bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
114 if !self.context.committee.is_valid_index(block_ref.author) {
115 return Err(ConsensusError::InvalidAuthorityIndex {
116 index: block_ref.author,
117 max: self.context.committee.size(),
118 });
119 }
120 if block_ref.round >= block.round() {
121 return Err(ConsensusError::InvalidAncestorRound {
122 ancestor: block_ref.round,
123 block: block.round(),
124 });
125 }
126 Ok(block_ref)
127 })
128 .collect::<ConsensusResult<Vec<BlockRef>>>()?;
129
130 for excluded_ancestor in &excluded_ancestors {
131 let excluded_ancestor_hostname = &self
132 .context
133 .committee
134 .authority(excluded_ancestor.author)
135 .hostname;
136 self.context
137 .metrics
138 .node_metrics
139 .network_excluded_ancestors_count_by_authority
140 .with_label_values(&[excluded_ancestor_hostname])
141 .inc();
142 }
143 self.context
144 .metrics
145 .node_metrics
146 .network_received_excluded_ancestors_from_authority
147 .with_label_values(&[peer_hostname])
148 .inc_by(excluded_ancestors.len() as u64);
149
150 Ok(excluded_ancestors)
151 }
152}
153
154#[async_trait]
155impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
    /// Handles a block pushed by `peer`.
    ///
    /// Steps, in order:
    /// 1. Deserialize the block and require that its author is the sending `peer`.
    /// 2. Verify the block and obtain the reject votes for its transactions.
    /// 3. Record the forward timestamp drift and let the commit vote monitor observe the block.
    /// 4. Reject the block when the local commit index lags the quorum commit index by more than
    ///    `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`.
    /// 5. Hand the block to the transaction certifier (fastpath only) and to the core dispatcher,
    ///    and fetch any missing ancestors from `peer` in a background task.
    /// 6. Parse the excluded ancestors, update the round tracker and fetch the excluded
    ///    ancestors that are missing locally, again in a background task.
    ///
    /// # Errors
    /// Returns an error for malformed, wrongly-authored or invalid blocks, when the block is
    /// rejected because of commit lag, when the excluded ancestors are invalid, or with
    /// `ConsensusError::Shutdown` when the core dispatcher call fails.
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        // Fail point hook; presumably a no-op outside failure-injection tests — confirm.
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // A peer may only send blocks it authored itself.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[peer_hostname, "handle_send_block", "UnexpectedAuthority"])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Verification failures are counted per peer and error name before being returned.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        // How far the block timestamp is ahead of the local clock; zero when it is not ahead.
        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname, "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // The monitor sees the block before the lag check below, so it is observed even when the
        // block ends up being rejected.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Reject the block when the local commit index trails the quorum commit index by more
        // than `commit_sync_batch_size * COMMIT_LAG_MULTIPLIER`.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // With fastpath enabled, the certifier receives the block together with its reject votes.
        if self.context.protocol_config.mysticeti_fastpath() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Hand the block to core; the returned set holds the ancestors that are still missing.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            // Fetch from the sending peer in the background so the handler is not blocked.
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // The block has already been handed to core at this point; a failure while parsing the
        // excluded ancestors only fails the RPC response.
        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
                    .inc();
            })?;

        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block,
                excluded_ancestors: excluded_ancestors.clone(),
            });

        // Ask core which excluded ancestors are missing locally.
        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            // Same background fetch as for the missing ancestors above.
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }
337
338 async fn handle_subscribe_blocks(
339 &self,
340 peer: AuthorityIndex,
341 last_received: Round,
342 ) -> ConsensusResult<BlockStream> {
343 fail_point_async!("consensus-rpc-response");
344
345 let dag_state = self.dag_state.read();
346 let missed_blocks = stream::iter(
350 dag_state
351 .get_cached_blocks(self.context.own_index, last_received + 1)
352 .into_iter()
353 .map(|block| ExtendedSerializedBlock {
354 block: block.serialized().clone(),
355 excluded_ancestors: vec![],
356 }),
357 );
358
359 let broadcasted_blocks = BroadcastedBlockStream::new(
360 peer,
361 self.rx_block_broadcast.resubscribe(),
362 self.subscription_counter.clone(),
363 );
364
365 Ok(Box::pin(missed_blocks.chain(
367 broadcasted_blocks.map(ExtendedSerializedBlock::from),
368 )))
369 }
370
    /// Serves serialized blocks for the requested `block_refs`.
    ///
    /// `highest_accepted_rounds` selects the mode:
    /// - empty: only the requested blocks found in the DAG state are returned, with the request
    ///   capped at `max_blocks_per_fetch` references;
    /// - one entry per committee member: the request is capped at `max_blocks_per_sync`
    ///   references and the response is extended with additional blocks:
    ///   - `breadth_first == true`: ancestors of the found blocks whose round is above the
    ///     requester's highest accepted round for the ancestor's author, randomly sub-sampled
    ///     when there are more candidates than remaining response slots;
    ///   - `breadth_first == false`: cached blocks of the same authors as the found blocks, from
    ///     the requester's highest accepted round + 1 up to the lowest found round per author.
    ///
    /// # Errors
    /// Returns an error when `highest_accepted_rounds` is non-empty but not committee-sized, when
    /// a requested reference has an invalid author index, or when a genesis block is requested.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        // Either no rounds are provided, or exactly one per committee member. This also makes
        // the `highest_accepted_rounds[author]` indexing below in bounds for valid authors.
        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // The response limit depends on the mode; surplus requested references are dropped.
        let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the remaining references: known author and not a genesis block.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        let blocks = if !highest_accepted_rounds.is_empty() {
            // Look up each requested reference once; references not found are skipped.
            block_refs.sort();
            block_refs.dedup();
            let mut blocks = self
                .dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            if breadth_first {
                // Unique ancestors of the found blocks that are above the requester's highest
                // accepted round for the ancestor's author.
                let mut missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // With more candidates than remaining response slots, pick a random subset.
                let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
                if selected_num_blocks < missing_ancestors.len() {
                    missing_ancestors = missing_ancestors
                        .choose_multiple(&mut rand::thread_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                }
                let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
                blocks.extend(ancestor_blocks.into_iter().flatten());
            } else {
                // Lowest round among the found blocks, per author.
                let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                for block_ref in blocks.iter().map(|b| b.reference()) {
                    let entry = lowest_missing_rounds
                        .entry(block_ref.author)
                        .or_insert(block_ref.round);
                    *entry = (*entry).min(block_ref.round);
                }

                // For each of those authors, add cached blocks between the requester's highest
                // accepted round + 1 and the lowest found round, until the response is full.
                // NOTE(review): the end bound of `get_cached_blocks_in_range` is presumably
                // exclusive — confirm against `DagState`.
                let dag_state = self.dag_state.read();
                for (authority, lowest_missing_round) in lowest_missing_rounds {
                    let highest_accepted_round = highest_accepted_rounds[authority];
                    if highest_accepted_round >= lowest_missing_round {
                        continue;
                    }
                    let missing_blocks = dag_state.get_cached_blocks_in_range(
                        authority,
                        highest_accepted_round + 1,
                        lowest_missing_round,
                        self.context
                            .parameters
                            .max_blocks_per_sync
                            .saturating_sub(blocks.len()),
                    );
                    blocks.extend(missing_blocks);
                    if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                        blocks.truncate(self.context.parameters.max_blocks_per_sync);
                        break;
                    }
                }
            }

            blocks
        } else {
            // Plain fetch: return whichever requested blocks are available.
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        };

        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }
508
509 async fn handle_fetch_commits(
510 &self,
511 _peer: AuthorityIndex,
512 commit_range: CommitRange,
513 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
514 fail_point_async!("consensus-rpc-response");
515
516 let inclusive_end = commit_range.end().min(
518 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
519 - 1,
520 );
521 let mut commits = self
522 .store
523 .scan_commits((commit_range.start()..=inclusive_end).into())?;
524 let mut certifier_block_refs = vec![];
525 'commit: while let Some(c) = commits.last() {
526 let index = c.index();
527 let votes = self.store.read_commit_votes(index)?;
528 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
529 for v in &votes {
530 stake_aggregator.add(v.author, &self.context.committee);
531 }
532 if stake_aggregator.reached_threshold(&self.context.committee) {
533 certifier_block_refs = votes;
534 break 'commit;
535 } else {
536 debug!(
537 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
538 index,
539 stake_aggregator.stake(),
540 stake_aggregator.threshold(&self.context.committee)
541 );
542 self.context
543 .metrics
544 .node_metrics
545 .commit_sync_fetch_commits_handler_uncertified_skipped
546 .inc();
547 commits.pop();
548 }
549 }
550 let certifier_blocks = self
551 .store
552 .read_blocks(&certifier_block_refs)?
553 .into_iter()
554 .flatten()
555 .collect();
556 Ok((commits, certifier_blocks))
557 }
558
559 async fn handle_fetch_latest_blocks(
560 &self,
561 peer: AuthorityIndex,
562 authorities: Vec<AuthorityIndex>,
563 ) -> ConsensusResult<Vec<Bytes>> {
564 fail_point_async!("consensus-rpc-response");
565
566 if authorities.len() > self.context.committee.size() {
567 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
568 }
569
570 for authority in &authorities {
572 if !self.context.committee.is_valid_index(*authority) {
573 return Err(ConsensusError::InvalidAuthorityIndex {
574 index: *authority,
575 max: self.context.committee.size(),
576 });
577 }
578 }
579
580 let mut blocks = vec![];
584 let dag_state = self.dag_state.read();
585 for authority in authorities {
586 let block = dag_state.get_last_block_for_authority(authority);
587
588 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
589
590 if block.round() != GENESIS_ROUND {
592 blocks.push(block);
593 }
594 }
595
596 let result = blocks
598 .into_iter()
599 .map(|block| block.serialized().clone())
600 .collect::<Vec<_>>();
601
602 Ok(result)
603 }
604
605 async fn handle_get_latest_rounds(
606 &self,
607 _peer: AuthorityIndex,
608 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
609 fail_point_async!("consensus-rpc-response");
610
611 let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();
612
613 let blocks = self
614 .dag_state
615 .read()
616 .get_last_cached_block_per_authority(Round::MAX);
617 let highest_accepted_rounds = blocks
618 .into_iter()
619 .map(|(block, _)| block.round())
620 .collect::<Vec<_>>();
621
622 highest_received_rounds[self.context.own_index] =
624 highest_accepted_rounds[self.context.own_index];
625
626 Ok((highest_received_rounds, highest_accepted_rounds))
627 }
628}
629
/// Subscription bookkeeping guarded by the mutex in [`SubscriptionCounter`].
struct Counter {
    /// Total number of active subscriptions across all peers.
    count: usize,
    /// Number of active subscriptions per peer, indexed by authority index.
    subscriptions_by_authority: Vec<usize>,
}
634
/// Counts active block subscriptions per peer and mirrors whether a peer is subscribed into
/// the `subscribed_by` metric.
struct SubscriptionCounter {
    /// Provides the committee (hostnames, size) and the metrics.
    context: Arc<Context>,
    /// Counters protected by a mutex so that they can be updated through `&self`.
    counter: parking_lot::Mutex<Counter>,
}
640
641impl SubscriptionCounter {
642 fn new(context: Arc<Context>) -> Self {
643 for (_, authority) in context.committee.authorities() {
645 context
646 .metrics
647 .node_metrics
648 .subscribed_by
649 .with_label_values(&[authority.hostname.as_str()])
650 .set(0);
651 }
652
653 Self {
654 counter: parking_lot::Mutex::new(Counter {
655 count: 0,
656 subscriptions_by_authority: vec![0; context.committee.size()],
657 }),
658 context,
659 }
660 }
661
662 fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
663 let mut counter = self.counter.lock();
664 counter.count += 1;
665 counter.subscriptions_by_authority[peer] += 1;
666
667 let peer_hostname = &self.context.committee.authority(peer).hostname;
668 self.context
669 .metrics
670 .node_metrics
671 .subscribed_by
672 .with_label_values(&[peer_hostname])
673 .set(1);
674
675 Ok(())
676 }
677
678 fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
679 let mut counter = self.counter.lock();
680 counter.count -= 1;
681 counter.subscriptions_by_authority[peer] -= 1;
682
683 if counter.subscriptions_by_authority[peer] == 0 {
684 let peer_hostname = &self.context.committee.authority(peer).hostname;
685 self.context
686 .metrics
687 .node_metrics
688 .subscribed_by
689 .with_label_values(&[peer_hostname])
690 .set(0);
691 }
692
693 Ok(())
694 }
695}
696
/// [`BroadcastStream`] specialised to the [`ExtendedBlock`]s sent over the block broadcast channel.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
700
/// Adapts a `tokio::sync::broadcast::Receiver` into a [`Stream`] and keeps the peer's
/// subscription registered in the [`SubscriptionCounter`] for as long as the stream is alive.
struct BroadcastStream<T> {
    /// Peer that owns this subscription.
    peer: AuthorityIndex,
    /// In-flight receive future. It resolves to the receive result together with the receiver
    /// itself, so that the receiver can be used for the next receive.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    /// Incremented on creation and decremented on drop.
    subscription_counter: Arc<SubscriptionCounter>,
}
716
717impl<T: 'static + Clone + Send> BroadcastStream<T> {
718 pub fn new(
719 peer: AuthorityIndex,
720 rx: broadcast::Receiver<T>,
721 subscription_counter: Arc<SubscriptionCounter>,
722 ) -> Self {
723 if let Err(err) = subscription_counter.increment(peer) {
724 match err {
725 ConsensusError::Shutdown => {}
726 _ => panic!("Unexpected error: {err}"),
727 }
728 }
729 Self {
730 peer,
731 inner: ReusableBoxFuture::new(make_recv_future(rx)),
732 subscription_counter,
733 }
734 }
735}
736
impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    /// Yields the next broadcast item.
    ///
    /// Returns `Pending` while the in-flight receive future is not ready. A `Lagged` error is
    /// logged and skipped; a `Closed` error ends the stream with `None`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            // `ready!` returns `Poll::Pending` from this function when the future is not done.
            let (result, rx) = ready!(self.inner.poll(cx));
            // Re-arm the reusable future with the receiver handed back by the completed one.
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    // Keep going: poll the freshly armed future in the next iteration.
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}
767
768impl<T> Drop for BroadcastStream<T> {
769 fn drop(&mut self) {
770 if let Err(err) = self.subscription_counter.decrement(self.peer) {
771 match err {
772 ConsensusError::Shutdown => {}
773 _ => panic!("Unexpected error: {err}"),
774 }
775 }
776 }
777}
778
779async fn make_recv_future<T: Clone>(
780 mut rx: broadcast::Receiver<T>,
781) -> (
782 Result<T, broadcast::error::RecvError>,
783 broadcast::Receiver<T>,
784) {
785 let result = rx.recv().await;
786 (result, rx)
787}
788
789#[cfg(test)]
792mod tests {
793 use std::{
794 collections::{BTreeMap, BTreeSet},
795 sync::Arc,
796 time::Duration,
797 };
798
799 use async_trait::async_trait;
800 use bytes::Bytes;
801 use consensus_config::AuthorityIndex;
802 use consensus_types::block::{BlockDigest, BlockRef, Round};
803 use mysten_metrics::monitored_mpsc;
804 use parking_lot::{Mutex, RwLock};
805 use tokio::{sync::broadcast, time::sleep};
806
807 use crate::{
808 authority_service::AuthorityService,
809 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
810 commit::{CertifiedCommits, CommitRange},
811 commit_vote_monitor::CommitVoteMonitor,
812 context::Context,
813 core_thread::{CoreError, CoreThreadDispatcher},
814 dag_state::DagState,
815 error::ConsensusResult,
816 network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
817 round_tracker::PeerRoundTracker,
818 storage::mem_store::MemStore,
819 synchronizer::Synchronizer,
820 test_dag_builder::DagBuilder,
821 transaction_certifier::TransactionCertifier,
822 };
823 struct FakeCoreThreadDispatcher {
824 blocks: Mutex<Vec<VerifiedBlock>>,
825 }
826
827 impl FakeCoreThreadDispatcher {
828 fn new() -> Self {
829 Self {
830 blocks: Mutex::new(vec![]),
831 }
832 }
833
834 fn get_blocks(&self) -> Vec<VerifiedBlock> {
835 self.blocks.lock().clone()
836 }
837 }
838
    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        /// Records the blocks and reports the references of all of them as the returned set, so
        /// callers treat every added block as having missing ancestors.
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        /// Reports that none of the given references is missing.
        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        /// Not needed by the tests in this module.
        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        /// No-op.
        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        /// Always reports an empty set of missing blocks.
        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(Default::default())
        }

        /// Not needed by the tests in this module.
        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        /// Not needed by the tests in this module.
        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        /// Not needed by the tests in this module.
        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }
884
    /// Network client stub handed to the synchronizer; every method panics with
    /// `unimplemented!` when called.
    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        /// Unimplemented stub.
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        /// Unimplemented stub.
        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }
945
    /// Exercises `handle_send_block` with a valid block, a block from an unexpected author, and
    /// a valid block carrying invalid excluded ancestors.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        // GIVEN: an authority service for a committee of 4, wired to fake collaborators.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            false,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // A round 9 block from authority 0, timestamped `max_forward_time_drift` in the future.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        // WHEN: the block is sent by its author from a separate task.
        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        // The test starts with time paused; sleeping lets the spawned task run.
        sleep(max_drift / 2).await;

        // THEN: the block reached the core dispatcher.
        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // A block authored by index 1000 but sent by authority 0 must be rejected.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // A valid block with invalid excluded ancestors must be rejected as well. The entries
        // are: a reference with author index 1000, 40 bytes that are not a serialized
        // reference, and the reference of the invalid block above.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }
1059
    /// Exercises the three modes of `handle_fetch_blocks`: breadth-first sync, depth-first sync
    /// and plain fetch without highest accepted rounds.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN: a DAG of 40 rounds for 40 authorities persisted into the DAG state. The
        // assertions below expect both sync responses to be filled up to `max_blocks_per_sync`.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            false,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        let all_blocks = dag_builder.all_blocks();

        // WHEN: the last 2 blocks are requested breadth-first, with the requester having
        // accepted round 1 from every authority.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the response is full, contains the requested blocks, and all the remaining
        // blocks are ancestors from the previous round.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: 2 blocks of an earlier round are requested depth-first, with a higher accepted
        // round for the author of the first requested block.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the response is full, contains the requested blocks, and holds only blocks of
        // the requested authors at or below the requested round.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: 5 blocks are requested without highest accepted rounds.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: exactly the requested blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }
1229
    /// Checks that `handle_fetch_latest_blocks` serves blocks of the highest round for the
    /// requested authorities.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN: an authority service for a committee of 4, wired to fake collaborators.
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            true,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // Persist 10 layers of blocks built with authority 2 and one equivocation.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN: the latest blocks of authorities 1 and 2 are requested.
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN: every returned block is from round 10.
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
1302}