1use std::{
5 collections::{BTreeMap, BTreeSet},
6 pin::Pin,
7 sync::Arc,
8 time::Duration,
9};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use consensus_config::AuthorityIndex;
14use consensus_types::block::{BlockRef, Round};
15use futures::{Stream, StreamExt, ready, stream, task};
16use mysten_metrics::spawn_monitored_task;
17use parking_lot::RwLock;
18use rand::seq::SliceRandom as _;
19use sui_macros::fail_point_async;
20use tap::TapFallible;
21use tokio::sync::broadcast;
22use tokio_util::sync::ReusableBoxFuture;
23use tracing::{debug, info, warn};
24
25use crate::{
26 CommitIndex,
27 block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
28 block_verifier::BlockVerifier,
29 commit::{CommitAPI as _, CommitRange, TrustedCommit},
30 commit_vote_monitor::CommitVoteMonitor,
31 context::Context,
32 core_thread::CoreThreadDispatcher,
33 dag_state::DagState,
34 error::{ConsensusError, ConsensusResult},
35 network::{BlockStream, ExtendedSerializedBlock, NetworkService},
36 round_tracker::PeerRoundTracker,
37 stake_aggregator::{QuorumThreshold, StakeAggregator},
38 storage::Store,
39 synchronizer::SynchronizerHandle,
40 transaction_certifier::TransactionCertifier,
41};
42
43pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;
44
45pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
47 context: Arc<Context>,
48 commit_vote_monitor: Arc<CommitVoteMonitor>,
49 block_verifier: Arc<dyn BlockVerifier>,
50 synchronizer: Arc<SynchronizerHandle>,
51 core_dispatcher: Arc<C>,
52 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
53 subscription_counter: Arc<SubscriptionCounter>,
54 transaction_certifier: TransactionCertifier,
55 dag_state: Arc<RwLock<DagState>>,
56 store: Arc<dyn Store>,
57 round_tracker: Arc<RwLock<PeerRoundTracker>>,
58}
59
60impl<C: CoreThreadDispatcher> AuthorityService<C> {
61 pub(crate) fn new(
62 context: Arc<Context>,
63 block_verifier: Arc<dyn BlockVerifier>,
64 commit_vote_monitor: Arc<CommitVoteMonitor>,
65 round_tracker: Arc<RwLock<PeerRoundTracker>>,
66 synchronizer: Arc<SynchronizerHandle>,
67 core_dispatcher: Arc<C>,
68 rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
69 transaction_certifier: TransactionCertifier,
70 dag_state: Arc<RwLock<DagState>>,
71 store: Arc<dyn Store>,
72 ) -> Self {
73 let subscription_counter = Arc::new(SubscriptionCounter::new(
74 context.clone(),
75 core_dispatcher.clone(),
76 ));
77 Self {
78 context,
79 block_verifier,
80 commit_vote_monitor,
81 synchronizer,
82 core_dispatcher,
83 rx_block_broadcast,
84 subscription_counter,
85 transaction_certifier,
86 dag_state,
87 store,
88 round_tracker,
89 }
90 }
91}
92
93#[async_trait]
94impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
95 async fn handle_send_block(
96 &self,
97 peer: AuthorityIndex,
98 serialized_block: ExtendedSerializedBlock,
99 ) -> ConsensusResult<()> {
100 fail_point_async!("consensus-rpc-response");
101
102 let peer_hostname = &self.context.committee.authority(peer).hostname;
103
104 let signed_block: SignedBlock =
106 bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;
107
108 if peer != signed_block.author() {
110 self.context
111 .metrics
112 .node_metrics
113 .invalid_blocks
114 .with_label_values(&[peer_hostname, "handle_send_block", "UnexpectedAuthority"])
115 .inc();
116 let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
117 info!("Block with wrong authority from {}: {}", peer, e);
118 return Err(e);
119 }
120 let peer_hostname = &self.context.committee.authority(peer).hostname;
121
122 let (verified_block, reject_txn_votes) = self
124 .block_verifier
125 .verify_and_vote(signed_block, serialized_block.block)
126 .tap_err(|e| {
127 self.context
128 .metrics
129 .node_metrics
130 .invalid_blocks
131 .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
132 .inc();
133 info!("Invalid block from {}: {}", peer, e);
134 })?;
135 let block_ref = verified_block.reference();
136 debug!("Received block {} via send block.", block_ref);
137
138 let now = self.context.clock.timestamp_utc_ms();
139 let forward_time_drift =
140 Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));
141
142 self.context
143 .metrics
144 .node_metrics
145 .block_timestamp_drift_ms
146 .with_label_values(&[peer_hostname, "handle_send_block"])
147 .inc_by(forward_time_drift.as_millis() as u64);
148
149 self.commit_vote_monitor.observe_block(&verified_block);
152
153 let last_commit_index = self.dag_state.read().last_commit_index();
162 let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
163 if last_commit_index
166 + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
167 < quorum_commit_index
168 {
169 self.context
170 .metrics
171 .node_metrics
172 .rejected_blocks
173 .with_label_values(&["commit_lagging"])
174 .inc();
175 debug!(
176 "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
177 block_ref, last_commit_index, quorum_commit_index,
178 );
179 return Err(ConsensusError::BlockRejected {
180 block_ref,
181 reason: format!(
182 "Last commit index is lagging quorum commit index too much ({} < {})",
183 last_commit_index, quorum_commit_index,
184 ),
185 });
186 }
187
188 self.context
189 .metrics
190 .node_metrics
191 .verified_blocks
192 .with_label_values(&[peer_hostname])
193 .inc();
194
195 if self.context.protocol_config.mysticeti_fastpath() {
197 self.transaction_certifier
198 .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
199 }
200
201 let missing_ancestors = self
203 .core_dispatcher
204 .add_blocks(vec![verified_block.clone()])
205 .await
206 .map_err(|_| ConsensusError::Shutdown)?;
207
208 if !missing_ancestors.is_empty() {
210 self.context
211 .metrics
212 .node_metrics
213 .handler_received_block_missing_ancestors
214 .with_label_values(&[peer_hostname])
215 .inc_by(missing_ancestors.len() as u64);
216 let synchronizer = self.synchronizer.clone();
217 spawn_monitored_task!(async move {
218 if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
223 debug!("Failed to fetch missing ancestors via synchronizer: {err}");
224 }
225 });
226 }
227
228 let mut excluded_ancestors = serialized_block
231 .excluded_ancestors
232 .into_iter()
233 .map(|serialized| bcs::from_bytes::<BlockRef>(&serialized))
234 .collect::<Result<Vec<BlockRef>, bcs::Error>>()
235 .map_err(ConsensusError::MalformedBlock)?;
236
237 let excluded_ancestors_limit = self.context.committee.size() * 2;
238 if excluded_ancestors.len() > excluded_ancestors_limit {
239 debug!(
240 "Dropping {} excluded ancestor(s) from {} {} due to size limit",
241 excluded_ancestors.len() - excluded_ancestors_limit,
242 peer,
243 peer_hostname,
244 );
245 excluded_ancestors.truncate(excluded_ancestors_limit);
246 }
247
248 self.round_tracker
249 .write()
250 .update_from_accepted_block(&ExtendedBlock {
251 block: verified_block,
252 excluded_ancestors: excluded_ancestors.clone(),
253 });
254
255 self.context
256 .metrics
257 .node_metrics
258 .network_received_excluded_ancestors_from_authority
259 .with_label_values(&[peer_hostname])
260 .inc_by(excluded_ancestors.len() as u64);
261
262 for excluded_ancestor in &excluded_ancestors {
263 let excluded_ancestor_hostname = &self
264 .context
265 .committee
266 .authority(excluded_ancestor.author)
267 .hostname;
268 self.context
269 .metrics
270 .node_metrics
271 .network_excluded_ancestors_count_by_authority
272 .with_label_values(&[excluded_ancestor_hostname])
273 .inc();
274 }
275
276 let missing_excluded_ancestors = self
277 .core_dispatcher
278 .check_block_refs(excluded_ancestors)
279 .await
280 .map_err(|_| ConsensusError::Shutdown)?;
281
282 if !missing_excluded_ancestors.is_empty() {
284 self.context
285 .metrics
286 .node_metrics
287 .network_excluded_ancestors_sent_to_fetch
288 .with_label_values(&[peer_hostname])
289 .inc_by(missing_excluded_ancestors.len() as u64);
290
291 let synchronizer = self.synchronizer.clone();
292 spawn_monitored_task!(async move {
293 if let Err(err) = synchronizer
294 .fetch_blocks(missing_excluded_ancestors, peer)
295 .await
296 {
297 debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
298 }
299 });
300 }
301
302 Ok(())
303 }
304
305 async fn handle_subscribe_blocks(
306 &self,
307 peer: AuthorityIndex,
308 last_received: Round,
309 ) -> ConsensusResult<BlockStream> {
310 fail_point_async!("consensus-rpc-response");
311
312 let dag_state = self.dag_state.read();
313 let missed_blocks = stream::iter(
317 dag_state
318 .get_cached_blocks(self.context.own_index, last_received + 1)
319 .into_iter()
320 .map(|block| ExtendedSerializedBlock {
321 block: block.serialized().clone(),
322 excluded_ancestors: vec![],
323 }),
324 );
325
326 let broadcasted_blocks = BroadcastedBlockStream::new(
327 peer,
328 self.rx_block_broadcast.resubscribe(),
329 self.subscription_counter.clone(),
330 );
331
332 Ok(Box::pin(missed_blocks.chain(
334 broadcasted_blocks.map(ExtendedSerializedBlock::from),
335 )))
336 }
337
338 async fn handle_fetch_blocks(
346 &self,
347 _peer: AuthorityIndex,
348 mut block_refs: Vec<BlockRef>,
349 highest_accepted_rounds: Vec<Round>,
350 breadth_first: bool,
351 ) -> ConsensusResult<Vec<Bytes>> {
352 fail_point_async!("consensus-rpc-response");
353
354 if !highest_accepted_rounds.is_empty()
355 && highest_accepted_rounds.len() != self.context.committee.size()
356 {
357 return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
358 highest_accepted_rounds.len(),
359 self.context.committee.size(),
360 ));
361 }
362
363 let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
365 self.context.parameters.max_blocks_per_sync
366 } else {
367 self.context.parameters.max_blocks_per_fetch
368 };
369 if block_refs.len() > max_response_num_blocks {
370 block_refs.truncate(max_response_num_blocks);
371 }
372
373 for block in &block_refs {
375 if !self.context.committee.is_valid_index(block.author) {
376 return Err(ConsensusError::InvalidAuthorityIndex {
377 index: block.author,
378 max: self.context.committee.size(),
379 });
380 }
381 if block.round == GENESIS_ROUND {
382 return Err(ConsensusError::UnexpectedGenesisBlockRequested);
383 }
384 }
385
386 let blocks = if !highest_accepted_rounds.is_empty() {
388 block_refs.sort();
389 block_refs.dedup();
390 let mut blocks = self
391 .dag_state
392 .read()
393 .get_blocks(&block_refs)
394 .into_iter()
395 .flatten()
396 .collect::<Vec<_>>();
397
398 if breadth_first {
399 let mut missing_ancestors = blocks
401 .iter()
402 .flat_map(|block| block.ancestors().to_vec())
403 .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
404 .collect::<BTreeSet<_>>()
405 .into_iter()
406 .collect::<Vec<_>>();
407
408 let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
411 if selected_num_blocks < missing_ancestors.len() {
412 missing_ancestors = missing_ancestors
413 .choose_multiple(&mut rand::thread_rng(), selected_num_blocks)
414 .copied()
415 .collect::<Vec<_>>();
416 }
417 let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
418 blocks.extend(ancestor_blocks.into_iter().flatten());
419 } else {
420 let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
423 for block_ref in blocks.iter().map(|b| b.reference()) {
424 let entry = lowest_missing_rounds
425 .entry(block_ref.author)
426 .or_insert(block_ref.round);
427 *entry = (*entry).min(block_ref.round);
428 }
429
430 let dag_state = self.dag_state.read();
436 for (authority, lowest_missing_round) in lowest_missing_rounds {
437 let highest_accepted_round = highest_accepted_rounds[authority];
438 if highest_accepted_round >= lowest_missing_round {
439 continue;
440 }
441 let missing_blocks = dag_state.get_cached_blocks_in_range(
442 authority,
443 highest_accepted_round + 1,
444 lowest_missing_round,
445 self.context
446 .parameters
447 .max_blocks_per_sync
448 .saturating_sub(blocks.len()),
449 );
450 blocks.extend(missing_blocks);
451 if blocks.len() >= self.context.parameters.max_blocks_per_sync {
452 blocks.truncate(self.context.parameters.max_blocks_per_sync);
453 break;
454 }
455 }
456 }
457
458 blocks
459 } else {
460 self.dag_state
461 .read()
462 .get_blocks(&block_refs)
463 .into_iter()
464 .flatten()
465 .collect()
466 };
467
468 let bytes = blocks
470 .into_iter()
471 .map(|block| block.serialized().clone())
472 .collect::<Vec<_>>();
473 Ok(bytes)
474 }
475
476 async fn handle_fetch_commits(
477 &self,
478 _peer: AuthorityIndex,
479 commit_range: CommitRange,
480 ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
481 fail_point_async!("consensus-rpc-response");
482
483 let inclusive_end = commit_range.end().min(
485 commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
486 - 1,
487 );
488 let mut commits = self
489 .store
490 .scan_commits((commit_range.start()..=inclusive_end).into())?;
491 let mut certifier_block_refs = vec![];
492 'commit: while let Some(c) = commits.last() {
493 let index = c.index();
494 let votes = self.store.read_commit_votes(index)?;
495 let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
496 for v in &votes {
497 stake_aggregator.add(v.author, &self.context.committee);
498 }
499 if stake_aggregator.reached_threshold(&self.context.committee) {
500 certifier_block_refs = votes;
501 break 'commit;
502 } else {
503 debug!(
504 "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
505 index,
506 stake_aggregator.stake(),
507 stake_aggregator.threshold(&self.context.committee)
508 );
509 self.context
510 .metrics
511 .node_metrics
512 .commit_sync_fetch_commits_handler_uncertified_skipped
513 .inc();
514 commits.pop();
515 }
516 }
517 let certifier_blocks = self
518 .store
519 .read_blocks(&certifier_block_refs)?
520 .into_iter()
521 .flatten()
522 .collect();
523 Ok((commits, certifier_blocks))
524 }
525
526 async fn handle_fetch_latest_blocks(
527 &self,
528 peer: AuthorityIndex,
529 authorities: Vec<AuthorityIndex>,
530 ) -> ConsensusResult<Vec<Bytes>> {
531 fail_point_async!("consensus-rpc-response");
532
533 if authorities.len() > self.context.committee.size() {
534 return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
535 }
536
537 for authority in &authorities {
539 if !self.context.committee.is_valid_index(*authority) {
540 return Err(ConsensusError::InvalidAuthorityIndex {
541 index: *authority,
542 max: self.context.committee.size(),
543 });
544 }
545 }
546
547 let mut blocks = vec![];
551 let dag_state = self.dag_state.read();
552 for authority in authorities {
553 let block = dag_state.get_last_block_for_authority(authority);
554
555 debug!("Latest block for {authority}: {block:?} as requested from {peer}");
556
557 if block.round() != GENESIS_ROUND {
559 blocks.push(block);
560 }
561 }
562
563 let result = blocks
565 .into_iter()
566 .map(|block| block.serialized().clone())
567 .collect::<Vec<_>>();
568
569 Ok(result)
570 }
571
572 async fn handle_get_latest_rounds(
573 &self,
574 _peer: AuthorityIndex,
575 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
576 fail_point_async!("consensus-rpc-response");
577
578 let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();
579
580 let blocks = self
581 .dag_state
582 .read()
583 .get_last_cached_block_per_authority(Round::MAX);
584 let highest_accepted_rounds = blocks
585 .into_iter()
586 .map(|(block, _)| block.round())
587 .collect::<Vec<_>>();
588
589 highest_received_rounds[self.context.own_index] =
591 highest_accepted_rounds[self.context.own_index];
592
593 Ok((highest_received_rounds, highest_accepted_rounds))
594 }
595}
596
597struct Counter {
598 count: usize,
599 subscriptions_by_authority: Vec<usize>,
600}
601
602struct SubscriptionCounter {
605 context: Arc<Context>,
606 counter: parking_lot::Mutex<Counter>,
607 dispatcher: Arc<dyn CoreThreadDispatcher>,
608}
609
610impl SubscriptionCounter {
611 fn new(context: Arc<Context>, dispatcher: Arc<dyn CoreThreadDispatcher>) -> Self {
612 for (_, authority) in context.committee.authorities() {
614 context
615 .metrics
616 .node_metrics
617 .subscribed_by
618 .with_label_values(&[authority.hostname.as_str()])
619 .set(0);
620 }
621
622 Self {
623 counter: parking_lot::Mutex::new(Counter {
624 count: 0,
625 subscriptions_by_authority: vec![0; context.committee.size()],
626 }),
627 dispatcher,
628 context,
629 }
630 }
631
632 fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
633 let mut counter = self.counter.lock();
634 counter.count += 1;
635 counter.subscriptions_by_authority[peer] += 1;
636
637 let peer_hostname = &self.context.committee.authority(peer).hostname;
638 self.context
639 .metrics
640 .node_metrics
641 .subscribed_by
642 .with_label_values(&[peer_hostname])
643 .set(1);
644
645 if counter.count == 1 {
646 self.dispatcher
647 .set_subscriber_exists(true)
648 .map_err(|_| ConsensusError::Shutdown)?;
649 }
650 Ok(())
651 }
652
653 fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
654 let mut counter = self.counter.lock();
655 counter.count -= 1;
656 counter.subscriptions_by_authority[peer] -= 1;
657
658 if counter.subscriptions_by_authority[peer] == 0 {
659 let peer_hostname = &self.context.committee.authority(peer).hostname;
660 self.context
661 .metrics
662 .node_metrics
663 .subscribed_by
664 .with_label_values(&[peer_hostname])
665 .set(0);
666 }
667
668 if counter.count == 0 {
669 self.dispatcher
670 .set_subscriber_exists(false)
671 .map_err(|_| ConsensusError::Shutdown)?;
672 }
673 Ok(())
674 }
675}
676
677type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;
680
681struct BroadcastStream<T> {
684 peer: AuthorityIndex,
685 inner: ReusableBoxFuture<
687 'static,
688 (
689 Result<T, broadcast::error::RecvError>,
690 broadcast::Receiver<T>,
691 ),
692 >,
693 subscription_counter: Arc<SubscriptionCounter>,
695}
696
697impl<T: 'static + Clone + Send> BroadcastStream<T> {
698 pub fn new(
699 peer: AuthorityIndex,
700 rx: broadcast::Receiver<T>,
701 subscription_counter: Arc<SubscriptionCounter>,
702 ) -> Self {
703 if let Err(err) = subscription_counter.increment(peer) {
704 match err {
705 ConsensusError::Shutdown => {}
706 _ => panic!("Unexpected error: {err}"),
707 }
708 }
709 Self {
710 peer,
711 inner: ReusableBoxFuture::new(make_recv_future(rx)),
712 subscription_counter,
713 }
714 }
715}
716
717impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
718 type Item = T;
719
720 fn poll_next(
721 mut self: Pin<&mut Self>,
722 cx: &mut task::Context<'_>,
723 ) -> task::Poll<Option<Self::Item>> {
724 let peer = self.peer;
725 let maybe_item = loop {
726 let (result, rx) = ready!(self.inner.poll(cx));
727 self.inner.set(make_recv_future(rx));
728
729 match result {
730 Ok(item) => break Some(item),
731 Err(broadcast::error::RecvError::Closed) => {
732 info!("Block BroadcastedBlockStream {} closed", peer);
733 break None;
734 }
735 Err(broadcast::error::RecvError::Lagged(n)) => {
736 warn!(
737 "Block BroadcastedBlockStream {} lagged by {} messages",
738 peer, n
739 );
740 continue;
741 }
742 }
743 };
744 task::Poll::Ready(maybe_item)
745 }
746}
747
748impl<T> Drop for BroadcastStream<T> {
749 fn drop(&mut self) {
750 if let Err(err) = self.subscription_counter.decrement(self.peer) {
751 match err {
752 ConsensusError::Shutdown => {}
753 _ => panic!("Unexpected error: {err}"),
754 }
755 }
756 }
757}
758
759async fn make_recv_future<T: Clone>(
760 mut rx: broadcast::Receiver<T>,
761) -> (
762 Result<T, broadcast::error::RecvError>,
763 broadcast::Receiver<T>,
764) {
765 let result = rx.recv().await;
766 (result, rx)
767}
768
769#[cfg(test)]
772mod tests {
773 use std::{
774 collections::{BTreeMap, BTreeSet},
775 sync::Arc,
776 time::Duration,
777 };
778
779 use async_trait::async_trait;
780 use bytes::Bytes;
781 use consensus_config::AuthorityIndex;
782 use consensus_types::block::{BlockRef, Round};
783 use mysten_metrics::monitored_mpsc;
784 use parking_lot::{Mutex, RwLock};
785 use tokio::{sync::broadcast, time::sleep};
786
787 use crate::{
788 authority_service::AuthorityService,
789 block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
790 commit::{CertifiedCommits, CommitRange},
791 commit_vote_monitor::CommitVoteMonitor,
792 context::Context,
793 core_thread::{CoreError, CoreThreadDispatcher},
794 dag_state::DagState,
795 error::ConsensusResult,
796 network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
797 round_tracker::PeerRoundTracker,
798 storage::mem_store::MemStore,
799 synchronizer::Synchronizer,
800 test_dag_builder::DagBuilder,
801 transaction_certifier::TransactionCertifier,
802 };
803 struct FakeCoreThreadDispatcher {
804 blocks: Mutex<Vec<VerifiedBlock>>,
805 }
806
807 impl FakeCoreThreadDispatcher {
808 fn new() -> Self {
809 Self {
810 blocks: Mutex::new(vec![]),
811 }
812 }
813
814 fn get_blocks(&self) -> Vec<VerifiedBlock> {
815 self.blocks.lock().clone()
816 }
817 }
818
819 #[async_trait]
820 impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
821 async fn add_blocks(
822 &self,
823 blocks: Vec<VerifiedBlock>,
824 ) -> Result<BTreeSet<BlockRef>, CoreError> {
825 let block_refs = blocks.iter().map(|b| b.reference()).collect();
826 self.blocks.lock().extend(blocks);
827 Ok(block_refs)
828 }
829
830 async fn check_block_refs(
831 &self,
832 _block_refs: Vec<BlockRef>,
833 ) -> Result<BTreeSet<BlockRef>, CoreError> {
834 Ok(BTreeSet::new())
835 }
836
837 async fn add_certified_commits(
838 &self,
839 _commits: CertifiedCommits,
840 ) -> Result<BTreeSet<BlockRef>, CoreError> {
841 todo!()
842 }
843
844 async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
845 Ok(())
846 }
847
848 async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
849 Ok(Default::default())
850 }
851
852 fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
853 todo!()
854 }
855
856 fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
857 todo!()
858 }
859
860 fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
861 todo!()
862 }
863
864 fn highest_received_rounds(&self) -> Vec<Round> {
865 todo!()
866 }
867 }
868
869 #[derive(Default)]
870 struct FakeNetworkClient {}
871
872 #[async_trait]
873 impl NetworkClient for FakeNetworkClient {
874 const SUPPORT_STREAMING: bool = false;
875
876 async fn send_block(
877 &self,
878 _peer: AuthorityIndex,
879 _block: &VerifiedBlock,
880 _timeout: Duration,
881 ) -> ConsensusResult<()> {
882 unimplemented!("Unimplemented")
883 }
884
885 async fn subscribe_blocks(
886 &self,
887 _peer: AuthorityIndex,
888 _last_received: Round,
889 _timeout: Duration,
890 ) -> ConsensusResult<BlockStream> {
891 unimplemented!("Unimplemented")
892 }
893
894 async fn fetch_blocks(
895 &self,
896 _peer: AuthorityIndex,
897 _block_refs: Vec<BlockRef>,
898 _highest_accepted_rounds: Vec<Round>,
899 _breadth_first: bool,
900 _timeout: Duration,
901 ) -> ConsensusResult<Vec<Bytes>> {
902 unimplemented!("Unimplemented")
903 }
904
905 async fn fetch_commits(
906 &self,
907 _peer: AuthorityIndex,
908 _commit_range: CommitRange,
909 _timeout: Duration,
910 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
911 unimplemented!("Unimplemented")
912 }
913
914 async fn fetch_latest_blocks(
915 &self,
916 _peer: AuthorityIndex,
917 _authorities: Vec<AuthorityIndex>,
918 _timeout: Duration,
919 ) -> ConsensusResult<Vec<Bytes>> {
920 unimplemented!("Unimplemented")
921 }
922
923 async fn get_latest_rounds(
924 &self,
925 _peer: AuthorityIndex,
926 _timeout: Duration,
927 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
928 unimplemented!("Unimplemented")
929 }
930 }
931
932 #[tokio::test(flavor = "current_thread", start_paused = true)]
933 async fn test_handle_send_block() {
934 let (context, _keys) = Context::new_for_test(4);
935 let context = Arc::new(context);
936 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
937 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
938 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
939 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
940 let network_client = Arc::new(FakeNetworkClient::default());
941 let (blocks_sender, _blocks_receiver) =
942 monitored_mpsc::unbounded_channel("consensus_block_output");
943 let store = Arc::new(MemStore::new());
944 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
945 let transaction_certifier =
946 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
947 let synchronizer = Synchronizer::start(
948 network_client,
949 context.clone(),
950 core_dispatcher.clone(),
951 commit_vote_monitor.clone(),
952 block_verifier.clone(),
953 transaction_certifier.clone(),
954 dag_state.clone(),
955 false,
956 );
957 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
958 let authority_service = Arc::new(AuthorityService::new(
959 context.clone(),
960 block_verifier,
961 commit_vote_monitor,
962 round_tracker,
963 synchronizer,
964 core_dispatcher.clone(),
965 rx_block_broadcast,
966 transaction_certifier,
967 dag_state,
968 store,
969 ));
970
971 let now = context.clock.timestamp_utc_ms();
973 let max_drift = context.parameters.max_forward_time_drift;
974 let input_block = VerifiedBlock::new_for_test(
975 TestBlock::new(9, 0)
976 .set_timestamp_ms(now + max_drift.as_millis() as u64)
977 .build(),
978 );
979
980 let service = authority_service.clone();
981 let serialized = ExtendedSerializedBlock {
982 block: input_block.serialized().clone(),
983 excluded_ancestors: vec![],
984 };
985
986 tokio::spawn(async move {
987 service
988 .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
989 .await
990 .unwrap();
991 });
992
993 sleep(max_drift / 2).await;
994
995 let blocks = core_dispatcher.get_blocks();
996 assert_eq!(blocks.len(), 1);
997 assert_eq!(blocks[0], input_block);
998 }
999
1000 #[tokio::test(flavor = "current_thread", start_paused = true)]
1001 async fn test_handle_fetch_blocks() {
1002 const NUM_AUTHORITIES: usize = 40;
1005 const NUM_ROUNDS: usize = 40;
1006 let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
1007 let context = Arc::new(context);
1008 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1009 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1010 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1011 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1012 let network_client = Arc::new(FakeNetworkClient::default());
1013 let (blocks_sender, _blocks_receiver) =
1014 monitored_mpsc::unbounded_channel("consensus_block_output");
1015 let store = Arc::new(MemStore::new());
1016 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1017 let transaction_certifier =
1018 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1019 let synchronizer = Synchronizer::start(
1020 network_client,
1021 context.clone(),
1022 core_dispatcher.clone(),
1023 commit_vote_monitor.clone(),
1024 block_verifier.clone(),
1025 transaction_certifier.clone(),
1026 dag_state.clone(),
1027 false,
1028 );
1029 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1030 let authority_service = Arc::new(AuthorityService::new(
1031 context.clone(),
1032 block_verifier,
1033 commit_vote_monitor,
1034 round_tracker,
1035 synchronizer,
1036 core_dispatcher.clone(),
1037 rx_block_broadcast,
1038 transaction_certifier,
1039 dag_state.clone(),
1040 store,
1041 ));
1042
1043 let mut dag_builder = DagBuilder::new(context.clone());
1045 dag_builder
1046 .layers(1..=(NUM_ROUNDS as u32))
1047 .build()
1048 .persist_layers(dag_state.clone());
1049 let all_blocks = dag_builder.all_blocks();
1050
1051 let missing_block_refs: Vec<BlockRef> = all_blocks
1053 .iter()
1054 .rev()
1055 .take(2)
1056 .map(|b| b.reference())
1057 .collect();
1058 let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1059 let results = authority_service
1060 .handle_fetch_blocks(
1061 AuthorityIndex::new_for_test(0),
1062 missing_block_refs.clone(),
1063 highest_accepted_rounds,
1064 true,
1065 )
1066 .await
1067 .unwrap();
1068
1069 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1071 .iter()
1072 .map(|b| {
1073 let signed = bcs::from_bytes(b).unwrap();
1074 let block = VerifiedBlock::new_verified(signed, b.clone());
1075 (block.reference(), block)
1076 })
1077 .collect();
1078 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1079 for b in &missing_block_refs {
1081 assert!(blocks.contains_key(b));
1082 }
1083 let num_missing_ancestors = blocks
1084 .keys()
1085 .filter(|b| b.round == NUM_ROUNDS as Round - 1)
1086 .count();
1087 assert_eq!(
1088 num_missing_ancestors,
1089 context.parameters.max_blocks_per_sync - missing_block_refs.len()
1090 );
1091
1092 let missing_round = NUM_ROUNDS as Round - 3;
1094 let missing_block_refs: Vec<BlockRef> = all_blocks
1095 .iter()
1096 .filter(|b| b.reference().round == missing_round)
1097 .map(|b| b.reference())
1098 .take(2)
1099 .collect();
1100 let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
1101 highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
1103 let results = authority_service
1104 .handle_fetch_blocks(
1105 AuthorityIndex::new_for_test(0),
1106 missing_block_refs.clone(),
1107 highest_accepted_rounds,
1108 false,
1109 )
1110 .await
1111 .unwrap();
1112
1113 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1115 .iter()
1116 .map(|b| {
1117 let signed = bcs::from_bytes(b).unwrap();
1118 let block = VerifiedBlock::new_verified(signed, b.clone());
1119 (block.reference(), block)
1120 })
1121 .collect();
1122 assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
1123 for b in &missing_block_refs {
1125 assert!(blocks.contains_key(b));
1126 }
1127 let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
1129 for b in blocks.keys() {
1130 assert!(b.round <= missing_round);
1131 assert!(expected_authors.contains(&b.author));
1132 }
1133
1134 let missing_block_refs: Vec<BlockRef> = all_blocks
1136 .iter()
1137 .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
1138 .map(|b| b.reference())
1139 .take(5)
1140 .collect();
1141 let results = authority_service
1142 .handle_fetch_blocks(
1143 AuthorityIndex::new_for_test(0),
1144 missing_block_refs.clone(),
1145 vec![],
1146 false,
1147 )
1148 .await
1149 .unwrap();
1150
1151 let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
1153 .iter()
1154 .map(|b| {
1155 let signed = bcs::from_bytes(b).unwrap();
1156 let block = VerifiedBlock::new_verified(signed, b.clone());
1157 (block.reference(), block)
1158 })
1159 .collect();
1160 assert_eq!(blocks.len(), 5);
1161 for b in &missing_block_refs {
1162 assert!(blocks.contains_key(b));
1163 }
1164 }
1165
1166 #[tokio::test(flavor = "current_thread", start_paused = true)]
1167 async fn test_handle_fetch_latest_blocks() {
1168 let (context, _keys) = Context::new_for_test(4);
1170 let context = Arc::new(context);
1171 let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
1172 let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
1173 let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
1174 let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
1175 let network_client = Arc::new(FakeNetworkClient::default());
1176 let (blocks_sender, _blocks_receiver) =
1177 monitored_mpsc::unbounded_channel("consensus_block_output");
1178 let store = Arc::new(MemStore::new());
1179 let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
1180 let transaction_certifier =
1181 TransactionCertifier::new(context.clone(), dag_state.clone(), blocks_sender);
1182 let synchronizer = Synchronizer::start(
1183 network_client,
1184 context.clone(),
1185 core_dispatcher.clone(),
1186 commit_vote_monitor.clone(),
1187 block_verifier.clone(),
1188 transaction_certifier.clone(),
1189 dag_state.clone(),
1190 true,
1191 );
1192 let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
1193 let authority_service = Arc::new(AuthorityService::new(
1194 context.clone(),
1195 block_verifier,
1196 commit_vote_monitor,
1197 round_tracker,
1198 synchronizer,
1199 core_dispatcher.clone(),
1200 rx_block_broadcast,
1201 transaction_certifier,
1202 dag_state.clone(),
1203 store,
1204 ));
1205
1206 let mut dag_builder = DagBuilder::new(context.clone());
1208 dag_builder
1209 .layers(1..=10)
1210 .authorities(vec![AuthorityIndex::new_for_test(2)])
1211 .equivocate(1)
1212 .build()
1213 .persist_layers(dag_state);
1214
1215 let authorities_to_request = vec![
1217 AuthorityIndex::new_for_test(1),
1218 AuthorityIndex::new_for_test(2),
1219 ];
1220 let results = authority_service
1221 .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
1222 .await;
1223
1224 let serialised_blocks = results.unwrap();
1226 for serialised_block in serialised_blocks {
1227 let signed_block: SignedBlock =
1228 bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
1229 let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);
1230
1231 assert_eq!(verified_block.round(), 10);
1232 }
1233 }
1234}