use std::{sync::Arc, time::Duration};
use mysten_metrics::monitored_mpsc::UnboundedSender;
use parking_lot::RwLock;
use tokio::time::Instant;
use tracing::{debug, info};
use crate::{
block::{BlockAPI, VerifiedBlock},
commit::{load_committed_subdag_from_store, CommitAPI, CommitIndex},
context::Context,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
linearizer::Linearizer,
storage::Store,
CommitConsumer, CommittedSubDag,
};
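/// Observes committed leaders produced by consensus: expands them into committed
/// sub-dags via the `Linearizer`, forwards the sub-dags to the commit consumer, and on
/// startup recovers and resends any commits that were persisted but not yet processed
/// by the consumer.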
pub(crate) struct CommitObserver {
context: Arc<Context>,
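    /// Expands each committed leader into the sub-dag of blocks it commits.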
commit_interpreter: Linearizer,
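    /// Unbounded channel for sending committed sub-dags to the consumer of consensus output.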
commit_sender: UnboundedSender<CommittedSubDag>,
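    /// Persistent storage used to read back commits and blocks during recovery.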
store: Arc<dyn Store>,
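    /// Source of the reputation scores attached to the last recovered commit.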
leader_schedule: Arc<LeaderSchedule>,
}
impl CommitObserver {
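    /// Creates the observer and immediately recovers and resends any persisted commits
    /// that the consumer has not processed yet.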
pub(crate) fn new(
context: Arc<Context>,
commit_consumer: CommitConsumer,
dag_state: Arc<RwLock<DagState>>,
store: Arc<dyn Store>,
leader_schedule: Arc<LeaderSchedule>,
) -> Self {
let commit_interpreter =
Linearizer::new(context.clone(), dag_state.clone(), leader_schedule.clone());
let mut observer = Self {
context,
commit_interpreter,
commit_sender: commit_consumer.commit_sender,
store,
leader_schedule,
};
observer.recover_and_send_commits(commit_consumer.last_processed_commit_index);
observer
}
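    /// Expands the newly committed leaders into sub-dags, sends each sub-dag to the
    /// commit consumer, and returns the sub-dags that were successfully sent.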
pub(crate) fn handle_commit(
&mut self,
committed_leaders: Vec<VerifiedBlock>,
) -> ConsensusResult<Vec<CommittedSubDag>> {
let _s = self
.context
.metrics
.node_metrics
.scope_processing_time
.with_label_values(&["CommitObserver::handle_commit"])
.start_timer();
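        // Expand each committed leader into the full sub-dag of blocks it commits.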
let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
let mut sent_sub_dags = Vec::with_capacity(committed_sub_dags.len());
for committed_sub_dag in committed_sub_dags.into_iter() {
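            // A failed send means the consumer side is gone, so treat it as a shutdown.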
if let Err(err) = self.commit_sender.send(committed_sub_dag.clone()) {
tracing::warn!(
"Failed to send committed sub-dag, probably due to shutdown: {err:?}"
);
return Err(ConsensusError::Shutdown);
}
tracing::debug!(
"Sending to execution commit {} leader {}",
committed_sub_dag.commit_ref,
committed_sub_dag.leader
);
sent_sub_dags.push(committed_sub_dag);
}
self.report_metrics(&sent_sub_dags);
tracing::trace!("Committed & sent {sent_sub_dags:#?}");
Ok(sent_sub_dags)
}
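    /// Resends commits that are already persisted in the store but have an index higher
    /// than `last_processed_commit_index`, i.e. commits the consumer has not processed yet.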
fn recover_and_send_commits(&mut self, last_processed_commit_index: CommitIndex) {
let now = Instant::now();
let last_commit = self
.store
.read_last_commit()
.expect("Reading the last commit should not fail");
        if let Some(last_commit) = &last_commit {
            let last_commit_index = last_commit.index();
            assert!(
                last_commit_index >= last_processed_commit_index,
                "Last commit index in store ({last_commit_index}) is lower than the last processed commit index ({last_processed_commit_index})"
            );
            if last_commit_index == last_processed_commit_index {
                debug!(
                    "Nothing to recover for commit observer: last commit index {last_commit_index} equals the last processed index"
                );
                return;
            }
        }
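        // Scan the store for all commits above the last processed index and resend them in order.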
let unsent_commits = self
.store
.scan_commits(((last_processed_commit_index + 1)..=CommitIndex::MAX).into())
.expect("Scanning commits should not fail");
info!("Recovering commit observer after index {last_processed_commit_index} with last commit {} and {} unsent commits", last_commit.map(|c|c.index()).unwrap_or_default(), unsent_commits.len());
let mut last_sent_commit_index = last_processed_commit_index;
let num_unsent_commits = unsent_commits.len();
for (index, commit) in unsent_commits.into_iter().enumerate() {
assert_eq!(commit.index(), last_sent_commit_index + 1);
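            // Only the last recovered commit carries the current reputation scores;
            // earlier ones are resent with empty scores.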
let reputation_scores = if index == num_unsent_commits - 1 {
self.leader_schedule
.leader_swap_table
.read()
.reputation_scores_desc
.clone()
} else {
vec![]
};
info!("Sending commit {} during recovery", commit.index());
let committed_sub_dag =
load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
self.commit_sender
.send(committed_sub_dag)
.unwrap_or_else(|e| {
panic!(
"Failed to send commit during recovery, probably due to shutdown: {:?}",
e
)
});
last_sent_commit_index += 1;
}
info!(
"Commit observer recovery completed, took {:?}",
now.elapsed()
);
}
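    /// Reports metrics for the committed sub-dags: leader round, commit index,
    /// blocks per commit, and per-block commit latency.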
fn report_metrics(&self, committed: &[CommittedSubDag]) {
let metrics = &self.context.metrics.node_metrics;
let utc_now = self.context.clock.timestamp_utc_ms();
for commit in committed {
info!(
"Consensus commit {} with leader {} has {} blocks",
commit.commit_ref,
commit.leader,
commit.blocks.len()
);
metrics
.last_committed_leader_round
.set(commit.leader.round as i64);
metrics
.last_commit_index
.set(commit.commit_ref.index as i64);
metrics
.blocks_per_commit_count
.observe(commit.blocks.len() as f64);
for block in &commit.blocks {
let latency_ms = utc_now
.checked_sub(block.timestamp_ms())
.unwrap_or_default();
metrics
.block_commit_latency
.observe(Duration::from_millis(latency_ms).as_secs_f64());
}
}
self.context
.metrics
.node_metrics
.sub_dags_per_commit_count
.observe(committed.len() as f64);
}
}
#[cfg(test)]
mod tests {
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
use parking_lot::RwLock;
use rstest::rstest;
use super::*;
use crate::{
block::BlockRef, context::Context, dag_state::DagState,
linearizer::median_timestamp_by_stake, storage::mem_store::MemStore,
test_dag_builder::DagBuilder,
};
#[rstest]
#[tokio::test]
async fn test_handle_commit(#[values(true, false)] consensus_median_timestamp: bool) {
telemetry_subscribers::init_for_testing();
let num_authorities = 4;
let (mut context, _keys) = Context::new_for_test(num_authorities);
context
.protocol_config
.set_consensus_median_based_commit_timestamp_for_testing(consensus_median_timestamp);
let context = Arc::new(context);
let mem_store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
mem_store.clone(),
)));
let last_processed_commit_index = 0;
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(last_processed_commit_index);
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let mut observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule,
);
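        // Build a fully connected DAG for the first `num_rounds` rounds and persist it.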
let num_rounds = 10;
let mut builder = DagBuilder::new(context.clone());
builder
.layers(1..=num_rounds)
.build()
.persist_layers(dag_state.clone());
let leaders = builder
.leader_blocks(1..=num_rounds)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();
let commits = observer.handle_commit(leaders.clone()).unwrap();
let mut expected_stored_refs: Vec<BlockRef> = vec![];
for (idx, subdag) in commits.iter().enumerate() {
tracing::info!("{subdag:?}");
assert_eq!(subdag.leader, leaders[idx].reference());
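            // The expected commit timestamp is either the median ancestor timestamp by stake
            // (when median-based timestamps are enabled) or the leader's own timestamp,
            // clamped to be non-decreasing across commits.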
let expected_ts = if consensus_median_timestamp {
let block_refs = leaders[idx]
.ancestors()
.iter()
.filter(|block_ref| block_ref.round == leaders[idx].round() - 1)
.cloned()
.collect::<Vec<_>>();
let blocks = dag_state
.read()
.get_blocks(&block_refs)
.into_iter()
.map(|block_opt| block_opt.expect("We should have all blocks in dag state."));
median_timestamp_by_stake(&context, blocks).unwrap()
} else {
leaders[idx].timestamp_ms()
};
let expected_ts = if idx == 0 {
expected_ts
} else {
expected_ts.max(commits[idx - 1].timestamp_ms)
};
assert_eq!(expected_ts, subdag.timestamp_ms);
if idx == 0 {
assert_eq!(subdag.blocks.len(), 1);
} else {
assert_eq!(subdag.blocks.len(), num_authorities);
}
for block in subdag.blocks.iter() {
expected_stored_refs.push(block.reference());
assert!(block.round() <= leaders[idx].round());
}
assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
}
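        // Check that the same sub-dags were sent to the commit consumer, in order.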
let mut processed_subdag_index = 0;
while let Ok(subdag) = commit_receiver.try_recv() {
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == leaders.len() {
break;
}
}
assert_eq!(processed_subdag_index, leaders.len());
verify_channel_empty(&mut commit_receiver);
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
assert_eq!(
last_commit.index(),
commits.last().unwrap().commit_ref.index
);
let all_stored_commits = mem_store
.scan_commits((0..=CommitIndex::MAX).into())
.unwrap();
assert_eq!(all_stored_commits.len(), leaders.len());
let blocks_existence = mem_store.contains_blocks(&expected_stored_refs).unwrap();
assert!(blocks_existence.iter().all(|exists| *exists));
}
#[tokio::test]
async fn test_recover_and_send_commits() {
telemetry_subscribers::init_for_testing();
let num_authorities = 4;
let context = Arc::new(Context::new_for_test(num_authorities).0);
let mem_store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
mem_store.clone(),
)));
let last_processed_commit_index = 0;
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(last_processed_commit_index);
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let mut observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule.clone(),
);
let num_rounds = 10;
let mut builder = DagBuilder::new(context.clone());
builder
.layers(1..=num_rounds)
.build()
.persist_layers(dag_state.clone());
let leaders = builder
.leader_blocks(1..=num_rounds)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();
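        // Commit only the first two leaders; these are the only commits the consumer is
        // considered to have processed.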
let expected_last_processed_index: usize = 2;
let mut commits = observer
.handle_commit(
leaders
.clone()
.into_iter()
.take(expected_last_processed_index)
.collect::<Vec<_>>(),
)
.unwrap();
let mut processed_subdag_index = 0;
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_processed_index {
break;
}
}
assert_eq!(processed_subdag_index, expected_last_processed_index);
verify_channel_empty(&mut commit_receiver);
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
assert_eq!(
last_commit.index(),
expected_last_processed_index as CommitIndex
);
commits.append(
&mut observer
.handle_commit(
leaders
.clone()
.into_iter()
.skip(expected_last_processed_index)
.collect::<Vec<_>>(),
)
.unwrap(),
);
let expected_last_sent_index = num_rounds as usize;
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("{subdag} was sent but not processed by consumer");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_sent_index {
break;
}
}
assert_eq!(processed_subdag_index, expected_last_sent_index);
verify_channel_empty(&mut commit_receiver);
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
assert_eq!(last_commit.index(), expected_last_sent_index as CommitIndex);
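        // Re-create the commit observer starting from the last processed index;
        // recovery should resend commits 3..=10.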
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(expected_last_processed_index as CommitIndex);
let _observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule,
);
processed_subdag_index = expected_last_processed_index;
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("Processed {subdag} on resubmission");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_sent_index {
break;
}
}
assert_eq!(processed_subdag_index, expected_last_sent_index);
verify_channel_empty(&mut commit_receiver);
}
#[tokio::test]
async fn test_send_no_missing_commits() {
telemetry_subscribers::init_for_testing();
let num_authorities = 4;
let context = Arc::new(Context::new_for_test(num_authorities).0);
let mem_store = Arc::new(MemStore::new());
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
mem_store.clone(),
)));
let last_processed_commit_index = 0;
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(last_processed_commit_index);
let leader_schedule = Arc::new(LeaderSchedule::from_store(
context.clone(),
dag_state.clone(),
));
let mut observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule.clone(),
);
let num_rounds = 10;
let mut builder = DagBuilder::new(context.clone());
builder
.layers(1..=num_rounds)
.build()
.persist_layers(dag_state.clone());
let leaders = builder
.leader_blocks(1..=num_rounds)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();
let expected_last_processed_index: usize = 10;
let commits = observer.handle_commit(leaders.clone()).unwrap();
let mut processed_subdag_index = 0;
while let Ok(subdag) = commit_receiver.try_recv() {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_processed_index {
break;
}
}
assert_eq!(processed_subdag_index, expected_last_processed_index);
verify_channel_empty(&mut commit_receiver);
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
assert_eq!(
last_commit.index(),
expected_last_processed_index as CommitIndex
);
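        // Re-create the commit observer with all commits already processed;
        // recovery should resend nothing.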
let (commit_consumer, mut commit_receiver, _transaction_receiver) =
CommitConsumer::new(expected_last_processed_index as CommitIndex);
let _observer = CommitObserver::new(
context.clone(),
commit_consumer,
dag_state.clone(),
mem_store.clone(),
leader_schedule,
);
verify_channel_empty(&mut commit_receiver);
}
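    /// Asserts that the commit receiver has no pending sub-dags and that the channel is still open.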
fn verify_channel_empty(receiver: &mut UnboundedReceiver<CommittedSubDag>) {
        match receiver.try_recv() {
            Ok(_) => {
                panic!("Expected the consensus output channel to be empty, but found more subdags.")
            }
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {}
            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                panic!("The consensus output channel was unexpectedly closed.")
            }
        }
}
}