use std::{
collections::{hash_map::DefaultHasher, HashMap, HashSet},
hash::{Hash, Hasher},
num::NonZeroUsize,
sync::Arc,
};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use lru::LruCache;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use narwhal_config::Committee;
use narwhal_executor::{ExecutionIndices, ExecutionState};
use narwhal_types::ConsensusOutput;
use serde::{Deserialize, Serialize};
use sui_macros::{fail_point_async, fail_point_if};
use sui_types::{
authenticator_state::ActiveJwk,
base_types::{AuthorityName, EpochId, TransactionDigest},
digests::ConsensusCommitDigest,
executable_transaction::{TrustedExecutableTransaction, VerifiedExecutableTransaction},
messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind},
sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
transaction::{SenderSignedData, VerifiedTransaction},
};
use tracing::{debug, error, info, instrument, trace_span};
use crate::{
authority::{
authority_per_epoch_store::{
AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndicesWithStats,
},
epoch_start_configuration::EpochStartConfigTrait,
AuthorityMetrics, AuthorityState,
},
checkpoints::{CheckpointService, CheckpointServiceNotify},
consensus_throughput_calculator::ConsensusThroughputCalculator,
consensus_types::{
committee_api::CommitteeAPI, consensus_output_api::ConsensusOutputAPI, AuthorityIndex,
},
execution_cache::ExecutionCacheRead,
scoring_decision::update_low_scoring_authorities,
transaction_manager::TransactionManager,
};
/// Bundles the state needed to build a [`ConsensusHandler`] for the current
/// epoch; `new_consensus_handler` is called to produce the handler handed to
/// the consensus layer.
pub struct ConsensusHandlerInitializer {
    state: Arc<AuthorityState>,
    checkpoint_service: Arc<CheckpointService>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Swappable map of low-scoring authorities; refreshed from reputation
    /// scores on every commit (see `update_low_scoring_authorities`).
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
}
impl ConsensusHandlerInitializer {
    /// Builds an initializer from its constituent parts.
    pub fn new(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
    ) -> Self {
        Self {
            throughput_calculator,
            low_scoring_authorities,
            epoch_store,
            checkpoint_service,
            state,
        }
    }

    /// Test-only constructor: derives the epoch store, an empty low-score
    /// map, and a throughput calculator from `state`.
    pub fn new_for_testing(
        state: Arc<AuthorityState>,
        checkpoint_service: Arc<CheckpointService>,
    ) -> Self {
        let epoch_store = state.epoch_store_for_testing().clone();
        let throughput_calculator = Arc::new(ConsensusThroughputCalculator::new(
            None,
            state.metrics.clone(),
        ));
        Self {
            state,
            checkpoint_service,
            epoch_store,
            low_scoring_authorities: Arc::new(Default::default()),
            throughput_calculator,
        }
    }

    /// Instantiates the per-epoch [`ConsensusHandler`] over the narwhal
    /// committee from the epoch start state.
    pub fn new_consensus_handler(&self) -> ConsensusHandler<CheckpointService> {
        let committee = self
            .epoch_store
            .epoch_start_state()
            .get_narwhal_committee();
        ConsensusHandler::new(
            self.epoch_store.clone(),
            self.checkpoint_service.clone(),
            self.state.transaction_manager().clone(),
            self.state.get_cache_reader().clone(),
            self.low_scoring_authorities.clone(),
            committee,
            self.state.metrics.clone(),
            self.throughput_calculator.clone(),
        )
    }
}
/// Handles output from consensus for one epoch: builds system transactions
/// for each commit, sequences and deduplicates user transactions, maintains
/// execution indices plus a running integrity hash, and forwards the result
/// for execution.
pub struct ConsensusHandler<C> {
    /// Per-epoch store; used to persist consensus stats and to process
    /// transactions at each commit boundary.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Last processed indices, per-authority counters, and the running
    /// integrity hash; restored from the epoch store on construction.
    last_consensus_stats: ExecutionIndicesWithStats,
    checkpoint_service: Arc<C>,
    cache_reader: Arc<dyn ExecutionCacheRead>,
    /// Swappable map of low-scoring authorities, refreshed from the
    /// reputation scores delivered with every commit.
    low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
    committee: Committee,
    metrics: Arc<AuthorityMetrics>,
    /// LRU of recently handled transaction keys, used to drop duplicates
    /// that consensus may deliver across commits.
    processed_cache: LruCache<SequencedConsensusTransactionKey, ()>,
    /// Background task that enqueues scheduled transactions into the
    /// transaction manager.
    transaction_scheduler: AsyncTransactionScheduler,
    /// Fed with (timestamp, tx count) after each commit to track throughput.
    throughput_calculator: Arc<ConsensusThroughputCalculator>,
}

