use std::{collections::BTreeMap, sync::Arc};
use mysten_common::debug_fatal;
use mysten_metrics::monitored_mpsc::{channel, Receiver, Sender};
use parking_lot::Mutex;
use tap::TapFallible;
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{error, warn};
use crate::{
block::{BlockRef, Transaction, TransactionIndex},
context::Context,
Round,
};
/// Maximum number of submitted-but-not-yet-consumed transaction bundles that can be
/// queued between `TransactionClient` and `TransactionConsumer`; further submitters
/// are backpressured by the bounded channel.
const MAX_PENDING_TRANSACTIONS: usize = 2_000;

/// A bundle of submitted transactions together with the one-shot channel used to
/// acknowledge their inclusion in a proposed block. The ack carries the including
/// block's `BlockRef` plus a receiver for that block's eventual `BlockStatus`.
pub(crate) struct TransactionsGuard {
    // Transactions submitted together; they are included in (or deferred from) a
    // block atomically, never split across blocks.
    transactions: Vec<Transaction>,
    included_in_block_ack: oneshot::Sender<(BlockRef, oneshot::Receiver<BlockStatus>)>,
}
/// Pulls submitted transactions off the client channel and batches them for block
/// proposals, respecting the protocol's per-block byte and count limits.
pub(crate) struct TransactionConsumer {
    context: Arc<Context>,
    // Channel fed by TransactionClient submissions.
    tx_receiver: Receiver<TransactionsGuard>,
    // Per-block limits cached from the protocol config at construction.
    max_transactions_in_block_bytes: u64,
    max_num_transactions_in_block: u64,
    // A bundle that did not fit into the last proposed block; it is served first
    // on the next call to `next()`.
    pending_transactions: Option<TransactionsGuard>,
    // Subscribers waiting for the status of own proposed blocks, keyed by block
    // reference. Shared with the ack closures returned by `next()`.
    block_status_subscribers: Arc<Mutex<BTreeMap<BlockRef, Vec<oneshot::Sender<BlockStatus>>>>>,
}
/// Final disposition of an own proposed block, reported back to submitters.
#[derive(Debug, Clone, Eq, PartialEq)]
#[allow(unused)]
pub enum BlockStatus {
    /// The block was committed (sequenced) by consensus.
    Sequenced(BlockRef),
    /// The block was garbage collected and will not be committed.
    GarbageCollected(BlockRef),
}
/// Reason `TransactionConsumer::next` stopped pulling transactions.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LimitReached {
    /// The per-block transaction-count limit was hit.
    MaxNumOfTransactions,
    /// The per-block total-bytes limit was hit.
    MaxBytes,
    /// The queue was drained without hitting any limit.
    AllTransactionsIncluded,
}
impl TransactionConsumer {
    /// Creates a consumer reading submissions from `tx_receiver`, caching the
    /// per-block byte/count limits from the protocol config in `context`.
    pub(crate) fn new(tx_receiver: Receiver<TransactionsGuard>, context: Arc<Context>) -> Self {
        Self {
            tx_receiver,
            max_transactions_in_block_bytes: context
                .protocol_config
                .max_transactions_in_block_bytes(),
            max_num_transactions_in_block: context.protocol_config.max_num_transactions_in_block(),
            context,
            pending_transactions: None,
            block_status_subscribers: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    /// Pulls as many queued transactions as fit within the block limits.
    ///
    /// Returns the batched transactions, a callback that must be invoked with the
    /// `BlockRef` of the block that includes them (it acknowledges every submitter
    /// and hands out block-status receivers), and which limit stopped the pull.
    pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>, LimitReached) {
        let mut transactions = Vec::new();
        let mut acks = Vec::new();
        let mut total_bytes = 0;
        let mut limit_reached = LimitReached::AllTransactionsIncluded;
        // Tries to add one bundle to the batch. Returns the bundle back (to be
        // kept as pending) when it would exceed a limit — a bundle is included
        // or deferred atomically, never split.
        let mut handle_txs = |t: TransactionsGuard| -> Option<TransactionsGuard> {
            let transactions_bytes =
                t.transactions.iter().map(|t| t.data().len()).sum::<usize>() as u64;
            let transactions_num = t.transactions.len() as u64;
            if total_bytes + transactions_bytes > self.max_transactions_in_block_bytes {
                limit_reached = LimitReached::MaxBytes;
                return Some(t);
            }
            if transactions.len() as u64 + transactions_num > self.max_num_transactions_in_block {
                limit_reached = LimitReached::MaxNumOfTransactions;
                return Some(t);
            }
            total_bytes += transactions_bytes;
            acks.push(t.included_in_block_ack);
            transactions.extend(t.transactions);
            None
        };
        // A bundle deferred by a previous call goes first. It was deferred only
        // because it did not fit in the remainder of a block, so it must fit in
        // an empty one; if not, it could never be proposed and is dropped (its
        // ack sender is dropped too, erroring the submitter's waiter).
        if let Some(t) = self.pending_transactions.take() {
            if let Some(pending_transactions) = handle_txs(t) {
                debug_fatal!("Previously pending transaction(s) should fit into an empty block! Dropping: {:?}", pending_transactions.transactions);
            }
        }
        // Drain the channel until a bundle does not fit (it becomes the new
        // pending bundle) or no more submissions are queued.
        while self.pending_transactions.is_none() {
            if let Ok(t) = self.tx_receiver.try_recv() {
                self.pending_transactions = handle_txs(t);
            } else {
                break;
            }
        }
        let block_status_subscribers = self.block_status_subscribers.clone();
        let gc_enabled = self.context.protocol_config.gc_depth() > 0;
        (
            transactions,
            // Ack callback: notifies each submitter of the including block and
            // hands out a receiver for the block's eventual status.
            Box::new(move |block_ref: BlockRef| {
                let mut block_status_subscribers = block_status_subscribers.lock();
                for ack in acks {
                    let (status_tx, status_rx) = oneshot::channel();
                    if gc_enabled {
                        // Status is resolved later via notify_own_blocks_status.
                        block_status_subscribers
                            .entry(block_ref)
                            .or_default()
                            .push(status_tx);
                    } else {
                        // With GC disabled the block is treated as sequenced
                        // immediately, so the status can be resolved up front.
                        status_tx.send(BlockStatus::Sequenced(block_ref)).ok();
                    }
                    let _ = ack.send((block_ref, status_rx));
                }
            }),
            limit_reached,
        )
    }

    /// Resolves statuses for own proposed blocks: every ref in `committed_blocks`
    /// is reported as `Sequenced`, and any remaining subscription at or below
    /// `gc_round` is reported as `GarbageCollected`.
    pub(crate) fn notify_own_blocks_status(
        &self,
        committed_blocks: Vec<BlockRef>,
        gc_round: Round,
    ) {
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        for block_ref in committed_blocks {
            if let Some(subscribers) = block_status_subscribers.remove(&block_ref) {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::Sequenced(block_ref));
                });
            }
        }
        // Pops entries from the front of the map while they are at or below the
        // GC round, re-inserting and stopping at the first entry past it. This
        // relies on BlockRef's ordering putting round first, so the BTreeMap is
        // ordered by round — confirm if BlockRef's Ord ever changes.
        while let Some((block_ref, subscribers)) = block_status_subscribers.pop_first() {
            if block_ref.round <= gc_round {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::GarbageCollected(block_ref));
                });
            } else {
                block_status_subscribers.insert(block_ref, subscribers);
                break;
            }
        }
    }

    /// Test helper: subscribes for the status of `block_ref`.
    #[cfg(test)]
    pub(crate) fn subscribe_for_block_status_testing(
        &self,
        block_ref: BlockRef,
    ) -> oneshot::Receiver<BlockStatus> {
        let (tx, rx) = oneshot::channel();
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        block_status_subscribers
            .entry(block_ref)
            .or_default()
            .push(tx);
        rx
    }

    /// Test helper: true when nothing is pending or queued. Probing may move a
    /// queued bundle into `pending_transactions` as a side effect.
    #[cfg(test)]
    fn is_empty(&mut self) -> bool {
        if self.pending_transactions.is_some() {
            return false;
        }
        if let Ok(t) = self.tx_receiver.try_recv() {
            self.pending_transactions = Some(t);
            return false;
        }
        true
    }
}
/// Client for submitting transactions to consensus. Cloneable; all clones feed
/// the same bounded channel consumed by `TransactionConsumer`.
#[derive(Clone)]
pub struct TransactionClient {
    sender: Sender<TransactionsGuard>,
    // Limits cached from the protocol config, used to validate submissions
    // before they are queued.
    max_transaction_size: u64,
    max_transactions_in_block_bytes: u64,
    max_transactions_in_block_count: u64,
}
/// Errors returned when submitting transactions through `TransactionClient`.
#[derive(Debug, Error)]
pub enum ClientError {
    /// The consensus input channel is closed or the ack sender was dropped.
    #[error("Failed to submit transaction, consensus is shutting down: {0}")]
    ConsensusShuttingDown(String),
    /// A single transaction exceeds the maximum transaction size.
    #[error("Transaction size ({0}B) is over limit ({1}B)")]
    OversizedTransaction(u64, u64),
    /// The bundle's cumulative bytes exceed the per-block byte limit.
    #[error("Transaction bundle size ({0}B) is over limit ({1}B)")]
    OversizedTransactionBundleBytes(u64, u64),
    /// The bundle holds more transactions than fit in a single block.
    #[error("Transaction bundle count ({0}) is over limit ({1})")]
    OversizedTransactionBundleCount(u64, u64),
}
impl TransactionClient {
    /// Builds a client plus the receiver end of the submission channel, with
    /// size/count limits taken from the protocol config in `context`.
    pub(crate) fn new(context: Arc<Context>) -> (Self, Receiver<TransactionsGuard>) {
        let (sender, receiver) = channel("consensus_input", MAX_PENDING_TRANSACTIONS);
        let protocol_config = &context.protocol_config;
        let client = Self {
            sender,
            max_transaction_size: protocol_config.max_transaction_size_bytes(),
            max_transactions_in_block_bytes: protocol_config.max_transactions_in_block_bytes(),
            max_transactions_in_block_count: protocol_config.max_num_transactions_in_block(),
        };
        (client, receiver)
    }

