consensus_core/authority_service.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::{BTreeMap, BTreeSet},
    pin::Pin,
    sync::Arc,
    time::Duration,
};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use consensus_types::block::{BlockRef, Round};
use futures::{Stream, StreamExt, ready, stream, task};
use mysten_metrics::spawn_monitored_task;
use parking_lot::RwLock;
use rand::seq::SliceRandom as _;
use sui_macros::fail_point_async;
use tap::TapFallible;
use tokio::sync::broadcast;
use tokio_util::sync::ReusableBoxFuture;
use tracing::{debug, info, warn};

use crate::{
    CommitIndex,
    block::{BlockAPI as _, ExtendedBlock, GENESIS_ROUND, SignedBlock, VerifiedBlock},
    block_verifier::BlockVerifier,
    commit::{CommitAPI as _, CommitRange, TrustedCommit},
    commit_vote_monitor::CommitVoteMonitor,
    context::Context,
    core_thread::CoreThreadDispatcher,
    dag_state::DagState,
    error::{ConsensusError, ConsensusResult},
    network::{BlockStream, ExtendedSerializedBlock, NetworkService},
    round_tracker::PeerRoundTracker,
    stake_aggregator::{QuorumThreshold, StakeAggregator},
    storage::Store,
    synchronizer::SynchronizerHandle,
    transaction_certifier::TransactionCertifier,
};

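/// Multiplier applied to `commit_sync_batch_size` to compute how far the local commit index can
/// lag behind the quorum commit index before incoming blocks are rejected in `handle_send_block`.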
pub(crate) const COMMIT_LAG_MULTIPLIER: u32 = 5;

/// Authority's network service implementation, agnostic to the actual networking stack used.
pub(crate) struct AuthorityService<C: CoreThreadDispatcher> {
    context: Arc<Context>,
    commit_vote_monitor: Arc<CommitVoteMonitor>,
    block_verifier: Arc<dyn BlockVerifier>,
    synchronizer: Arc<SynchronizerHandle>,
    core_dispatcher: Arc<C>,
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    subscription_counter: Arc<SubscriptionCounter>,
    transaction_certifier: TransactionCertifier,
    dag_state: Arc<RwLock<DagState>>,
    store: Arc<dyn Store>,
    round_tracker: Arc<RwLock<PeerRoundTracker>>,
}

impl<C: CoreThreadDispatcher> AuthorityService<C> {
    pub(crate) fn new(
        context: Arc<Context>,
        block_verifier: Arc<dyn BlockVerifier>,
        commit_vote_monitor: Arc<CommitVoteMonitor>,
        round_tracker: Arc<RwLock<PeerRoundTracker>>,
        synchronizer: Arc<SynchronizerHandle>,
        core_dispatcher: Arc<C>,
        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
        transaction_certifier: TransactionCertifier,
        dag_state: Arc<RwLock<DagState>>,
        store: Arc<dyn Store>,
    ) -> Self {
        let subscription_counter = Arc::new(SubscriptionCounter::new(context.clone()));
        Self {
            context,
            block_verifier,
            commit_vote_monitor,
            synchronizer,
            core_dispatcher,
            rx_block_broadcast,
            subscription_counter,
            transaction_certifier,
            dag_state,
            store,
            round_tracker,
        }
    }

    // Parses and validates serialized excluded ancestors.
    fn parse_excluded_ancestors(
        &self,
        peer: AuthorityIndex,
        block: &VerifiedBlock,
        mut excluded_ancestors: Vec<Vec<u8>>,
    ) -> ConsensusResult<Vec<BlockRef>> {
        let peer_hostname = &self.context.committee.authority(peer).hostname;

        let excluded_ancestors_limit = self.context.committee.size() * 2;
        if excluded_ancestors.len() > excluded_ancestors_limit {
            debug!(
                "Dropping {} excluded ancestor(s) from {} {} due to size limit",
                excluded_ancestors.len() - excluded_ancestors_limit,
                peer,
                peer_hostname,
            );
            excluded_ancestors.truncate(excluded_ancestors_limit);
        }

        let excluded_ancestors = excluded_ancestors
            .into_iter()
            .map(|serialized| {
                let block_ref: BlockRef =
                    bcs::from_bytes(&serialized).map_err(ConsensusError::MalformedBlock)?;
                if !self.context.committee.is_valid_index(block_ref.author) {
                    return Err(ConsensusError::InvalidAuthorityIndex {
                        index: block_ref.author,
                        max: self.context.committee.size(),
                    });
                }
                if block_ref.round >= block.round() {
                    return Err(ConsensusError::InvalidAncestorRound {
                        ancestor: block_ref.round,
                        block: block.round(),
                    });
                }
                Ok(block_ref)
            })
            .collect::<ConsensusResult<Vec<BlockRef>>>()?;

        for excluded_ancestor in &excluded_ancestors {
            let excluded_ancestor_hostname = &self
                .context
                .committee
                .authority(excluded_ancestor.author)
                .hostname;
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_count_by_authority
                .with_label_values(&[excluded_ancestor_hostname])
                .inc();
        }
        self.context
            .metrics
            .node_metrics
            .network_received_excluded_ancestors_from_authority
            .with_label_values(&[peer_hostname])
            .inc_by(excluded_ancestors.len() as u64);

        Ok(excluded_ancestors)
    }
}

#[async_trait]
impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
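    // Handles a block sent by the peer: verifies and accepts the block, schedules fetching of any
    // missing ancestors, and processes the excluded ancestors advertised alongside the block.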
    async fn handle_send_block(
        &self,
        peer: AuthorityIndex,
        serialized_block: ExtendedSerializedBlock,
    ) -> ConsensusResult<()> {
        fail_point_async!("consensus-rpc-response");

        let peer_hostname = &self.context.committee.authority(peer).hostname;

        // TODO: dedup block verifications, here and with fetched blocks.
        let signed_block: SignedBlock =
            bcs::from_bytes(&serialized_block.block).map_err(ConsensusError::MalformedBlock)?;

        // Reject blocks not produced by the peer.
        if peer != signed_block.author() {
            self.context
                .metrics
                .node_metrics
                .invalid_blocks
                .with_label_values(&[peer_hostname, "handle_send_block", "UnexpectedAuthority"])
                .inc();
            let e = ConsensusError::UnexpectedAuthority(signed_block.author(), peer);
            info!("Block with wrong authority from {}: {}", peer, e);
            return Err(e);
        }

        // Reject blocks that fail validation.
        let (verified_block, reject_txn_votes) = self
            .block_verifier
            .verify_and_vote(signed_block, serialized_block.block)
            .tap_err(|e| {
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
                    .inc();
                info!("Invalid block from {}: {}", peer, e);
            })?;
        let block_ref = verified_block.reference();
        debug!("Received block {} via send block.", block_ref);

        let now = self.context.clock.timestamp_utc_ms();
        let forward_time_drift =
            Duration::from_millis(verified_block.timestamp_ms().saturating_sub(now));

        self.context
            .metrics
            .node_metrics
            .block_timestamp_drift_ms
            .with_label_values(&[peer_hostname, "handle_send_block"])
            .inc_by(forward_time_drift.as_millis() as u64);

        // Observe the block for its commit votes. When the local commit is lagging too much,
        // the commit sync loop will trigger fetching.
        self.commit_vote_monitor.observe_block(&verified_block);

        // Reject blocks when the local commit index lags too far behind the quorum commit index,
        // to avoid the memory overhead from suspended blocks.
        //
        // IMPORTANT: this must be done after observing votes from the block, otherwise
        // the observed quorum commit will no longer progress.
        //
        // Since the main issue with too many suspended blocks is memory usage, not CPU,
        // it is ok to reject after block verification instead of before.
        let last_commit_index = self.dag_state.read().last_commit_index();
        let quorum_commit_index = self.commit_vote_monitor.quorum_commit_index();
        // The threshold to ignore a block should be larger than commit_sync_batch_size,
        // to avoid excessive block rejections and synchronizations.
        if last_commit_index
            + self.context.parameters.commit_sync_batch_size * COMMIT_LAG_MULTIPLIER
            < quorum_commit_index
        {
            self.context
                .metrics
                .node_metrics
                .rejected_blocks
                .with_label_values(&["commit_lagging"])
                .inc();
            debug!(
                "Block {:?} is rejected because last commit index is lagging quorum commit index too much ({} < {})",
                block_ref, last_commit_index, quorum_commit_index,
            );
            return Err(ConsensusError::BlockRejected {
                block_ref,
                reason: format!(
                    "Last commit index is lagging quorum commit index too much ({} < {})",
                    last_commit_index, quorum_commit_index,
                ),
            });
        }

        self.context
            .metrics
            .node_metrics
            .verified_blocks
            .with_label_values(&[peer_hostname])
            .inc();

        // The block is verified and current, so it can be processed in the fastpath.
        if self.context.protocol_config.mysticeti_fastpath() {
            self.transaction_certifier
                .add_voted_blocks(vec![(verified_block.clone(), reject_txn_votes)]);
        }

        // Try to accept the block into the DAG.
        let missing_ancestors = self
            .core_dispatcher
            .add_blocks(vec![verified_block.clone()])
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing ancestors from this peer in the background.
        if !missing_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .handler_received_block_missing_ancestors
                .with_label_values(&[peer_hostname])
                .inc_by(missing_ancestors.len() as u64);
            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                // This does not wait for the fetch request to complete.
                // It only waits for the synchronizer to queue the request to a peer.
                // When this fails, it usually means the queue is full.
                // The fetch will retry from other peers via live and periodic syncs.
                if let Err(err) = synchronizer.fetch_blocks(missing_ancestors, peer).await {
                    debug!("Failed to fetch missing ancestors via synchronizer: {err}");
                }
            });
        }

        // ------------ After processing the block, process the excluded ancestors ------------

        let excluded_ancestors = self
            .parse_excluded_ancestors(peer, &verified_block, serialized_block.excluded_ancestors)
            .tap_err(|e| {
                debug!("Failed to parse excluded ancestors from {peer} {peer_hostname}: {e}");
                self.context
                    .metrics
                    .node_metrics
                    .invalid_blocks
                    .with_label_values(&[peer_hostname, "handle_send_block", e.name()])
                    .inc();
            })?;

        self.round_tracker
            .write()
            .update_from_verified_block(&ExtendedBlock {
                block: verified_block,
                excluded_ancestors: excluded_ancestors.clone(),
            });

        let missing_excluded_ancestors = self
            .core_dispatcher
            .check_block_refs(excluded_ancestors)
            .await
            .map_err(|_| ConsensusError::Shutdown)?;

        // Schedule fetching missing soft links from this peer in the background.
        if !missing_excluded_ancestors.is_empty() {
            self.context
                .metrics
                .node_metrics
                .network_excluded_ancestors_sent_to_fetch
                .with_label_values(&[peer_hostname])
                .inc_by(missing_excluded_ancestors.len() as u64);

            let synchronizer = self.synchronizer.clone();
            spawn_monitored_task!(async move {
                if let Err(err) = synchronizer
                    .fetch_blocks(missing_excluded_ancestors, peer)
                    .await
                {
                    debug!("Failed to fetch excluded ancestors via synchronizer: {err}");
                }
            });
        }

        Ok(())
    }

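    // Streams own blocks to the subscribing peer: first any blocks proposed after `last_received`,
    // then newly broadcasted blocks for as long as the subscription stays open.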
    async fn handle_subscribe_blocks(
        &self,
        peer: AuthorityIndex,
        last_received: Round,
    ) -> ConsensusResult<BlockStream> {
        fail_point_async!("consensus-rpc-response");

        let dag_state = self.dag_state.read();
        // Find recent own blocks that have not been received by the peer.
        // If last_received is valid and more blocks have been proposed since then, this call is
        // guaranteed to return at least some recent blocks, which helps with liveness.
        let missed_blocks = stream::iter(
            dag_state
                .get_cached_blocks(self.context.own_index, last_received + 1)
                .into_iter()
                .map(|block| ExtendedSerializedBlock {
                    block: block.serialized().clone(),
                    excluded_ancestors: vec![],
                }),
        );

        let broadcasted_blocks = BroadcastedBlockStream::new(
            peer,
            self.rx_block_broadcast.resubscribe(),
            self.subscription_counter.clone(),
        );

        // Return a stream of blocks that first yields missed blocks as requested, then new blocks.
        Ok(Box::pin(missed_blocks.chain(
            broadcasted_blocks.map(ExtendedSerializedBlock::from),
        )))
    }

    // Handles two types of requests:
    // 1. Missing blocks for block sync:
    //    - uses highest_accepted_rounds.
    //    - at most max_blocks_per_sync blocks should be returned.
    // 2. Committed blocks for commit sync:
    //    - does not use highest_accepted_rounds.
    //    - at most max_blocks_per_fetch blocks should be returned.
    async fn handle_fetch_blocks(
        &self,
        _peer: AuthorityIndex,
        mut block_refs: Vec<BlockRef>,
        highest_accepted_rounds: Vec<Round>,
        breadth_first: bool,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if !highest_accepted_rounds.is_empty()
            && highest_accepted_rounds.len() != self.context.committee.size()
        {
            return Err(ConsensusError::InvalidSizeOfHighestAcceptedRounds(
                highest_accepted_rounds.len(),
                self.context.committee.size(),
            ));
        }

        // Limit the number of requested block refs to the maximum allowed per response.
        let max_response_num_blocks = if !highest_accepted_rounds.is_empty() {
            self.context.parameters.max_blocks_per_sync
        } else {
            self.context.parameters.max_blocks_per_fetch
        };
        if block_refs.len() > max_response_num_blocks {
            block_refs.truncate(max_response_num_blocks);
        }

        // Validate the requested block refs.
        for block in &block_refs {
            if !self.context.committee.is_valid_index(block.author) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: block.author,
                    max: self.context.committee.size(),
                });
            }
            if block.round == GENESIS_ROUND {
                return Err(ConsensusError::UnexpectedGenesisBlockRequested);
            }
        }

        // Get requested blocks from store.
        let blocks = if !highest_accepted_rounds.is_empty() {
            block_refs.sort();
            block_refs.dedup();
            let mut blocks = self
                .dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect::<Vec<_>>();

            if breadth_first {
                // Get unique missing ancestor blocks of the requested blocks.
                let mut missing_ancestors = blocks
                    .iter()
                    .flat_map(|block| block.ancestors().to_vec())
                    .filter(|block_ref| highest_accepted_rounds[block_ref.author] < block_ref.round)
                    .collect::<BTreeSet<_>>()
                    .into_iter()
                    .collect::<Vec<_>>();

                // If there are too many missing ancestors, randomly select a subset to avoid
                // fetching duplicated blocks across peers.
                let selected_num_blocks = max_response_num_blocks.saturating_sub(blocks.len());
                if selected_num_blocks < missing_ancestors.len() {
                    missing_ancestors = missing_ancestors
                        .choose_multiple(&mut rand::thread_rng(), selected_num_blocks)
                        .copied()
                        .collect::<Vec<_>>();
                }
                let ancestor_blocks = self.dag_state.read().get_blocks(&missing_ancestors);
                blocks.extend(ancestor_blocks.into_iter().flatten());
            } else {
                // Get additional blocks from authorities with missing blocks, if they are available in the cache.
                // Compute the lowest missing round per requested authority.
                let mut lowest_missing_rounds = BTreeMap::<AuthorityIndex, Round>::new();
                for block_ref in blocks.iter().map(|b| b.reference()) {
                    let entry = lowest_missing_rounds
                        .entry(block_ref.author)
                        .or_insert(block_ref.round);
                    *entry = (*entry).min(block_ref.round);
                }

                // Retrieve additional blocks per requested authority, from the peer's highest accepted
                // round + 1 to its lowest missing round (exclusive).
                // No blocks from other authorities are retrieved: the requestor may not be missing blocks
                // from another authority, so serving them could just lead to unnecessary data transfer,
                // or missing blocks from other authorities may be requested from other peers.
                let dag_state = self.dag_state.read();
                for (authority, lowest_missing_round) in lowest_missing_rounds {
                    let highest_accepted_round = highest_accepted_rounds[authority];
                    if highest_accepted_round >= lowest_missing_round {
                        continue;
                    }
                    let missing_blocks = dag_state.get_cached_blocks_in_range(
                        authority,
                        highest_accepted_round + 1,
                        lowest_missing_round,
                        self.context
                            .parameters
                            .max_blocks_per_sync
                            .saturating_sub(blocks.len()),
                    );
                    blocks.extend(missing_blocks);
                    if blocks.len() >= self.context.parameters.max_blocks_per_sync {
                        blocks.truncate(self.context.parameters.max_blocks_per_sync);
                        break;
                    }
                }
            }

            blocks
        } else {
            self.dag_state
                .read()
                .get_blocks(&block_refs)
                .into_iter()
                .flatten()
                .collect()
        };

        // Return the serialized blocks.
        let bytes = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();
        Ok(bytes)
    }

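    // Returns the requested commits, capped at commit_sync_batch_size, together with the blocks
    // voting for the last returned commit, so the requester can verify the batch is certified by a quorum.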
    async fn handle_fetch_commits(
        &self,
        _peer: AuthorityIndex,
        commit_range: CommitRange,
    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)> {
        fail_point_async!("consensus-rpc-response");

        // Compute an inclusive end index and bound the maximum number of commits scanned.
        let inclusive_end = commit_range.end().min(
            commit_range.start() + self.context.parameters.commit_sync_batch_size as CommitIndex
                - 1,
        );
        let mut commits = self
            .store
            .scan_commits((commit_range.start()..=inclusive_end).into())?;
        let mut certifier_block_refs = vec![];
        'commit: while let Some(c) = commits.last() {
            let index = c.index();
            let votes = self.store.read_commit_votes(index)?;
            let mut stake_aggregator = StakeAggregator::<QuorumThreshold>::new();
            for v in &votes {
                stake_aggregator.add(v.author, &self.context.committee);
            }
            if stake_aggregator.reached_threshold(&self.context.committee) {
                certifier_block_refs = votes;
                break 'commit;
            } else {
                debug!(
                    "Commit {} votes did not reach quorum to certify, {} < {}, skipping",
                    index,
                    stake_aggregator.stake(),
                    stake_aggregator.threshold(&self.context.committee)
                );
                self.context
                    .metrics
                    .node_metrics
                    .commit_sync_fetch_commits_handler_uncertified_skipped
                    .inc();
                commits.pop();
            }
        }
        let certifier_blocks = self
            .store
            .read_blocks(&certifier_block_refs)?
            .into_iter()
            .flatten()
            .collect();
        Ok((commits, certifier_blocks))
    }

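    // Returns the latest known (non-genesis) block for each requested authority, read from the dag state.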
    async fn handle_fetch_latest_blocks(
        &self,
        peer: AuthorityIndex,
        authorities: Vec<AuthorityIndex>,
    ) -> ConsensusResult<Vec<Bytes>> {
        fail_point_async!("consensus-rpc-response");

        if authorities.len() > self.context.committee.size() {
            return Err(ConsensusError::TooManyAuthoritiesProvided(peer));
        }

        // Ensure that the requested authorities are valid.
        for authority in &authorities {
            if !self.context.committee.is_valid_index(*authority) {
                return Err(ConsensusError::InvalidAuthorityIndex {
                    index: *authority,
                    max: self.context.committee.size(),
                });
            }
        }

        // Read from the dag state to find the latest blocks.
        // TODO: at the moment we don't look into the block manager for suspended blocks. Ideally we
        // would want to in the future, if we think it would help tackle the majority of cases.
        let mut blocks = vec![];
        let dag_state = self.dag_state.read();
        for authority in authorities {
            let block = dag_state.get_last_block_for_authority(authority);

            debug!("Latest block for {authority}: {block:?} as requested from {peer}");

            // No reason to serve back the genesis block - it is the same as if the peer has not received any block.
            if block.round() != GENESIS_ROUND {
                blocks.push(block);
            }
        }

        // Return the serialised blocks.
        let result = blocks
            .into_iter()
            .map(|block| block.serialized().clone())
            .collect::<Vec<_>>();

        Ok(result)
    }

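    // Returns the highest received and highest accepted round per authority, as known locally.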
    async fn handle_get_latest_rounds(
        &self,
        _peer: AuthorityIndex,
    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
        fail_point_async!("consensus-rpc-response");

        let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();

        let blocks = self
            .dag_state
            .read()
            .get_last_cached_block_per_authority(Round::MAX);
        let highest_accepted_rounds = blocks
            .into_iter()
            .map(|(block, _)| block.round())
            .collect::<Vec<_>>();

        // Own blocks do not go through the core dispatcher, so they need to be set separately.
        highest_received_rounds[self.context.own_index] =
            highest_accepted_rounds[self.context.own_index];

        Ok((highest_received_rounds, highest_accepted_rounds))
    }
}

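/// Tracks the total number of active subscriptions and the number per subscribing authority.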
struct Counter {
    count: usize,
    subscriptions_by_authority: Vec<usize>,
}

/// Atomically counts the number of active subscriptions to the block broadcast stream.
struct SubscriptionCounter {
    context: Arc<Context>,
    counter: parking_lot::Mutex<Counter>,
}

impl SubscriptionCounter {
    fn new(context: Arc<Context>) -> Self {
        // Set the subscribed peers by default to 0
        for (_, authority) in context.committee.authorities() {
            context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[authority.hostname.as_str()])
                .set(0);
        }

        Self {
            counter: parking_lot::Mutex::new(Counter {
                count: 0,
                subscriptions_by_authority: vec![0; context.committee.size()],
            }),
            context,
        }
    }

    fn increment(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count += 1;
        counter.subscriptions_by_authority[peer] += 1;

        let peer_hostname = &self.context.committee.authority(peer).hostname;
        self.context
            .metrics
            .node_metrics
            .subscribed_by
            .with_label_values(&[peer_hostname])
            .set(1);

        Ok(())
    }

    fn decrement(&self, peer: AuthorityIndex) -> Result<(), ConsensusError> {
        let mut counter = self.counter.lock();
        counter.count -= 1;
        counter.subscriptions_by_authority[peer] -= 1;

        if counter.subscriptions_by_authority[peer] == 0 {
            let peer_hostname = &self.context.committee.authority(peer).hostname;
            self.context
                .metrics
                .node_metrics
                .subscribed_by
                .with_label_values(&[peer_hostname])
                .set(0);
        }

        Ok(())
    }
}

/// Each broadcasted block stream wraps a broadcast receiver for blocks.
/// It yields blocks that are broadcasted after the stream is created.
type BroadcastedBlockStream = BroadcastStream<ExtendedBlock>;

/// Adapted from `tokio_stream::wrappers::BroadcastStream`. The main difference is that
/// this tolerates lags with only logging, without yielding errors.
struct BroadcastStream<T> {
    peer: AuthorityIndex,
    // Stores the receiver across poll_next() calls.
    inner: ReusableBoxFuture<
        'static,
        (
            Result<T, broadcast::error::RecvError>,
            broadcast::Receiver<T>,
        ),
    >,
    // Counts total subscriptions / active BroadcastStreams.
    subscription_counter: Arc<SubscriptionCounter>,
}

impl<T: 'static + Clone + Send> BroadcastStream<T> {
    pub fn new(
        peer: AuthorityIndex,
        rx: broadcast::Receiver<T>,
        subscription_counter: Arc<SubscriptionCounter>,
    ) -> Self {
        if let Err(err) = subscription_counter.increment(peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
        Self {
            peer,
            inner: ReusableBoxFuture::new(make_recv_future(rx)),
            subscription_counter,
        }
    }
}

impl<T: 'static + Clone + Send> Stream for BroadcastStream<T> {
    type Item = T;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let peer = self.peer;
        let maybe_item = loop {
            let (result, rx) = ready!(self.inner.poll(cx));
            self.inner.set(make_recv_future(rx));

            match result {
                Ok(item) => break Some(item),
                Err(broadcast::error::RecvError::Closed) => {
                    info!("Block BroadcastedBlockStream {} closed", peer);
                    break None;
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    warn!(
                        "Block BroadcastedBlockStream {} lagged by {} messages",
                        peer, n
                    );
                    continue;
                }
            }
        };
        task::Poll::Ready(maybe_item)
    }
}

impl<T> Drop for BroadcastStream<T> {
    fn drop(&mut self) {
        if let Err(err) = self.subscription_counter.decrement(self.peer) {
            match err {
                ConsensusError::Shutdown => {}
                _ => panic!("Unexpected error: {err}"),
            }
        }
    }
}

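// Receives one message from the broadcast receiver and returns it together with the receiver,
// so the receiver can be reused for the next poll.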
async fn make_recv_future<T: Clone>(
    mut rx: broadcast::Receiver<T>,
) -> (
    Result<T, broadcast::error::RecvError>,
    broadcast::Receiver<T>,
) {
    let result = rx.recv().await;
    (result, rx)
}

// TODO: add a unit test for BroadcastStream.

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, BTreeSet},
        sync::Arc,
        time::Duration,
    };

    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_config::AuthorityIndex;
    use consensus_types::block::{BlockDigest, BlockRef, Round};
    use mysten_metrics::monitored_mpsc;
    use parking_lot::{Mutex, RwLock};
    use tokio::{sync::broadcast, time::sleep};

    use crate::{
        authority_service::AuthorityService,
        block::{BlockAPI, SignedBlock, TestBlock, VerifiedBlock},
        commit::{CertifiedCommits, CommitRange},
        commit_vote_monitor::CommitVoteMonitor,
        context::Context,
        core_thread::{CoreError, CoreThreadDispatcher},
        dag_state::DagState,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
        round_tracker::PeerRoundTracker,
        storage::mem_store::MemStore,
        synchronizer::Synchronizer,
        test_dag_builder::DagBuilder,
        transaction_certifier::TransactionCertifier,
    };

    struct FakeCoreThreadDispatcher {
        blocks: Mutex<Vec<VerifiedBlock>>,
    }

    impl FakeCoreThreadDispatcher {
        fn new() -> Self {
            Self {
                blocks: Mutex::new(vec![]),
            }
        }

        fn get_blocks(&self) -> Vec<VerifiedBlock> {
            self.blocks.lock().clone()
        }
    }

    #[async_trait]
    impl CoreThreadDispatcher for FakeCoreThreadDispatcher {
        async fn add_blocks(
            &self,
            blocks: Vec<VerifiedBlock>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            let block_refs = blocks.iter().map(|b| b.reference()).collect();
            self.blocks.lock().extend(blocks);
            Ok(block_refs)
        }

        async fn check_block_refs(
            &self,
            _block_refs: Vec<BlockRef>,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(BTreeSet::new())
        }

        async fn add_certified_commits(
            &self,
            _commits: CertifiedCommits,
        ) -> Result<BTreeSet<BlockRef>, CoreError> {
            todo!()
        }

        async fn new_block(&self, _round: Round, _force: bool) -> Result<(), CoreError> {
            Ok(())
        }

        async fn get_missing_blocks(&self) -> Result<BTreeSet<BlockRef>, CoreError> {
            Ok(Default::default())
        }

        fn set_propagation_delay(&self, _propagation_delay: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn set_last_known_proposed_round(&self, _round: Round) -> Result<(), CoreError> {
            todo!()
        }

        fn highest_received_rounds(&self) -> Vec<Round> {
            todo!()
        }
    }

    #[derive(Default)]
    struct FakeNetworkClient {}

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_send_block() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            false,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state,
            store,
        ));

        // Test delaying blocks with time drift.
        let now = context.clock.timestamp_utc_ms();
        let max_drift = context.parameters.max_forward_time_drift;
        let input_block = VerifiedBlock::new_for_test(
            TestBlock::new(9, 0)
                .set_timestamp_ms(now + max_drift.as_millis() as u64)
                .build(),
        );

        let service = authority_service.clone();
        let serialized = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: vec![],
        };

        tokio::spawn({
            let service = service.clone();
            let context = context.clone();
            async move {
                service
                    .handle_send_block(context.committee.to_authority_index(0).unwrap(), serialized)
                    .await
                    .unwrap();
            }
        });

        sleep(max_drift / 2).await;

        let blocks = core_dispatcher.get_blocks();
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0], input_block);

        // Test invalid block.
        let invalid_block =
            VerifiedBlock::new_for_test(TestBlock::new(10, 1000).set_timestamp_ms(10).build());
        let extended_block = ExtendedSerializedBlock {
            block: invalid_block.serialized().clone(),
            excluded_ancestors: vec![],
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();

        // Test invalid excluded ancestors.
        let invalid_excluded_ancestors = vec![
            bcs::to_bytes(&BlockRef::new(
                10,
                AuthorityIndex::new_for_test(1000),
                BlockDigest::MIN,
            ))
            .unwrap(),
            vec![3u8; 40],
            bcs::to_bytes(&invalid_block.reference()).unwrap(),
        ];
        let extended_block = ExtendedSerializedBlock {
            block: input_block.serialized().clone(),
            excluded_ancestors: invalid_excluded_ancestors,
        };
        service
            .handle_send_block(
                context.committee.to_authority_index(0).unwrap(),
                extended_block,
            )
            .await
            .unwrap_err();
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_blocks() {
        // GIVEN
        // Use NUM_AUTHORITIES and NUM_ROUNDS higher than max_blocks_per_sync to test limits.
        const NUM_AUTHORITIES: usize = 40;
        const NUM_ROUNDS: usize = 40;
        let (context, _keys) = Context::new_for_test(NUM_AUTHORITIES);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            false,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // GIVEN: 40 rounds of blocks in the dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=(NUM_ROUNDS as u32))
            .build()
            .persist_layers(dag_state.clone());
        let all_blocks = dag_builder.all_blocks();

        // WHEN: Request 2 blocks from round 40, get ancestors breadth first.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .rev()
            .take(2)
            .map(|b| b.reference())
            .collect();
        let highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                true,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        let num_missing_ancestors = blocks
            .keys()
            .filter(|b| b.round == NUM_ROUNDS as Round - 1)
            .count();
        assert_eq!(
            num_missing_ancestors,
            context.parameters.max_blocks_per_sync - missing_block_refs.len()
        );

        // WHEN: Request 2 blocks from round 37, get ancestors depth first.
        let missing_round = NUM_ROUNDS as Round - 3;
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == missing_round)
            .map(|b| b.reference())
            .take(2)
            .collect();
        let mut highest_accepted_rounds: Vec<Round> = vec![1; NUM_AUTHORITIES];
        // Try to fill up the blocks from the 1st authority in missing_block_refs.
        highest_accepted_rounds[missing_block_refs[0].author] = missing_round - 5;
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                highest_accepted_rounds,
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), context.parameters.max_blocks_per_sync);
        // All missing blocks are returned.
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
        // Ancestor blocks are from the expected rounds and authorities.
        let expected_authors = [missing_block_refs[0].author, missing_block_refs[1].author];
        for b in blocks.keys() {
            assert!(b.round <= missing_round);
            assert!(expected_authors.contains(&b.author));
        }

        // WHEN: Request 5 blocks from round 30, without getting ancestors.
        let missing_block_refs: Vec<BlockRef> = all_blocks
            .iter()
            .filter(|b| b.reference().round == NUM_ROUNDS as Round - 10)
            .map(|b| b.reference())
            .take(5)
            .collect();
        let results = authority_service
            .handle_fetch_blocks(
                AuthorityIndex::new_for_test(0),
                missing_block_refs.clone(),
                vec![],
                false,
            )
            .await
            .unwrap();

        // THEN: the expected number of unique blocks are returned.
        let blocks: BTreeMap<BlockRef, VerifiedBlock> = results
            .iter()
            .map(|b| {
                let signed = bcs::from_bytes(b).unwrap();
                let block = VerifiedBlock::new_verified(signed, b.clone());
                (block.reference(), block)
            })
            .collect();
        assert_eq!(blocks.len(), 5);
        for b in &missing_block_refs {
            assert!(blocks.contains_key(b));
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_handle_fetch_latest_blocks() {
        // GIVEN
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let block_verifier = Arc::new(crate::block_verifier::NoopBlockVerifier {});
        let commit_vote_monitor = Arc::new(CommitVoteMonitor::new(context.clone()));
        let core_dispatcher = Arc::new(FakeCoreThreadDispatcher::new());
        let (_tx_block_broadcast, rx_block_broadcast) = broadcast::channel(100);
        let network_client = Arc::new(FakeNetworkClient::default());
        let (blocks_sender, _blocks_receiver) =
            monitored_mpsc::unbounded_channel("consensus_block_output");
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store.clone())));
        let transaction_certifier = TransactionCertifier::new(
            context.clone(),
            block_verifier.clone(),
            dag_state.clone(),
            blocks_sender,
        );
        let synchronizer = Synchronizer::start(
            network_client,
            context.clone(),
            core_dispatcher.clone(),
            commit_vote_monitor.clone(),
            block_verifier.clone(),
            transaction_certifier.clone(),
            dag_state.clone(),
            true,
        );
        let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));
        let authority_service = Arc::new(AuthorityService::new(
            context.clone(),
            block_verifier,
            commit_vote_monitor,
            round_tracker,
            synchronizer,
            core_dispatcher.clone(),
            rx_block_broadcast,
            transaction_certifier,
            dag_state.clone(),
            store,
        ));

        // Create some blocks for a few authorities. Create some equivocations as well and store them in dag state.
        let mut dag_builder = DagBuilder::new(context.clone());
        dag_builder
            .layers(1..=10)
            .authorities(vec![AuthorityIndex::new_for_test(2)])
            .equivocate(1)
            .build()
            .persist_layers(dag_state);

        // WHEN
        let authorities_to_request = vec![
            AuthorityIndex::new_for_test(1),
            AuthorityIndex::new_for_test(2),
        ];
        let results = authority_service
            .handle_fetch_latest_blocks(AuthorityIndex::new_for_test(1), authorities_to_request)
            .await;

        // THEN
        let serialised_blocks = results.unwrap();
        for serialised_block in serialised_blocks {
            let signed_block: SignedBlock =
                bcs::from_bytes(&serialised_block).expect("Error while deserialising block");
            let verified_block = VerifiedBlock::new_verified(signed_block, serialised_block);

            assert_eq!(verified_block.round(), 10);
        }
    }
}