/// Capacity (number of keys) of `ConsensusHandler::processed_cache`.
const PROCESSED_CACHE_CAP: usize = 1024 * 1024;
impl<C> ConsensusHandler<C> {
    /// Creates a handler, restoring the last persisted consensus stats from
    /// the epoch store and spawning the async transaction scheduler task.
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_service: Arc<C>,
        transaction_manager: Arc<TransactionManager>,
        cache_reader: Arc<dyn ExecutionCacheRead>,
        low_scoring_authorities: Arc<ArcSwap<HashMap<AuthorityName, u64>>>,
        committee: Committee,
        metrics: Arc<AuthorityMetrics>,
        throughput_calculator: Arc<ConsensusThroughputCalculator>,
    ) -> Self {
        // Failing to read the last index is unrecoverable: without it we
        // could skip or re-process commits.
        let mut last_consensus_stats = epoch_store
            .get_last_consensus_stats()
            .expect("Should be able to read last consensus index");
        // Stats may be empty the first time this epoch runs; size the
        // per-authority counters to the committee.
        if !last_consensus_stats.stats.is_initialized() {
            last_consensus_stats.stats = ConsensusStats::new(committee.size());
        }
        let transaction_scheduler =
            AsyncTransactionScheduler::start(transaction_manager, epoch_store.clone());
        Self {
            epoch_store,
            last_consensus_stats,
            checkpoint_service,
            cache_reader,
            low_scoring_authorities,
            committee,
            metrics,
            processed_cache: LruCache::new(NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap()),
            transaction_scheduler,
            throughput_calculator,
        }
    }

    /// Advances `last_consensus_stats` to `index`, folding the serialized
    /// transaction bytes `v` into the running integrity hash.
    fn update_index_and_hash(&mut self, index: ExecutionIndices, v: &[u8]) {
        update_index_and_hash(&mut self.last_consensus_stats, index, v)
    }
}
/// Advances `last_consensus_stats.index` to `index` (which must be strictly
/// greater than the current index) and chains the serialized bytes `v` into
/// the running integrity hash. Periodically logs the hash so operators can
/// cross-check consensus output across validators.
fn update_index_and_hash(
    last_consensus_stats: &mut ExecutionIndicesWithStats,
    index: ExecutionIndices,
    v: &[u8],
) {
    // Indices must advance strictly monotonically.
    assert!(last_consensus_stats.index < index);

    // Chain: new_hash = H(previous_hash || v).
    let mut hasher = DefaultHasher::new();
    last_consensus_stats.hash.hash(&mut hasher);
    v.hash(&mut hasher);
    let new_hash = hasher.finish();

    // Log every 1000th transaction to keep the log volume bounded.
    if index.transaction_index % 1000 == 0 {
        info!(
            "Integrity hash for consensus output at subdag {} transaction {} is {:016x}",
            index.sub_dag_index, index.transaction_index, new_hash
        );
    }

    last_consensus_stats.index = index;
    last_consensus_stats.hash = new_hash;
}
#[async_trait]
impl<C: CheckpointServiceNotify + Send + Sync> ExecutionState for ConsensusHandler<C> {
    /// Entry point invoked by narwhal for each committed sub-dag; delegates
    /// to the shared internal handler.
    #[instrument(level = "debug", skip_all)]
    async fn handle_consensus_output(&mut self, consensus_output: ConsensusOutput) {
        let _scope = monitored_scope("HandleConsensusOutput");
        self.handle_consensus_output_internal(consensus_output)
            .await;
    }

    /// Leader round of the last fully processed commit.
    fn last_executed_sub_dag_round(&self) -> u64 {
        self.last_consensus_stats.index.last_committed_round
    }

