use std::collections::{btree_map::Entry, BTreeMap, BTreeSet};
use consensus_types::block::Round;
use mysten_common::sync::notify_read::NotifyRead;
use parking_lot::{RwLock, RwLockWriteGuard};
use sui_types::{
error::{SuiError, SuiResult},
messages_consensus::ConsensusPosition,
};
use tokio::sync::watch;
use tracing::debug;
/// Width, in rounds, of the window the status cache works with.
///
/// A position is treated as expired once its block round plus this value is at or below
/// the committed leader round the cache compares against, and a position is rejected as
/// "too far ahead" when its round exceeds the published committed round plus this value.
pub(crate) const CONSENSUS_STATUS_RETENTION_ROUNDS: u32 = 400;
/// Status the cache stores for a transaction at a given `ConsensusPosition`.
///
/// The cache only allows `FastpathCertified` to be overwritten by a different status;
/// `Rejected` and `Finalized` are terminal once stored.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum ConsensusTxStatus {
/// Transient status; may later be replaced by `Rejected` or `Finalized`.
FastpathCertified,
/// Terminal status. The cache also assigns it itself to positions that are still
/// `FastpathCertified` when they fall behind by `consensus_gc_depth` rounds.
Rejected,
/// Terminal status.
Finalized,
}
/// Outcome of `ConsensusTxStatusCache::notify_read_transaction_status_change`.
#[derive(Debug, Clone)]
pub(crate) enum NotifyReadConsensusTxStatusResult {
/// The status now associated with the requested position.
Status(ConsensusTxStatus),
/// The requested position is outside the retention window. Carries the committed
/// leader round that was read from the watch channel when expiry was detected.
Expired(u32),
}
/// Cache of per-position consensus transaction statuses, with per-position waiter
/// notification and round-based expiry.
pub(crate) struct ConsensusTxStatusCache {
/// Number of rounds after which a position that is still `FastpathCertified` is
/// switched to `Rejected` by `update_last_committed_leader_round`.
consensus_gc_depth: u32,
/// Lock-protected status map and bookkeeping.
inner: RwLock<Inner>,
/// Notifies waiters registered on a position whenever a status is stored for it.
status_notify_read: NotifyRead<ConsensusPosition, ConsensusTxStatus>,
/// Sender half of the watch channel publishing the committed leader round that was
/// used for the latest expiry pass (`None` until the first pass has run).
last_committed_leader_round_tx: watch::Sender<Option<u32>>,
/// Receiver half of the same channel; read by the staleness, expiry and look-ahead checks.
last_committed_leader_round_rx: watch::Receiver<Option<u32>>,
}
/// State guarded by `ConsensusTxStatusCache::inner`.
#[derive(Default)]
struct Inner {
/// Current status per position. Ordered, so expiry pops entries from the front.
transaction_status: BTreeMap<ConsensusPosition, ConsensusTxStatus>,
/// Positions whose current status in `transaction_status` is `FastpathCertified`.
/// Every insert/remove here is asserted, so the two collections must stay in sync.
fastpath_certified: BTreeSet<ConsensusPosition>,
/// Most recent value passed to `update_last_committed_leader_round`. The value it
/// replaces (not the new one) is what drives expiry on each update.
last_committed_leader_round: Option<Round>,
}
impl ConsensusTxStatusCache {
pub(crate) fn new(consensus_gc_depth: Round) -> Self {
assert!(
consensus_gc_depth < CONSENSUS_STATUS_RETENTION_ROUNDS,
"{} vs {}",
consensus_gc_depth,
CONSENSUS_STATUS_RETENTION_ROUNDS
);
let (last_committed_leader_round_tx, last_committed_leader_round_rx) = watch::channel(None);
Self {
consensus_gc_depth,
inner: Default::default(),
status_notify_read: Default::default(),
last_committed_leader_round_tx,
last_committed_leader_round_rx,
}
}
/// Records `status` for `pos` and notifies waiters registered on `pos`.
///
/// The update is silently dropped when `pos` is already outside the retention window,
/// i.e. when its round plus `CONSENSUS_STATUS_RETENTION_ROUNDS` is at or below the
/// committed leader round currently published on the watch channel.
///
/// # Panics
///
/// Panics (inside `set_transaction_status_inner`) when `status` conflicts with a
/// different terminal status already stored for `pos`.
pub(crate) fn set_transaction_status(&self, pos: ConsensusPosition, status: ConsensusTxStatus) {
    let published_round = *self.last_committed_leader_round_rx.borrow();
    if let Some(last_committed_leader_round) = published_round {
        // Saturate instead of overflowing: a round close to `u32::MAX` must not wrap
        // around (and look stale) or panic in builds with overflow checks enabled.
        let expires_at = pos
            .block
            .round
            .saturating_add(CONSENSUS_STATUS_RETENTION_ROUNDS);
        if expires_at <= last_committed_leader_round {
            // Stale update; ignore it.
            return;
        }
    }

    let mut inner = self.inner.write();
    self.set_transaction_status_inner(&mut inner, pos, status);
}
/// Applies a status update while the caller holds the write lock on `inner`.
///
/// Transitions handled:
/// - no entry yet: store `status` (and track the position if it is `FastpathCertified`);
/// - same status as stored: no-op, nobody is notified;
/// - stored `FastpathCertified`, different new status: overwrite and stop tracking;
/// - stored `Rejected`/`Finalized`, new `FastpathCertified`: no-op, nobody is notified;
/// - anything else (two different terminal statuses): panic.
///
/// Whenever the stored value changes, waiters registered on `pos` are notified.
fn set_transaction_status_inner(
    &self,
    inner: &mut RwLockWriteGuard<Inner>,
    pos: ConsensusPosition,
    status: ConsensusTxStatus,
) {
    let slot = inner.transaction_status.entry(pos);
    match slot {
        Entry::Vacant(vacant) => {
            vacant.insert(status);
            if matches!(status, ConsensusTxStatus::FastpathCertified) {
                // A fresh fastpath-certified position must not be tracked already.
                assert!(inner.fastpath_certified.insert(pos));
            }
        }
        Entry::Occupied(mut occupied) => {
            let old_status = *occupied.get();
            if old_status == status {
                // Nothing changes.
                return;
            }
            match (old_status, status) {
                (ConsensusTxStatus::FastpathCertified, _) => {
                    // Leaving the transient state: overwrite and untrack.
                    occupied.insert(status);
                    assert!(inner.fastpath_certified.remove(&pos));
                }
                // A terminal status is never downgraded back to fastpath certified.
                (_, ConsensusTxStatus::FastpathCertified) => return,
                // Two different terminal statuses for the same position.
                _ => panic!(
                    "Conflicting status updates for transaction {:?}: {:?} -> {:?}",
                    pos, old_status, status
                ),
            }
        }
    }

    debug!("Transaction status is set for {}: {:?}", pos, status);
    self.status_notify_read.notify(&pos, &status);
}
/// Waits until the status stored for `consensus_position` differs from `old_status`,
/// or until the position falls outside the retention window.
///
/// Returns immediately with `Status` when a status is already stored and it differs
/// from `old_status` (pass `None` when no status has been observed yet). Otherwise the
/// call resolves with whichever happens first:
/// - a status notification for the position -> `Status`;
/// - the committed leader round published on the watch channel reaching the position's
///   round plus `CONSENSUS_STATUS_RETENTION_ROUNDS` -> `Expired` with that round.
///
/// # Panics
///
/// - If a differing status is stored and `old_status` is `Some` of anything other than
///   `FastpathCertified`.
/// - If the watch channel reports that it has been closed.
pub(crate) async fn notify_read_transaction_status_change(
    &self,
    consensus_position: ConsensusPosition,
    old_status: Option<ConsensusTxStatus>,
) -> NotifyReadConsensusTxStatusResult {
    // The registration is created before the map is read; presumably so that a status
    // stored between the two steps still reaches this waiter.
    let registration = self.status_notify_read.register_one(&consensus_position);
    let mut round_rx = self.last_committed_leader_round_rx.clone();
    {
        let inner = self.inner.read();
        if let Some(status) = inner.transaction_status.get(&consensus_position) {
            if Some(status) != old_status.as_ref() {
                if let Some(old_status) = old_status {
                    assert_eq!(old_status, ConsensusTxStatus::FastpathCertified);
                }
                return NotifyReadConsensusTxStatusResult::Status(*status);
            }
        }
        // The read guard is released here, before any await point.
    }

    // The position is caller-supplied. Saturate so that a round close to `u32::MAX`
    // neither panics on overflow nor wraps around into a spurious `Expired`.
    let expires_at = consensus_position
        .block
        .round
        .saturating_add(CONSENSUS_STATUS_RETENTION_ROUNDS);

    let expiration_check = async {
        loop {
            if let Some(last_committed_leader_round) = *round_rx.borrow() {
                if expires_at <= last_committed_leader_round {
                    return last_committed_leader_round;
                }
            }
            round_rx
                .changed()
                .await
                .expect("last_committed_leader_round watch channel closed unexpectedly");
        }
    };

    tokio::select! {
        status = registration => NotifyReadConsensusTxStatusResult::Status(status),
        last_committed_leader_round = expiration_check => NotifyReadConsensusTxStatusResult::Expired(last_committed_leader_round),
    }
}
/// Records a new last committed leader round, then expires and garbage-collects cached
/// statuses.
///
/// Expiry and GC are driven by the round recorded by the *previous* call, not by
/// `last_committed_leader_round` itself, and that previous round is also what gets
/// published on the watch channel. The very first call therefore only stores the round
/// and returns without pruning or publishing anything.
/// NOTE(review): the one-update lag looks deliberate — confirm the rationale with the
/// consensus commit/GC ordering.
///
/// With `leader_round` being the previously recorded round:
/// 1. entries whose round plus `CONSENSUS_STATUS_RETENTION_ROUNDS` is `<= leader_round`
///    are removed from the cache;
/// 2. positions still fastpath certified whose round plus `consensus_gc_depth` is
///    `<= leader_round` are switched to `Rejected` (waiters are notified);
/// 3. `leader_round` is published on the watch channel.
///
/// # Panics
///
/// Panics if the fastpath-certified set is out of sync with the status map.
pub(crate) async fn update_last_committed_leader_round(
    &self,
    last_committed_leader_round: u32,
) {
    debug!(
        "Updating last committed leader round: {}",
        last_committed_leader_round
    );

    let mut inner = self.inner.write();
    let Some(leader_round) = inner
        .last_committed_leader_round
        .replace(last_committed_leader_round)
    else {
        // First update: nothing to compare against yet.
        return;
    };

    // Drop expired entries from the front of the ordered map.
    while let Some((position, _)) = inner.transaction_status.first_key_value() {
        // Saturating add: a round close to `u32::MAX` must not overflow or wrap.
        let expired = position
            .block
            .round
            .saturating_add(CONSENSUS_STATUS_RETENTION_ROUNDS)
            <= leader_round;
        if !expired {
            break;
        }
        let (pos, status) = inner.transaction_status.pop_first().unwrap();
        if status == ConsensusTxStatus::FastpathCertified {
            // Keep the tracking set in sync with the map.
            assert!(inner.fastpath_certified.remove(&pos));
        }
    }

    // Reject positions that are still fastpath certified but older than the GC depth.
    // Each rejection removes the position from the set, so the loop makes progress.
    while let Some(position) = inner.fastpath_certified.first().copied() {
        let garbage_collected = position
            .block
            .round
            .saturating_add(self.consensus_gc_depth)
            <= leader_round;
        if !garbage_collected {
            break;
        }
        self.set_transaction_status_inner(&mut inner, position, ConsensusTxStatus::Rejected);
    }

    // A send error is deliberately ignored here.
    let _ = self.last_committed_leader_round_tx.send(Some(leader_round));
}
/// Returns the committed leader round currently published on the watch channel, or
/// `None` when nothing has been published yet.
pub(crate) fn get_last_committed_leader_round(&self) -> Option<u32> {
    let published = self.last_committed_leader_round_rx.borrow();
    *published
}
/// Returns how many positions are currently tracked as fastpath certified.
pub(crate) fn get_num_fastpath_certified(&self) -> usize {
    let inner = self.inner.read();
    inner.fastpath_certified.len()
}
/// Checks that `position` is not too far ahead of the published committed leader round.
///
/// Returns `Ok(())` when no round has been published yet, or when the position's round
/// is at most the published round plus `CONSENSUS_STATUS_RETENTION_ROUNDS`.
///
/// # Errors
///
/// Returns `SuiError::ValidatorConsensusLagging` carrying both rounds otherwise.
pub(crate) fn check_position_too_ahead(&self, position: &ConsensusPosition) -> SuiResult<()> {
    let Some(last_committed_leader_round) = *self.last_committed_leader_round_rx.borrow()
    else {
        return Ok(());
    };

    let round = position.block.round;
    if round <= last_committed_leader_round + CONSENSUS_STATUS_RETENTION_ROUNDS {
        return Ok(());
    }

    Err(SuiError::ValidatorConsensusLagging {
        round,
        last_committed_round: last_committed_leader_round,
    })
}
/// Test-only lookup of the status currently stored for `position`, if any.
#[cfg(test)]
pub(crate) fn get_transaction_status(
    &self,
    position: &ConsensusPosition,
) -> Option<ConsensusTxStatus> {
    self.inner.read().transaction_status.get(position).copied()
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use super::*;
use consensus_types::block::{BlockRef, TransactionIndex};
/// Builds a position in the default epoch whose block has the given round (default
/// author and digest) and the given transaction index.
fn create_test_tx_position(round: u64, index: u64) -> ConsensusPosition {
    let block = BlockRef {
        round: round as u32,
        author: Default::default(),
        digest: Default::default(),
    };
    ConsensusPosition {
        epoch: Default::default(),
        block,
        index: index as TransactionIndex,
    }
}
/// A status stored before the read is returned immediately.
#[tokio::test]
async fn test_set_and_get_transaction_status() {
    let position = create_test_tx_position(1, 0);
    let cache = ConsensusTxStatusCache::new(60);

    cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);

    let result = cache
        .notify_read_transaction_status_change(position, None)
        .await;
    assert!(matches!(
        result,
        NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::FastpathCertified)
    ));
}
/// A waiter started before any status exists is woken by a later update.
#[tokio::test]
async fn test_status_notification() {
    let position = create_test_tx_position(1, 0);
    let cache = Arc::new(ConsensusTxStatusCache::new(60));

    let waiter = {
        let cache = Arc::clone(&cache);
        tokio::spawn(async move {
            cache
                .notify_read_transaction_status_change(position, None)
                .await
        })
    };

    // Give the spawned waiter a moment to start before the status is stored.
    tokio::time::sleep(Duration::from_millis(10)).await;
    cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

    let result = waiter.await.unwrap();
    assert!(matches!(
        result,
        NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
    ));
}
/// Reading a position that has left the retention window yields `Expired`.
#[tokio::test]
async fn test_round_expiration() {
    let position = create_test_tx_position(1, 0);
    let cache = ConsensusTxStatusCache::new(60);
    cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);

    // Two updates are needed: the first only records its round, and the second expires
    // entries based on that first round.
    for leader_round in [
        CONSENSUS_STATUS_RETENTION_ROUNDS + 1,
        CONSENSUS_STATUS_RETENTION_ROUNDS + 2,
    ] {
        cache.update_last_committed_leader_round(leader_round).await;
    }

    let result = cache
        .notify_read_transaction_status_change(position, None)
        .await;
    assert!(matches!(
        result,
        NotifyReadConsensusTxStatusResult::Expired(_)
    ));
}
/// After FastpathCertified -> Finalized, a reader that last saw FastpathCertified gets
/// Finalized right away.
#[tokio::test]
async fn test_multiple_status_updates() {
    let position = create_test_tx_position(1, 0);
    let cache = ConsensusTxStatusCache::new(60);

    for status in [
        ConsensusTxStatus::FastpathCertified,
        ConsensusTxStatus::Finalized,
    ] {
        cache.set_transaction_status(position, status);
    }

    let previously_seen = Some(ConsensusTxStatus::FastpathCertified);
    let result = cache
        .notify_read_transaction_status_change(position, previously_seen)
        .await;
    assert!(matches!(
        result,
        NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
    ));
}
/// Entries are pruned based on the previously recorded leader round, one update late.
#[tokio::test]
async fn test_cleanup_expired_rounds() {
    let cache = ConsensusTxStatusCache::new(60);
    for round in 1..=5 {
        cache.set_transaction_status(
            create_test_tx_position(round, 0),
            ConsensusTxStatus::FastpathCertified,
        );
    }

    // (offset added to the retention window, rounds expected to remain afterwards)
    let steps: [(u32, Vec<u32>); 3] = [
        // First update: only records the round, nothing is pruned.
        (2, vec![1, 2, 3, 4, 5]),
        // Prunes against retention + 2: rounds 1 and 2 go away.
        (3, vec![3, 4, 5]),
        // Prunes against retention + 3: round 3 goes away.
        (4, vec![4, 5]),
    ];

    for (offset, expected_rounds) in steps {
        cache
            .update_last_committed_leader_round(CONSENSUS_STATUS_RETENTION_ROUNDS + offset)
            .await;

        let rounds = cache
            .inner
            .read()
            .transaction_status
            .keys()
            .map(|p| p.block.round)
            .collect::<Vec<_>>();
        assert_eq!(rounds, expected_rounds);
    }
}
/// Several concurrent waiters on the same position all observe the same update.
#[tokio::test]
async fn test_concurrent_operations() {
    let position = create_test_tx_position(1, 0);
    let cache = Arc::new(ConsensusTxStatusCache::new(60));

    let handles: Vec<_> = (0..3)
        .map(|_| {
            let cache = Arc::clone(&cache);
            tokio::spawn(async move {
                cache
                    .notify_read_transaction_status_change(position, None)
                    .await
            })
        })
        .collect();

    // Give the waiters a moment to start before the status is stored.
    tokio::time::sleep(Duration::from_millis(10)).await;
    cache.set_transaction_status(position, ConsensusTxStatus::Finalized);

    for handle in handles {
        let result = handle.await.unwrap();
        assert!(matches!(
            result,
            NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
        ));
    }
}
/// A late FastpathCertified update after Finalized is ignored and wakes nobody.
#[tokio::test]
async fn test_out_of_order_status_updates() {
    let position = create_test_tx_position(1, 0);
    let cache = Arc::new(ConsensusTxStatusCache::new(60));

    cache.set_transaction_status(position, ConsensusTxStatus::Finalized);
    let result = cache
        .notify_read_transaction_status_change(position, None)
        .await;
    assert!(matches!(
        result,
        NotifyReadConsensusTxStatusResult::Status(ConsensusTxStatus::Finalized)
    ));

    // This waiter has already seen Finalized, so only a real change could wake it.
    let notify_read_task = {
        let cache = Arc::clone(&cache);
        tokio::spawn(async move {
            cache
                .notify_read_transaction_status_change(
                    position,
                    Some(ConsensusTxStatus::Finalized),
                )
                .await
        })
    };

    cache.set_transaction_status(position, ConsensusTxStatus::FastpathCertified);

    // The out-of-order update is a no-op, so the waiter must still be pending at timeout.
    let result = tokio::time::timeout(Duration::from_secs(3), notify_read_task).await;
    assert!(result.is_err());
    assert_eq!(
        cache.get_transaction_status(&position),
        Some(ConsensusTxStatus::Finalized)
    );
}
// Exercises the fastpath-certified counter through inserts, terminal transitions,
// GC-driven rejection (gc depth = 60) and an ignored out-of-order update.
#[tokio::test]
async fn test_fastpath_certified_tracking() {
let cache = Arc::new(ConsensusTxStatusCache::new(60));
assert_eq!(cache.get_num_fastpath_certified(), 0);
let tx_pos1 = create_test_tx_position(100, 0);
let tx_pos2 = create_test_tx_position(100, 1);
let tx_pos3 = create_test_tx_position(101, 2);
let tx_pos4 = create_test_tx_position(102, 3);
// Each new FastpathCertified position bumps the counter by one.
cache.set_transaction_status(tx_pos1, ConsensusTxStatus::FastpathCertified);
assert_eq!(cache.get_num_fastpath_certified(), 1);
cache.set_transaction_status(tx_pos2, ConsensusTxStatus::FastpathCertified);
assert_eq!(cache.get_num_fastpath_certified(), 2);
cache.set_transaction_status(tx_pos3, ConsensusTxStatus::FastpathCertified);
assert_eq!(cache.get_num_fastpath_certified(), 3);
cache.set_transaction_status(tx_pos4, ConsensusTxStatus::FastpathCertified);
assert_eq!(cache.get_num_fastpath_certified(), 4);
// A position stored directly as Finalized is never counted.
let tx_pos5 = create_test_tx_position(103, 4);
cache.set_transaction_status(tx_pos5, ConsensusTxStatus::Finalized);
assert_eq!(cache.get_num_fastpath_certified(), 4);
// FastpathCertified -> Finalized removes the position from the counter.
cache.set_transaction_status(tx_pos1, ConsensusTxStatus::Finalized);
assert_eq!(cache.get_num_fastpath_certified(), 3);
assert_eq!(
cache.get_transaction_status(&tx_pos1),
Some(ConsensusTxStatus::Finalized)
);
// FastpathCertified -> Rejected removes it as well.
cache.set_transaction_status(tx_pos2, ConsensusTxStatus::Rejected);
assert_eq!(cache.get_num_fastpath_certified(), 2);
assert_eq!(
cache.get_transaction_status(&tx_pos2),
Some(ConsensusTxStatus::Rejected)
);
// First round update: only records round 160, no GC happens.
cache.update_last_committed_leader_round(160).await;
assert_eq!(cache.get_num_fastpath_certified(), 2);
assert_eq!(
cache.get_transaction_status(&tx_pos3),
Some(ConsensusTxStatus::FastpathCertified)
);
assert_eq!(
cache.get_transaction_status(&tx_pos4),
Some(ConsensusTxStatus::FastpathCertified)
);
// GC now runs against round 160: 101 + 60 = 161 > 160, so nothing is rejected yet.
cache.update_last_committed_leader_round(161).await;
assert_eq!(cache.get_num_fastpath_certified(), 2);
assert_eq!(
cache.get_transaction_status(&tx_pos3),
Some(ConsensusTxStatus::FastpathCertified)
);
assert_eq!(
cache.get_transaction_status(&tx_pos4),
Some(ConsensusTxStatus::FastpathCertified)
);
// GC against round 161: tx_pos3 (101 + 60 <= 161) is rejected, tx_pos4 (162) is not.
cache.update_last_committed_leader_round(162).await;
assert_eq!(cache.get_num_fastpath_certified(), 1);
assert_eq!(
cache.get_transaction_status(&tx_pos3),
Some(ConsensusTxStatus::Rejected)
);
assert_eq!(
cache.get_transaction_status(&tx_pos4),
Some(ConsensusTxStatus::FastpathCertified)
);
// GC against round 162: tx_pos4 (102 + 60 <= 162) is rejected too.
cache.update_last_committed_leader_round(163).await;
assert_eq!(cache.get_num_fastpath_certified(), 0);
assert_eq!(
cache.get_transaction_status(&tx_pos4),
Some(ConsensusTxStatus::Rejected)
);
// An out-of-order FastpathCertified after Finalized is ignored and not counted.
let tx_pos6 = create_test_tx_position(200, 5);
cache.set_transaction_status(tx_pos6, ConsensusTxStatus::Finalized);
assert_eq!(cache.get_num_fastpath_certified(), 0);
cache.set_transaction_status(tx_pos6, ConsensusTxStatus::FastpathCertified);
assert_eq!(cache.get_num_fastpath_certified(), 0);
assert_eq!(
cache.get_transaction_status(&tx_pos6),
Some(ConsensusTxStatus::Finalized)
);
}
}