    /// Submits a transaction bundle and waits until it is included in a proposed
    /// block. Returns the including block's reference and a receiver for the
    /// block's eventual status.
    pub async fn submit(
        &self,
        transactions: Vec<Vec<u8>>,
    ) -> Result<(BlockRef, oneshot::Receiver<BlockStatus>), ClientError> {
        let inclusion_waiter = self.submit_no_wait(transactions).await?;
        inclusion_waiter
            .await
            .tap_err(|e| warn!("Transaction acknowledge failed with {:?}", e))
            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))
    }

    /// Validates and queues a transaction bundle without waiting for inclusion.
    /// Returns a receiver that resolves once the bundle is placed in a block.
    pub(crate) async fn submit_no_wait(
        &self,
        transactions: Vec<Vec<u8>>,
    ) -> Result<oneshot::Receiver<(BlockRef, oneshot::Receiver<BlockStatus>)>, ClientError> {
        // A bundle with more transactions than a block can hold can never be proposed.
        let bundle_count = transactions.len() as u64;
        if bundle_count > self.max_transactions_in_block_count {
            return Err(ClientError::OversizedTransactionBundleCount(
                bundle_count,
                self.max_transactions_in_block_count,
            ));
        }
        // Check each transaction's size and the running bundle total; the byte
        // error reports the cumulative size at the point the limit was crossed.
        let mut cumulative_bytes: u64 = 0;
        for tx in &transactions {
            let tx_bytes = tx.len() as u64;
            if tx_bytes > self.max_transaction_size {
                return Err(ClientError::OversizedTransaction(
                    tx_bytes,
                    self.max_transaction_size,
                ));
            }
            cumulative_bytes += tx_bytes;
            if cumulative_bytes > self.max_transactions_in_block_bytes {
                return Err(ClientError::OversizedTransactionBundleBytes(
                    cumulative_bytes,
                    self.max_transactions_in_block_bytes,
                ));
            }
        }
        let (ack_tx, ack_rx) = oneshot::channel();
        let guard = TransactionsGuard {
            transactions: transactions.into_iter().map(Transaction::new).collect(),
            included_in_block_ack: ack_tx,
        };
        self.sender
            .send(guard)
            .await
            .tap_err(|e| error!("Submit transactions failed with {:?}", e))
            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))?;
        Ok(ack_rx)
    }
}
/// Hook for validating transactions carried in blocks received from peers.
pub trait TransactionVerifier: Send + Sync + 'static {
    /// Checks all transactions in the batch; returns an error when any
    /// transaction is invalid, rejecting the batch as a whole.
    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>;
    /// Checks the batch and returns indices of individual transactions to vote
    /// on. NOTE(review): presumably the indices identify transactions to reject
    /// (the test impl below returns an empty vec) — confirm against implementers.
    fn verify_and_vote_batch(
        &self,
        batch: &[&[u8]],
    ) -> Result<Vec<TransactionIndex>, ValidationError>;
}
/// Error produced by `TransactionVerifier` implementations.
#[derive(Debug, Error)]
pub enum ValidationError {
    #[error("Invalid transaction: {0}")]
    InvalidTransaction(String),
}
/// A `TransactionVerifier` that accepts every transaction and votes to reject
/// none; used in tests and simulations.
#[cfg(any(test, msim))]
pub struct NoopTransactionVerifier;
#[cfg(any(test, msim))]
impl TransactionVerifier for NoopTransactionVerifier {
    fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> {
        Ok(())
    }
    fn verify_and_vote_batch(
        &self,
        _batch: &[&[u8]],
    ) -> Result<Vec<TransactionIndex>, ValidationError> {
        Ok(vec![])
    }
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use consensus_config::AuthorityIndex;
use futures::{stream::FuturesUnordered, StreamExt};
use sui_protocol_config::ProtocolConfig;
use tokio::time::timeout;
use crate::transaction::NoopTransactionVerifier;
use crate::{
block::{BlockDigest, BlockRef},
block_verifier::SignedBlockVerifier,
context::Context,
transaction::{BlockStatus, LimitReached, TransactionClient, TransactionConsumer},
};
// Submits transactions one by one, consumes them in a single batch, and checks
// that inclusion waiters resolve only after the block is acknowledged.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn basic_submit_and_consume() {
    let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
        config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
        config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
        config
    });
    let context = Arc::new(Context::new_for_test(4).0);
    let (client, tx_receiver) = TransactionClient::new(context.clone());
    let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

    // Submit three transactions, keeping the inclusion waiters.
    let mut included_in_block_waiters = FuturesUnordered::new();
    for i in 0..3 {
        let transaction =
            bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
        let w = client
            .submit_no_wait(vec![transaction])
            .await
            .expect("Should submit successfully transaction");
        included_in_block_waiters.push(w);
    }

    // All three should be pulled in submission order.
    let (transactions, ack_transactions, _limit_reached) = consumer.next();
    assert_eq!(transactions.len(), 3);
    for (i, t) in transactions.iter().enumerate() {
        let t: String = bcs::from_bytes(t.data()).unwrap();
        assert_eq!(format!("transaction {i}").to_string(), t);
    }

    // No waiter may resolve before the block is acknowledged.
    assert!(
        timeout(Duration::from_secs(1), included_in_block_waiters.next())
            .await
            .is_err(),
        "We should expect to timeout as none of the transactions have been acknowledged yet"
    );

    // Acknowledging the block resolves every inclusion waiter.
    ack_transactions(BlockRef::MIN);
    while let Some(result) = included_in_block_waiters.next().await {
        assert!(result.is_ok());
    }
    assert!(consumer.is_empty());
}
// With GC enabled, block statuses resolve only via notify_own_blocks_status:
// committed blocks report Sequenced; blocks at or below the GC round report
// GarbageCollected.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn block_status_update_gc_enabled() {
    let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
        config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
        config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
        config.set_consensus_gc_depth_for_testing(10);
        config
    });
    let context = Arc::new(Context::new_for_test(4).0);
    let (client, tx_receiver) = TransactionClient::new(context.clone());
    let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

    // Submit ten transactions, acking a two-transaction block (at round i) after
    // every second submission.
    let mut included_in_block_waiters = FuturesUnordered::new();
    for i in 1..=10 {
        let transaction =
            bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
        let w = client
            .submit_no_wait(vec![transaction])
            .await
            .expect("Should submit successfully transaction");
        included_in_block_waiters.push(w);
        if i % 2 == 0 {
            let (transactions, ack_transactions, _limit_reached) = consumer.next();
            assert_eq!(transactions.len(), 2);
            ack_transactions(BlockRef::new(
                i,
                AuthorityIndex::new_for_test(0),
                BlockDigest::MIN,
            ));
        }
    }

    // Collect the block-status waiters from the inclusion acknowledgements.
    let mut block_status_waiters = Vec::new();
    while let Some(result) = included_in_block_waiters.next().await {
        let (block_ref, block_status_waiter) =
            result.expect("Block inclusion waiter shouldn't fail");
        block_status_waiters.push((block_ref, block_status_waiter));
    }

    // Commit the blocks of rounds 6, 8 and 10; everything at round <= 5 is GC'ed.
    let gc_round = 5;
    consumer.notify_own_blocks_status(
        vec![
            BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(8, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
        ],
        gc_round,
    );

    for (block_ref, waiter) in block_status_waiters {
        let block_status = waiter.await.expect("Block status waiter shouldn't fail");
        if block_ref.round <= gc_round {
            assert!(matches!(block_status, BlockStatus::GarbageCollected(_)))
        } else {
            assert!(matches!(block_status, BlockStatus::Sequenced(_)));
        }
    }

    // Every subscription should have been resolved and removed.
    assert!(consumer.block_status_subscribers.lock().is_empty());
}
// With GC disabled, every acked block immediately reports Sequenced and no
// subscription is ever parked in the status map.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn block_status_update_gc_disabled() {
    let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
        config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
        config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
        config.set_consensus_gc_depth_for_testing(0);
        config
    });
    let context = Arc::new(Context::new_for_test(4).0);
    let (client, tx_receiver) = TransactionClient::new(context.clone());
    let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

    // Submit ten transactions, acking a two-transaction block after every
    // second submission.
    let mut included_in_block_waiters = FuturesUnordered::new();
    for i in 1..=10 {
        let transaction =
            bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
        let w = client
            .submit_no_wait(vec![transaction])
            .await
            .expect("Should submit successfully transaction");
        included_in_block_waiters.push(w);
        if i % 2 == 0 {
            let (transactions, ack_transactions, _limit_reached) = consumer.next();
            assert_eq!(transactions.len(), 2);
            ack_transactions(BlockRef::new(
                i,
                AuthorityIndex::new_for_test(0),
                BlockDigest::MIN,
            ));
        }
    }

    let mut block_status_waiters = Vec::new();
    while let Some(result) = included_in_block_waiters.next().await {
        let (block_ref, block_status_waiter) =
            result.expect("Block inclusion waiter shouldn't fail");
        block_status_waiters.push((block_ref, block_status_waiter));
    }

    // Without notify_own_blocks_status being called, every waiter should already
    // resolve to Sequenced.
    for (_block_ref, waiter) in block_status_waiters {
        let block_status = waiter.await.expect("Block status waiter shouldn't fail");
        assert!(matches!(block_status, BlockStatus::Sequenced(_)));
    }
    assert!(consumer.block_status_subscribers.lock().is_empty());
}
// Queued transactions exceeding the block byte budget are split across
// consecutive next() calls, each batch staying within the limit.
#[tokio::test]
async fn submit_over_max_fetch_size_and_consume() {
    let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
        config.set_consensus_max_transaction_size_bytes_for_testing(100);
        config.set_consensus_max_transactions_in_block_bytes_for_testing(100);
        config
    });
    let context = Arc::new(Context::new_for_test(4).0);
    let (client, tx_receiver) = TransactionClient::new(context.clone());
    let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

    for i in 0..10 {
        let transaction =
            bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
        let _w = client
            .submit_no_wait(vec![transaction])
            .await
            .expect("Should submit successfully transaction");
    }

    // First batch: as many transactions as fit in 100 bytes.
    let mut all_transactions = Vec::new();
    let (transactions, _ack_transactions, _limit_reached) = consumer.next();
    assert_eq!(transactions.len(), 7);
    let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
    assert!(
        total_size <= context.protocol_config.max_transactions_in_block_bytes(),
        "Should have fetched transactions up to {}",
        context.protocol_config.max_transactions_in_block_bytes()
    );
    all_transactions.extend(transactions);

    // Second batch: the remainder.
    let (transactions, _ack_transactions, _limit_reached) = consumer.next();
    assert_eq!(transactions.len(), 3);
    let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
    assert!(
        total_size <= context.protocol_config.max_transactions_in_block_bytes(),
        "Should have fetched transactions up to {}",
        context.protocol_config.max_transactions_in_block_bytes()
    );
    all_transactions.extend(transactions);
    assert!(consumer.is_empty());

    // Order across batches matches submission order.
    for (i, t) in all_transactions.iter().enumerate() {
        let t: String = bcs::from_bytes(t.data()).unwrap();
        assert_eq!(format!("transaction {i}").to_string(), t);
    }
}
// Mixes single-transaction and multi-transaction bundles, verifies oversized
// bundles are rejected up front, that bundles are never split across blocks,
// and that every waiter resolves after the blocks are acked.
#[tokio::test]
async fn submit_large_batch_and_ack() {
    let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
        config.set_consensus_max_transaction_size_bytes_for_testing(15);
        config.set_consensus_max_transactions_in_block_bytes_for_testing(200);
        config
    });
    let context = Arc::new(Context::new_for_test(4).0);
    let (client, tx_receiver) = TransactionClient::new(context.clone());
    let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
    let mut all_receivers = Vec::new();

    // Ten single-transaction bundles.
    for i in 0..10 {
        let transaction =
            bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
        let w = client
            .submit_no_wait(vec![transaction])
            .await
            .expect("Should submit successfully transaction");
        all_receivers.push(w);
    }

    // One bundle of five transactions.
    {
        let transactions: Vec<_> = (10..15)
            .map(|i| {
                bcs::to_bytes(&format!("transaction {i}"))
                    .expect("Serialization should not fail.")
            })
            .collect();
        let w = client
            .submit_no_wait(transactions)
            .await
            .expect("Should submit successfully transaction");
        all_receivers.push(w);
    }

    // One more single transaction.
    {
        let i = 15;
        let transaction =
            bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
        let w = client
            .submit_no_wait(vec![transaction])
            .await
            .expect("Should submit successfully transaction");
        all_receivers.push(w);
    }

    // A bundle over the block byte limit is rejected outright.
    {
        let transactions: Vec<_> = (16..32)
            .map(|i| {
                bcs::to_bytes(&format!("transaction {i}"))
                    .expect("Serialization should not fail.")
            })
            .collect();
        let result = client.submit_no_wait(transactions).await.unwrap_err();
        assert_eq!(
            result.to_string(),
            "Transaction bundle size (210B) is over limit (200B)"
        );
    }

    // Consume everything; each batch must respect both block limits.
    let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
    let mut batch_index = 0;
    while !consumer.is_empty() {
        let (transactions, ack_transactions, _limit_reached) = consumer.next();
        assert!(
            transactions.len() as u64
                <= context.protocol_config.max_num_transactions_in_block(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_num_transactions_in_block()
        );
        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
        assert!(
            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_transactions_in_block_bytes()
        );
        if batch_index == 0 {
            // First block: the ten single transactions fill the byte budget; the
            // five-transaction bundle does not fit and is deferred whole.
            assert_eq!(transactions.len(), 10);
            for (i, transaction) in transactions.iter().enumerate() {
                let t: String = bcs::from_bytes(transaction.data()).unwrap();
                assert_eq!(format!("transaction {}", i).to_string(), t);
            }
        } else if batch_index == 1 {
            // Second block: the deferred bundle plus the final single transaction.
            assert_eq!(transactions.len(), 6);
            for (i, transaction) in transactions.iter().enumerate() {
                let t: String = bcs::from_bytes(transaction.data()).unwrap();
                assert_eq!(format!("transaction {}", i + 10).to_string(), t);
            }
        } else {
            panic!("Unexpected batch index");
        }
        batch_index += 1;
        all_acks.push(ack_transactions);
    }

    // Acknowledge both blocks; every inclusion waiter should now resolve.
    for ack in all_acks {
        ack(BlockRef::MIN);
    }
    for w in all_receivers {
        let r = w.await;
        assert!(r.is_ok());
    }
}
// Checks that batches produced by the consumer pass the block verifier's
// transaction checks, for both stop conditions (count limit and byte limit).
#[tokio::test]
async fn test_submit_over_max_block_size_and_validate_block_size() {
    // Case 1: the count limit (10) is hit before the byte limit (300).
    {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(100);
            config.set_consensus_max_num_transactions_in_block_for_testing(10);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
            config
        });
        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut all_receivers = Vec::new();
        let max_num_transactions_in_block =
            context.protocol_config.max_num_transactions_in_block();
        // Submit twice as many transactions as fit in one block by count.
        for i in 0..2 * max_num_transactions_in_block {
            let transaction = bcs::to_bytes(&format!("transaction {i}"))
                .expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit successfully transaction");
            all_receivers.push(w);
        }
        let (transactions, _ack_transactions, limit) = consumer.next();
        assert_eq!(limit, LimitReached::MaxNumOfTransactions);
        assert_eq!(transactions.len() as u64, max_num_transactions_in_block);
        // The batch must pass the verifier's transaction-count check.
        let block_verifier =
            SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
        let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
        assert!(
            block_verifier.check_transactions(&batch).is_ok(),
            "Number of transactions limit verification failed"
        );
    }
    // Case 2: the byte limit (300) is hit well before the count limit (1000).
    {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(100);
            config.set_consensus_max_num_transactions_in_block_for_testing(1_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
            config
        });
        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut all_receivers = Vec::new();
        let max_transactions_in_block_bytes =
            context.protocol_config.max_transactions_in_block_bytes();
        // Submit transactions until roughly twice the block byte budget is queued.
        let mut total_size = 0;
        loop {
            let transaction = bcs::to_bytes(&"transaction".to_string())
                .expect("Serialization should not fail.");
            total_size += transaction.len() as u64;
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit successfully transaction");
            all_receivers.push(w);
            if total_size >= 2 * max_transactions_in_block_bytes {
                break;
            }
        }
        let (transactions, _ack_transactions, limit) = consumer.next();
        let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
        let size = batch.iter().map(|t| t.len() as u64).sum::<u64>();
        assert_eq!(limit, LimitReached::MaxBytes);
        assert!(
            batch.len()
                < context
                    .protocol_config
                    .consensus_max_num_transactions_in_block() as usize,
            "Should have submitted less than the max number of transactions in a block"
        );
        assert!(size <= max_transactions_in_block_bytes);
        // The batch must pass the verifier's total-size check.
        let block_verifier =
            SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
        assert!(
            block_verifier.check_transactions(&batch).is_ok(),
            "Total size of transactions limit verification failed"
        );
    }
}
}