    /// Sub-dag index of the last fully processed commit.
    fn last_executed_sub_dag_index(&self) -> u64 {
        self.last_consensus_stats.index.sub_dag_index
    }
}
impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {
    /// Processes one consensus commit end-to-end:
    /// 1. skips commits already processed (consensus may replay after crash);
    /// 2. prepends the commit-prologue system transaction and, if new JWKs
    ///    were recorded, an authenticator-state update;
    /// 3. refreshes low-scoring authorities from reputation scores;
    /// 4. sequences and deduplicates all transactions in the commit while
    ///    updating the execution indices and integrity hash;
    /// 5. hands the batch to the epoch store at the commit boundary and then
    ///    schedules the returned transactions for execution.
    #[instrument(level = "debug", skip_all)]
    async fn handle_consensus_output_internal(
        &mut self,
        consensus_output: impl ConsensusOutputAPI,
    ) {
        // This handler assumes end-of-epoch transactions are ordered last by
        // the protocol config.
        assert!(self
            .epoch_store
            .protocol_config()
            .consensus_order_end_of_epoch_last());

        // (1) Replay protection: rounds never go backwards, and an equal
        // round means this commit was already handled.
        let last_committed_round = self.last_consensus_stats.index.last_committed_round;
        let round = consensus_output.leader_round();
        assert!(round >= last_committed_round);
        if last_committed_round == round {
            info!(
                "Ignoring consensus output for round {} as it is already committed",
                round
            );
            return;
        }

        let mut transactions = vec![];
        let timestamp = consensus_output.commit_timestamp_ms();
        let leader_author = consensus_output.leader_author_index();
        let commit_sub_dag_index = consensus_output.commit_sub_dag_index();

        // Clamp the commit timestamp to the epoch start time so time never
        // appears to run backwards within an epoch.
        let epoch_start = self
            .epoch_store
            .epoch_start_config()
            .epoch_start_timestamp_ms();
        let timestamp = if timestamp < epoch_start {
            error!(
                "Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}, round {round}",
            );
            epoch_start
        } else {
            timestamp
        };

        // (2) Build the commit prologue; v2 additionally embeds the consensus
        // commit digest when the protocol config enables it.
        let prologue_transaction = match self
            .epoch_store
            .protocol_config()
            .include_consensus_digest_in_prologue()
        {
            true => self.consensus_commit_prologue_v2_transaction(
                round,
                timestamp,
                consensus_output.consensus_digest(),
            ),
            false => self.consensus_commit_prologue_transaction(round, timestamp),
        };

        info!(
            %consensus_output,
            epoch = ?self.epoch_store.epoch(),
            prologue_transaction_digest = ?prologue_transaction.digest(),
            "Received consensus output"
        );

        // The prologue is sequenced first. System transactions have no
        // serialized bytes, so an empty slice feeds the integrity hash.
        let empty_bytes = vec![];
        transactions.push((
            empty_bytes.as_slice(),
            SequencedConsensusTransactionKind::System(prologue_transaction),
            consensus_output.leader_author_index(),
        ));

        // Fetch JWKs newly recorded since the last committed round and, if
        // any, sequence an authenticator-state update right after the prologue.
        let new_jwks = self
            .epoch_store
            .get_new_jwks(last_committed_round)
            .expect("Unrecoverable error in consensus handler");
        if !new_jwks.is_empty() {
            let authenticator_state_update_transaction =
                self.authenticator_state_update_transaction(round, new_jwks);
            debug!(
                "adding AuthenticatorStateUpdate({:?}) tx: {:?}",
                authenticator_state_update_transaction.digest(),
                authenticator_state_update_transaction,
            );
            transactions.push((
                empty_bytes.as_slice(),
                SequencedConsensusTransactionKind::System(authenticator_state_update_transaction),
                consensus_output.leader_author_index(),
            ));
        }

        // (3) Refresh the shared low-scoring-authorities map from this
        // commit's reputation scores.
        update_low_scoring_authorities(
            self.low_scoring_authorities.clone(),
            &self.committee,
            consensus_output.reputation_score_sorted_desc(),
            &self.metrics,
            self.epoch_store
                .protocol_config()
                .consensus_bad_nodes_stake_threshold(),
        );

        self.metrics
            .consensus_committed_subdags
            .with_label_values(&[&leader_author.to_string()])
            .inc();

        // (4) Collect every transaction in the commit, tallying per-authority
        // message / user-transaction counts and total serialized bytes.
        let mut bytes = 0usize;
        {
            let span = trace_span!("process_consensus_certs");
            let _guard = span.enter();
            for (authority_index, authority_transactions) in consensus_output.transactions() {
                self.last_consensus_stats
                    .stats
                    .inc_num_messages(authority_index as usize);
                for (serialized_transaction, transaction) in authority_transactions {
                    bytes += serialized_transaction.len();
                    self.metrics
                        .consensus_handler_processed
                        .with_label_values(&[classify(&transaction)])
                        .inc();
                    if matches!(
                        &transaction.kind,
                        ConsensusTransactionKind::UserTransaction(_)
                    ) {
                        self.last_consensus_stats
                            .stats
                            .inc_num_user_transactions(authority_index as usize);
                    }
                    // RandomnessStateUpdate is deprecated and must never
                    // appear in a commit; log loudly and drop it if it does.
                    if let ConsensusTransactionKind::RandomnessStateUpdate(randomness_round, _) =
                        &transaction.kind
                    {
                        error!("BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}")
                    } else {
                        let transaction = SequencedConsensusTransactionKind::External(transaction);
                        transactions.push((serialized_transaction, transaction, authority_index));
                    }
                }
            }
        }

        // Export cumulative per-authority counters, labelled by hostname.
        for i in 0..self.committee.size() {
            let hostname = self
                .committee
                .authority_hostname_by_index(i as AuthorityIndex)
                .unwrap_or_default();
            self.metrics
                .consensus_committed_messages
                .with_label_values(&[hostname])
                .set(self.last_consensus_stats.stats.get_num_messages(i) as i64);
            self.metrics
                .consensus_committed_user_transactions
                .with_label_values(&[hostname])
                .set(self.last_consensus_stats.stats.get_num_user_transactions(i) as i64);
        }
        self.metrics
            .consensus_handler_processed_bytes
            .inc_by(bytes as u64);

        // Sequence each transaction, update the integrity hash, and skip
        // duplicates seen either within this commit (`processed_set`) or in
        // recent commits (`processed_cache`).
        let mut all_transactions = Vec::new();
        {
            let mut processed_set = HashSet::new();
            for (seq, (serialized, transaction, cert_origin)) in
                transactions.into_iter().enumerate()
            {
                let current_tx_index = ExecutionIndices {
                    last_committed_round: round,
                    sub_dag_index: commit_sub_dag_index,
                    transaction_index: seq as u64,
                };
                self.update_index_and_hash(current_tx_index, serialized);
                let certificate_author = self
                    .committee
                    .authority_pubkey_by_index(cert_origin)
                    .unwrap();
                let sequenced_transaction = SequencedConsensusTransaction {
                    certificate_author_index: cert_origin,
                    certificate_author,
                    consensus_index: current_tx_index,
                    transaction,
                };
                let key = sequenced_transaction.key();
                let in_set = !processed_set.insert(key);
                let in_cache = self
                    .processed_cache
                    .put(sequenced_transaction.key(), ())
                    .is_some();
                if in_set || in_cache {
                    self.metrics.skipped_consensus_txns_cache_hit.inc();
                    continue;
                }
                all_transactions.push(sequenced_transaction);
            }
        }

        // (5) Process the whole batch at the commit boundary; the epoch store
        // receives the updated stats alongside the transactions.
        let transactions_to_schedule = self
            .epoch_store
            .process_consensus_transactions_and_commit_boundary(
                all_transactions,
                &self.last_consensus_stats,
                &self.checkpoint_service,
                self.cache_reader.as_ref(),
                round,
                timestamp,
                &self.metrics,
            )
            .await
            .expect("Unrecoverable error in consensus handler");

        self.throughput_calculator
            .add_transactions(timestamp, transactions_to_schedule.len() as u64);

        // Simulator-only fail points that exercise crash recovery right
        // after the commit boundary.
        fail_point_if!("correlated-crash-after-consensus-commit-boundary", || {
            let key = [commit_sub_dag_index, self.epoch_store.epoch()];
            if sui_simulator::random::deterministic_probability(&key, 0.01) {
                sui_simulator::task::kill_current_node(None);
            }
        });
        fail_point_async!("crash");

        self.transaction_scheduler
            .schedule(transactions_to_schedule)
            .await;
    }
}
/// Sends batches of executable transactions to a background task that
/// enqueues them into the transaction manager, so the consensus handler does
/// not block on enqueueing.
struct AsyncTransactionScheduler {
    sender: tokio::sync::mpsc::Sender<Vec<VerifiedExecutableTransaction>>,
}
impl AsyncTransactionScheduler {
    /// Spawns the background enqueue loop and returns a handle for sending
    /// batches to it. The channel is bounded so scheduling applies
    /// backpressure to the caller.
    pub fn start(
        transaction_manager: Arc<TransactionManager>,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        spawn_monitored_task!(Self::run(rx, transaction_manager, epoch_store));
        Self { sender: tx }
    }

