consensus_core/commit_consumer.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::sync::Arc;
use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::watch;
use tracing::debug;
use crate::{block::CertifiedBlocksOutput, CommitIndex, CommittedSubDag};
/// Arguments from commit consumer to this consensus instance.
/// This includes both parameters and components for communications.
#[derive(Clone)]
pub struct CommitConsumerArgs {
/// The consumer requests consensus to replay from commit replay_after_commit_index + 1.
/// Set to 0 to replay from the start (as commit sequence starts at index = 1).
pub(crate) replay_after_commit_index: CommitIndex,
/// Index of the last commit that the consumer has processed. This is useful during
/// crash recovery when other components can wait for the consumer to finish processing
/// up to this index.
pub(crate) consumer_last_processed_commit_index: CommitIndex,
/// A channel to output the committed sub dags.
pub(crate) commit_sender: UnboundedSender<CommittedSubDag>,
/// A channel to output blocks for processing, separated from consensus commits.
/// In each block output, transactions that are not rejected are considered certified.
pub(crate) block_sender: UnboundedSender<CertifiedBlocksOutput>,
// Allows the commit consumer to report its progress.
monitor: Arc<CommitConsumerMonitor>,
}
impl CommitConsumerArgs {
pub fn new(
replay_after_commit_index: CommitIndex,
consumer_last_processed_commit_index: CommitIndex,
) -> (
Self,
UnboundedReceiver<CommittedSubDag>,
UnboundedReceiver<CertifiedBlocksOutput>,
) {
let (commit_sender, commit_receiver) = unbounded_channel("consensus_commit_output");
let (block_sender, block_receiver) = unbounded_channel("consensus_block_output");
let monitor = Arc::new(CommitConsumerMonitor::new(
replay_after_commit_index,
consumer_last_processed_commit_index,
));
(
Self {
replay_after_commit_index,
consumer_last_processed_commit_index,
commit_sender,
block_sender,
monitor,
},
commit_receiver,
block_receiver,
)
}
pub fn monitor(&self) -> Arc<CommitConsumerMonitor> {
self.monitor.clone()
}
}
/// Helps monitor the progress of the consensus commit handler (consumer).
///
/// This component currently has two use usages:
/// 1. Checking the highest commit index processed by the consensus commit handler.
/// Consensus components can decide to wait for more commits to be processed before proceeding with
/// their work.
/// 2. Waiting for consensus commit handler to finish processing replayed commits.
/// Current usage is actually outside of consensus.
pub struct CommitConsumerMonitor {
// highest commit that has been handled by the consumer.
highest_handled_commit: watch::Sender<u32>,
// At node startup, the last consensus commit processed by the commit consumer from the previous run.
// This can be 0 if starting a new epoch.
consumer_last_processed_commit_index: CommitIndex,
}
impl CommitConsumerMonitor {
pub(crate) fn new(
replay_after_commit_index: CommitIndex,
consumer_last_processed_commit_index: CommitIndex,
) -> Self {
Self {
highest_handled_commit: watch::Sender::new(replay_after_commit_index),
consumer_last_processed_commit_index,
}
}
/// Gets the highest commit index processed by the consensus commit handler.
pub fn highest_handled_commit(&self) -> CommitIndex {
*self.highest_handled_commit.borrow()
}
/// Updates the highest commit index processed by the consensus commit handler.
pub fn set_highest_handled_commit(&self, highest_handled_commit: CommitIndex) {
debug!("Highest handled commit set to {}", highest_handled_commit);
self.highest_handled_commit
.send_replace(highest_handled_commit);
}
/// Waits for consensus to replay commits until the consumer last processed commit index.
pub async fn replay_to_consumer_last_processed_commit_complete(&self) {
let mut rx = self.highest_handled_commit.subscribe();
loop {
let highest_handled = *rx.borrow_and_update();
if highest_handled >= self.consumer_last_processed_commit_index {
return;
}
rx.changed().await.unwrap();
}
}
}
#[cfg(test)]
mod test {
use crate::CommitConsumerMonitor;
#[test]
fn test_commit_consumer_monitor() {
let monitor = CommitConsumerMonitor::new(0, 10);
assert_eq!(monitor.highest_handled_commit(), 0);
monitor.set_highest_handled_commit(100);
assert_eq!(monitor.highest_handled_commit(), 100);
}
}