consensus_core/
proposed_block_handler.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use mysten_metrics::monitored_scope;
use tokio::sync::broadcast;
use tracing::{trace, warn};

use crate::{block::ExtendedBlock, context::Context, transaction_certifier::TransactionCertifier};

/// Runs async processing logic for proposed blocks.
/// Currently it only call transaction certifier with proposed blocks.
/// In future, more logic related to proposing should be moved here, for example
/// flushing dag state.
pub(crate) struct ProposedBlockHandler {
    context: Arc<Context>,
    rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
    transaction_certifier: TransactionCertifier,
}

impl ProposedBlockHandler {
    pub(crate) fn new(
        context: Arc<Context>,
        rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
        transaction_certifier: TransactionCertifier,
    ) -> Self {
        Self {
            context,
            rx_block_broadcast,
            transaction_certifier,
        }
    }

    pub(crate) async fn run(&mut self) {
        loop {
            match self.rx_block_broadcast.recv().await {
                Ok(extended_block) => self.handle_proposed_block(extended_block),
                Err(broadcast::error::RecvError::Closed) => {
                    trace!("Handler is shutting down!");
                    return;
                }
                Err(broadcast::error::RecvError::Lagged(e)) => {
                    warn!("Handler is lagging! {e}");
                    // Re-run the loop to receive again.
                    continue;
                }
            };
        }
    }

    fn handle_proposed_block(&self, extended_block: ExtendedBlock) {
        if !self.context.protocol_config.mysticeti_fastpath() {
            return;
        }
        let _scope = monitored_scope("handle_proposed_block");
        // Run GC first to remove blocks that do not need be voted on.
        self.transaction_certifier.run_gc();
        self.transaction_certifier
            .add_proposed_block(extended_block.block.clone());
    }
}