    /// Sends a batch to the background task. Errors (receiver gone, i.e.
    /// the task has shut down) are deliberately ignored.
    pub async fn schedule(&self, transactions: Vec<VerifiedExecutableTransaction>) {
        let _ = self.sender.send(transactions).await;
    }

    /// Background loop: drains batches off the channel and enqueues them
    /// into the transaction manager until the sender side is dropped.
    pub async fn run(
        mut recv: tokio::sync::mpsc::Receiver<Vec<VerifiedExecutableTransaction>>,
        transaction_manager: Arc<TransactionManager>,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        while let Some(batch) = recv.recv().await {
            let _guard = monitored_scope("ConsensusHandler::enqueue");
            transaction_manager.enqueue(batch, &epoch_store);
        }
    }
}
/// Drives a [`ConsensusHandler`] from a mysticeti (consensus_core) commit
/// stream; owns the spawned driver task so it can be aborted on shutdown.
pub struct MysticetiConsensusHandler {
    // None once the task has been aborted (or taken by Drop).
    handle: Option<tokio::task::JoinHandle<()>>,
}
impl MysticetiConsensusHandler {
    /// Spawns a task that forwards each committed sub-dag from `receiver`
    /// into `consensus_handler` until the sender side is dropped.
    pub fn new(
        mut consensus_handler: ConsensusHandler<CheckpointService>,
        mut receiver: tokio::sync::mpsc::UnboundedReceiver<consensus_core::CommittedSubDag>,
    ) -> Self {
        let handle = spawn_monitored_task!(async move {
            while let Some(committed_subdag) = receiver.recv().await {
                consensus_handler
                    .handle_consensus_output_internal(committed_subdag)
                    .await;
            }
        });
        Self {
            handle: Some(handle),
        }
    }

    /// Aborts the driver task and waits for it to terminate; idempotent
    /// (subsequent calls are no-ops).
    pub async fn abort(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.abort();
            // Result ignored: an aborted task resolves with a JoinError.
            let _ = handle.await;
        }
    }
}
impl Drop for MysticetiConsensusHandler {
    /// Best-effort abort of the driver task if `abort()` was never called;
    /// cannot await completion here since `drop` is synchronous.
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}
impl<C> ConsensusHandler<C> {
    /// Builds the commit-prologue system transaction (v1) for the given
    /// commit round and timestamp.
    fn consensus_commit_prologue_transaction(
        &self,
        round: u64,
        commit_timestamp_ms: u64,
    ) -> VerifiedExecutableTransaction {
        let tx = VerifiedTransaction::new_consensus_commit_prologue(
            self.epoch(),
            round,
            commit_timestamp_ms,
        );
        VerifiedExecutableTransaction::new_system(tx, self.epoch())
    }

    /// Like [`Self::consensus_commit_prologue_transaction`], but v2 also
    /// embeds the consensus commit digest.
    fn consensus_commit_prologue_v2_transaction(
        &self,
        round: u64,
        commit_timestamp_ms: u64,
        consensus_digest: ConsensusCommitDigest,
    ) -> VerifiedExecutableTransaction {
        let tx = VerifiedTransaction::new_consensus_commit_prologue_v2(
            self.epoch(),
            round,
            commit_timestamp_ms,
            consensus_digest,
        );
        VerifiedExecutableTransaction::new_system(tx, self.epoch())
    }

    /// Builds the system transaction that applies newly observed JWKs to the
    /// authenticator state object; the JWKs are sorted before inclusion.
    fn authenticator_state_update_transaction(
        &self,
        round: u64,
        mut new_active_jwks: Vec<ActiveJwk>,
    ) -> VerifiedExecutableTransaction {
        new_active_jwks.sort();
        info!("creating authenticator state update transaction");
        assert!(self.epoch_store.authenticator_state_enabled());
        let initial_shared_version = self
            .epoch_store
            .epoch_start_config()
            .authenticator_obj_initial_shared_version()
            .expect("authenticator state obj must exist");
        let tx = VerifiedTransaction::new_authenticator_state_update(
            self.epoch(),
            round,
            new_active_jwks,
            initial_shared_version,
        );
        VerifiedExecutableTransaction::new_system(tx, self.epoch())
    }

    /// Current epoch id, read from the epoch store.
    fn epoch(&self) -> EpochId {
        self.epoch_store.epoch()
    }
}
pub(crate) fn classify(transaction: &ConsensusTransaction) -> &'static str {
match &transaction.kind {
ConsensusTransactionKind::UserTransaction(certificate) => {
if certificate.contains_shared_object() {
"shared_certificate"
} else {
"owned_certificate"
}
}
ConsensusTransactionKind::CheckpointSignature(_) => "checkpoint_signature",
ConsensusTransactionKind::EndOfPublish(_) => "end_of_publish",
ConsensusTransactionKind::CapabilityNotification(_) => "capability_notification",
ConsensusTransactionKind::NewJWKFetched(_, _, _) => "new_jwk_fetched",
ConsensusTransactionKind::RandomnessStateUpdate(_, _) => "randomness_state_update",
ConsensusTransactionKind::RandomnessDkgMessage(_, _) => "randomness_dkg_message",
ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => "randomness_dkg_confirmation",
}
}
/// A consensus transaction together with its provenance (which authority
/// submitted it) and its position in the consensus stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SequencedConsensusTransaction {
    pub certificate_author_index: AuthorityIndex,
    pub certificate_author: AuthorityName,
    /// Position of this transaction in consensus output (round, sub-dag,
    /// transaction index).
    pub consensus_index: ExecutionIndices,
    pub transaction: SequencedConsensusTransactionKind,
}
/// A transaction submitted by an authority through consensus (`External`),
/// or one synthesized locally by the handler such as the commit prologue
/// (`System`).
#[derive(Debug, Clone)]
pub enum SequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(VerifiedExecutableTransaction),
}
// Serialization is routed through the `Serializable…` mirror type because
// the `System` payload is only serde-compatible via its `Trusted…` wrapper.
impl Serialize for SequencedConsensusTransactionKind {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let serializable = SerializableSequencedConsensusTransactionKind::from(self);
        serializable.serialize(serializer)
    }
}
// Deserialization mirrors the `Serialize` impl: decode the `Serializable…`
// form, then convert back.
impl<'de> Deserialize<'de> for SequencedConsensusTransactionKind {
    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        let serializable =
            SerializableSequencedConsensusTransactionKind::deserialize(deserializer)?;
        Ok(serializable.into())
    }
}
/// Serde-friendly mirror of [`SequencedConsensusTransactionKind`]: the
/// `System` variant holds the `Trusted…` wrapper, which is serializable.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum SerializableSequencedConsensusTransactionKind {
    External(ConsensusTransaction),
    System(TrustedExecutableTransaction),
}
impl From<&SequencedConsensusTransactionKind> for SerializableSequencedConsensusTransactionKind {
    fn from(kind: &SequencedConsensusTransactionKind) -> Self {
        match kind {
            SequencedConsensusTransactionKind::External(ext) => {
                SerializableSequencedConsensusTransactionKind::External(ext.clone())
            }
            SequencedConsensusTransactionKind::System(txn) => {
                // Convert to the `Trusted…` form that serde can round-trip.
                SerializableSequencedConsensusTransactionKind::System(txn.clone().serializable())
            }
        }
    }
}
impl From<SerializableSequencedConsensusTransactionKind> for SequencedConsensusTransactionKind {
    fn from(kind: SerializableSequencedConsensusTransactionKind) -> Self {
        match kind {
            SerializableSequencedConsensusTransactionKind::External(ext) => {
                SequencedConsensusTransactionKind::External(ext)
            }
            SerializableSequencedConsensusTransactionKind::System(txn) => {
                // `Trusted…` converts back into the verified executable form.
                SequencedConsensusTransactionKind::System(txn.into())
            }
        }
    }
}
/// Deduplication key for a sequenced consensus transaction: the inner
/// consensus key for external transactions, the digest for system ones.
/// Used by `ConsensusHandler::processed_cache`.
#[derive(Serialize, Deserialize, Clone, Hash, PartialEq, Eq, Debug)]
pub enum SequencedConsensusTransactionKey {
    External(ConsensusTransactionKey),
    System(TransactionDigest),
}
impl SequencedConsensusTransactionKind {
    /// Deduplication key: the consensus key for external transactions, the
    /// transaction digest for system ones.
    pub fn key(&self) -> SequencedConsensusTransactionKey {
        match self {
            Self::External(ext) => SequencedConsensusTransactionKey::External(ext.key()),
            Self::System(txn) => SequencedConsensusTransactionKey::System(*txn.digest()),
        }
    }

    /// Tracking id of the external transaction; system transactions have
    /// none and report 0.
    pub fn get_tracking_id(&self) -> u64 {
        if let Self::External(ext) = self {
            ext.get_tracking_id()
        } else {
            0
        }
    }

    /// Whether this transaction is directly executable: every system
    /// transaction is; an external one only if it is a user certificate.
    pub fn is_executable_transaction(&self) -> bool {
        match self {
            Self::System(_) => true,
            Self::External(ext) => ext.is_user_certificate(),
        }
    }

    /// Digest of the executable payload, if any (user certificates and all
    /// system transactions).
    pub fn executable_transaction_digest(&self) -> Option<TransactionDigest> {
        match self {
            Self::External(ext) => match &ext.kind {
                ConsensusTransactionKind::UserTransaction(txn) => Some(*txn.digest()),
                _ => None,
            },
            Self::System(txn) => Some(*txn.digest()),
        }
    }

    /// True iff this is an external EndOfPublish message.
    pub fn is_end_of_publish(&self) -> bool {
        matches!(
            self,
            Self::External(ext) if matches!(ext.kind, ConsensusTransactionKind::EndOfPublish(..))
        )
    }
}
impl SequencedConsensusTransaction {
    /// Name of the authority whose certificate carried this transaction.
    pub fn sender_authority(&self) -> AuthorityName {
        self.certificate_author
    }

    /// Deduplication key, delegated to the inner transaction kind.
    pub fn key(&self) -> SequencedConsensusTransactionKey {
        self.transaction.key()
    }

    /// True iff this is an external EndOfPublish message.
    pub fn is_end_of_publish(&self) -> bool {
        // Delegate to SequencedConsensusTransactionKind::is_end_of_publish
        // instead of duplicating the pattern match here; this keeps the two
        // definitions from drifting apart.
        self.transaction.is_end_of_publish()
    }

    /// True iff this is a locally generated system transaction.
    pub fn is_system(&self) -> bool {
        matches!(
            self.transaction,
            SequencedConsensusTransactionKind::System(_)
        )
    }

    /// True iff randomness is enabled and this is a user transaction that
    /// reads randomness.
    pub fn is_user_tx_with_randomness(&self, randomness_state_enabled: bool) -> bool {
        if !randomness_state_enabled {
            // Randomness not enabled: don't check the randomness_reader flag.
            return false;
        }
        let SequencedConsensusTransactionKind::External(ConsensusTransaction {
            kind: ConsensusTransactionKind::UserTransaction(certificate),
            ..
        }) = &self.transaction
        else {
            return false;
        };
        certificate.is_randomness_reader()
    }

    /// Returns the signed transaction data if this transaction (user or
    /// system) touches shared objects; `None` otherwise.
    pub fn as_shared_object_txn(&self) -> Option<&SenderSignedData> {
        match &self.transaction {
            SequencedConsensusTransactionKind::External(ConsensusTransaction {
                kind: ConsensusTransactionKind::UserTransaction(certificate),
                ..
            }) if certificate.contains_shared_object() => Some(certificate.data()),
            SequencedConsensusTransactionKind::System(txn) if txn.contains_shared_object() => {
                Some(txn.data())
            }
            _ => None,
        }
    }
}
/// Newtype wrapper around [`SequencedConsensusTransaction`]; presumably
/// marks a transaction that has passed verification — confirm against the
/// producers of this type elsewhere in the crate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifiedSequencedConsensusTransaction(pub SequencedConsensusTransaction);
#[cfg(test)]
impl VerifiedSequencedConsensusTransaction {
    /// Test-only: wraps a raw consensus transaction via
    /// `SequencedConsensusTransaction::new_test`.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        Self(SequencedConsensusTransaction::new_test(transaction))
    }
}
impl SequencedConsensusTransaction {
    /// Test-only constructor: wraps `transaction` as an external transaction
    /// with zeroed provenance and default consensus indices.
    pub fn new_test(transaction: ConsensusTransaction) -> Self {
        Self {
            certificate_author_index: 0,
            certificate_author: AuthorityName::ZERO,
            consensus_index: Default::default(),
            transaction: SequencedConsensusTransactionKind::External(transaction),
        }
    }
}
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use narwhal_config::AuthorityIdentifier;
    use narwhal_test_utils::latest_protocol_version;
    use narwhal_types::{Batch, Certificate, CommittedSubDag, HeaderV1Builder, ReputationScores};
    use prometheus::Registry;
    use sui_protocol_config::{ConsensusTransactionOrdering, SupportedProtocolVersions};
    use sui_types::{
        base_types::{random_object_ref, AuthorityName, SuiAddress},
        committee::Committee,
        messages_consensus::{
            AuthorityCapabilities, ConsensusTransaction, ConsensusTransactionKind,
        },
        object::Object,
        sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait,
        transaction::{
            CertifiedTransaction, SenderSignedData, TransactionData, TransactionDataAPI,
        },
    };
    use super::*;
    use crate::{
        authority::{
            authority_per_epoch_store::ConsensusStatsAPI,
            test_authority_builder::TestAuthorityBuilder,
        },
        checkpoints::CheckpointServiceNoop,
        consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
        post_consensus_tx_reorder::PostConsensusTxReorder,
    };

    // End-to-end: feed a real CommittedSubDag through the handler and check
    // the resulting indices, integrity hash, and per-authority stats; then
    // verify that re-delivering the same output is a no-op.
    #[tokio::test]
    pub async fn test_consensus_handler() {
        // Build a test authority with gas objects plus one shared object.
        let mut objects = test_gas_objects();
        objects.push(Object::shared_for_testing());
        let latest_protocol_config = &latest_protocol_version();
        let network_config =
            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
                .with_objects(objects.clone())
                .build();
        let state = TestAuthorityBuilder::new()
            .with_network_config(&network_config)
            .build()
            .await;
        let epoch_store = state.epoch_store_for_testing().clone();
        let new_epoch_start_state = epoch_store.epoch_start_state();
        let committee = new_epoch_start_state.get_narwhal_committee();
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::new()));
        let throughput_calculator = ConsensusThroughputCalculator::new(None, metrics.clone());
        let mut consensus_handler = ConsensusHandler::new(
            epoch_store,
            Arc::new(CheckpointServiceNoop {}),
            state.transaction_manager().clone(),
            state.get_cache_reader().clone(),
            Arc::new(ArcSwap::default()),
            committee.clone(),
            metrics,
            Arc::new(throughput_calculator),
        );
        // Wrap each test certificate in a batch/header/certificate so it
        // looks like real narwhal output from authority 0 at round 5.
        let transactions = test_certificates(&state).await;
        let mut certificates = Vec::new();
        let mut batches = Vec::new();
        for transaction in transactions.iter() {
            let transaction_bytes: Vec<u8> = bcs::to_bytes(
                &ConsensusTransaction::new_certificate_message(&state.name, transaction.clone()),
            )
            .unwrap();
            let batch = Batch::new(vec![transaction_bytes], latest_protocol_config);
            batches.push(vec![batch.clone()]);
            let header = HeaderV1Builder::default()
                .author(AuthorityIdentifier(0))
                .round(5)
                .epoch(0)
                .parents(BTreeSet::new())
                .with_payload_batch(batch.clone(), 0, 0)
                .build()
                .unwrap();
            let certificate = Certificate::new_unsigned(
                latest_protocol_config,
                &committee,
                header.into(),
                vec![],
            )
            .unwrap();
            certificates.push(certificate);
        }
        // Sub-dag index 10, leader round 5.
        let consensus_output = ConsensusOutput {
            sub_dag: Arc::new(CommittedSubDag::new(
                certificates.clone(),
                certificates[0].clone(),
                10,
                ReputationScores::default(),
                None,
            )),
            batches,
        };
        consensus_handler
            .handle_consensus_output(consensus_output.clone())
            .await;
        let num_certificates = certificates.len();
        let num_transactions = transactions.len();
        let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone();
        assert_eq!(
            last_consensus_stats_1.index.transaction_index,
            num_transactions as u64
        );
        assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64);
        assert_eq!(last_consensus_stats_1.index.last_committed_round, 5_u64);
        assert_ne!(last_consensus_stats_1.hash, 0);
        assert_eq!(
            last_consensus_stats_1.stats.get_num_messages(0),
            num_certificates as u64
        );
        assert_eq!(
            last_consensus_stats_1.stats.get_num_user_transactions(0),
            num_transactions as u64
        );
        // Re-delivering the same commit must not change any stats.
        for _ in 0..2 {
            consensus_handler
                .handle_consensus_output(consensus_output.clone())
                .await;
            let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone();
            assert_eq!(last_consensus_stats_1, last_consensus_stats_2);
        }
    }

    // update_index_and_hash must advance the index and change the hash.
    #[test]
    pub fn test_update_index_and_hash() {
        let index0 = ExecutionIndices {
            sub_dag_index: 0,
            transaction_index: 5,
            last_committed_round: 0,
        };
        let index1 = ExecutionIndices {
            sub_dag_index: 1,
            transaction_index: 2,
            last_committed_round: 3,
        };
        let mut last_seen = ExecutionIndicesWithStats {
            index: index0,
            hash: 1000,
            stats: ConsensusStats::default(),
        };
        let tx = &[0];
        update_index_and_hash(&mut last_seen, index1, tx);
        assert_eq!(last_seen.index, index1);
        assert_ne!(last_seen.hash, 1000);
    }

    // ByGasPrice reordering: user transactions sort by descending gas price
    // within contiguous runs; non-user transactions keep relative order.
    #[test]
    fn test_order_by_gas_price() {
        let mut v = vec![cap_txn(10), user_txn(42), user_txn(100), cap_txn(1)];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            vec![
                "cap(10)".to_string(),
                "cap(1)".to_string(),
                "user(100)".to_string(),
                "user(42)".to_string(),
            ]
        );
        let mut v = vec![
            user_txn(1200),
            cap_txn(10),
            user_txn(12),
            user_txn(1000),
            user_txn(42),
            user_txn(100),
            cap_txn(1),
            user_txn(1000),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            vec![
                "cap(10)".to_string(),
                "cap(1)".to_string(),
                "user(1200)".to_string(),
                "user(1000)".to_string(),
                "user(1000)".to_string(),
                "user(100)".to_string(),
                "user(42)".to_string(),
                "user(12)".to_string(),
            ]
        );
        // No user transactions at all: order is preserved.
        let mut v = vec![
            cap_txn(10),
            eop_txn(12),
            eop_txn(10),
            cap_txn(1),
            eop_txn(11),
        ];
        PostConsensusTxReorder::reorder(&mut v, ConsensusTransactionOrdering::ByGasPrice);
        assert_eq!(
            extract(v),
            vec![
                "cap(10)".to_string(),
                "eop(12)".to_string(),
                "eop(10)".to_string(),
                "cap(1)".to_string(),
                "eop(11)".to_string(),
            ]
        );
    }

    // Renders each transaction as a short "kind(value)" string for easy
    // order assertions.
    fn extract(v: Vec<VerifiedSequencedConsensusTransaction>) -> Vec<String> {
        v.into_iter().map(extract_one).collect()
    }

    fn extract_one(t: VerifiedSequencedConsensusTransaction) -> String {
        match t.0.transaction {
            SequencedConsensusTransactionKind::External(ext) => match ext.kind {
                ConsensusTransactionKind::EndOfPublish(authority) => {
                    format!("eop({})", authority.0[0])
                }
                ConsensusTransactionKind::CapabilityNotification(cap) => {
                    format!("cap({})", cap.generation)
                }
                ConsensusTransactionKind::UserTransaction(txn) => {
                    format!("user({})", txn.transaction_data().gas_price())
                }
                _ => unreachable!(),
            },
            SequencedConsensusTransactionKind::System(_) => unreachable!(),
        }
    }

    // EndOfPublish fixture; `a` is encoded in the first authority-name byte.
    fn eop_txn(a: u8) -> VerifiedSequencedConsensusTransaction {
        let mut authority = AuthorityName::default();
        authority.0[0] = a;
        txn(ConsensusTransactionKind::EndOfPublish(authority))
    }

    // CapabilityNotification fixture with the given generation.
    fn cap_txn(generation: u64) -> VerifiedSequencedConsensusTransaction {
        txn(ConsensusTransactionKind::CapabilityNotification(
            AuthorityCapabilities {
                authority: Default::default(),
                generation,
                supported_protocol_versions: SupportedProtocolVersions::SYSTEM_DEFAULT,
                available_system_packages: vec![],
            },
        ))
    }

    // User-certificate fixture with the given gas price.
    fn user_txn(gas_price: u64) -> VerifiedSequencedConsensusTransaction {
        let (committee, keypairs) = Committee::new_simple_test_committee();
        let data = SenderSignedData::new(
            TransactionData::new_transfer(
                SuiAddress::default(),
                random_object_ref(),
                SuiAddress::default(),
                random_object_ref(),
                1000 * gas_price,
                gas_price,
            ),
            vec![],
        );
        txn(ConsensusTransactionKind::UserTransaction(Box::new(
            CertifiedTransaction::new_from_keypairs_for_testing(data, &keypairs, &committee),
        )))
    }

    // Wraps a kind into a verified sequenced transaction with test defaults.
    fn txn(kind: ConsensusTransactionKind) -> VerifiedSequencedConsensusTransaction {
        VerifiedSequencedConsensusTransaction::new_test(ConsensusTransaction {
            kind,
            tracking_id: Default::default(),
        })
    }
}