use std::sync::Arc;
use consensus_core::{TransactionVerifier, ValidationError};
use eyre::WrapErr;
use mysten_metrics::monitored_scope;
use narwhal_types::{validate_batch_version, BatchAPI};
use narwhal_worker::TransactionValidator;
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use sui_protocol_config::ProtocolConfig;
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
use tap::TapFallible;
use tracing::{info, warn};
use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
checkpoints::CheckpointServiceNotify, transaction_manager::TransactionManager,
};
#[derive(Clone)]
pub struct SuiTxValidator {
epoch_store: Arc<AuthorityPerEpochStore>,
checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
_transaction_manager: Arc<TransactionManager>,
metrics: Arc<SuiTxValidatorMetrics>,
}
impl SuiTxValidator {
pub fn new(
epoch_store: Arc<AuthorityPerEpochStore>,
checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
transaction_manager: Arc<TransactionManager>,
metrics: Arc<SuiTxValidatorMetrics>,
) -> Self {
info!(
"SuiTxValidator constructed for epoch {}",
epoch_store.epoch()
);
Self {
epoch_store,
checkpoint_service,
_transaction_manager: transaction_manager,
metrics,
}
}
fn validate_transactions(
&self,
txs: Vec<ConsensusTransactionKind>,
) -> Result<(), eyre::Report> {
let mut cert_batch = Vec::new();
let mut ckpt_messages = Vec::new();
let mut ckpt_batch = Vec::new();
for tx in txs.into_iter() {
match tx {
ConsensusTransactionKind::UserTransaction(certificate) => {
cert_batch.push(*certificate);
}
ConsensusTransactionKind::CheckpointSignature(signature) => {
ckpt_messages.push(signature.clone());
ckpt_batch.push(signature.summary);
}
ConsensusTransactionKind::EndOfPublish(_)
| ConsensusTransactionKind::CapabilityNotification(_)
| ConsensusTransactionKind::NewJWKFetched(_, _, _)
| ConsensusTransactionKind::RandomnessStateUpdate(_, _)
| ConsensusTransactionKind::RandomnessDkgMessage(_, _)
| ConsensusTransactionKind::RandomnessDkgConfirmation(_, _) => {}
}
}
let cert_count = cert_batch.len();
let ckpt_count = ckpt_batch.len();
self.epoch_store
.signature_verifier
.verify_certs_and_checkpoints(cert_batch, ckpt_batch)
.tap_err(|e| warn!("batch verification error: {}", e))
.wrap_err("Malformed batch (failed to verify)")?;
for ckpt in ckpt_messages {
self.checkpoint_service
.notify_checkpoint_signature(&self.epoch_store, &ckpt)?;
}
self.metrics
.certificate_signatures_verified
.inc_by(cert_count as u64);
self.metrics
.checkpoint_signatures_verified
.inc_by(ckpt_count as u64);
Ok(())
}
}
fn tx_from_bytes(tx: &[u8]) -> Result<ConsensusTransaction, eyre::Report> {
bcs::from_bytes::<ConsensusTransaction>(tx)
.wrap_err("Malformed transaction (failed to deserialize)")
}
impl TransactionValidator for SuiTxValidator {
type Error = eyre::Report;
fn validate(&self, _tx: &[u8]) -> Result<(), Self::Error> {
Ok(())
}
fn validate_batch(
&self,
b: &narwhal_types::Batch,
protocol_config: &ProtocolConfig,
) -> Result<(), Self::Error> {
let _scope = monitored_scope("ValidateBatch");
validate_batch_version(b, protocol_config)
.map_err(|err| eyre::eyre!(format!("Invalid Batch: {err}")))?;
let txs = b
.transactions()
.iter()
.map(|tx| tx_from_bytes(tx).map(|tx| tx.kind))
.collect::<Result<Vec<_>, _>>()?;
self.validate_transactions(txs)
}
}
impl TransactionVerifier for SuiTxValidator {
fn verify_batch(
&self,
_protocol_config: &ProtocolConfig,
batch: &[&[u8]],
) -> Result<(), ValidationError> {
let txs = batch
.iter()
.map(|tx| {
tx_from_bytes(tx)
.map(|tx| tx.kind)
.map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
})
.collect::<Result<Vec<_>, _>>()?;
self.validate_transactions(txs)
.map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
}
}
pub struct SuiTxValidatorMetrics {
certificate_signatures_verified: IntCounter,
checkpoint_signatures_verified: IntCounter,
}
impl SuiTxValidatorMetrics {
pub fn new(registry: &Registry) -> Arc<Self> {
Arc::new(Self {
certificate_signatures_verified: register_int_counter_with_registry!(
"certificate_signatures_verified",
"Number of certificates verified in narwhal batch verifier",
registry
)
.unwrap(),
checkpoint_signatures_verified: register_int_counter_with_registry!(
"checkpoint_signatures_verified",
"Number of checkpoint verified in narwhal batch verifier",
registry
)
.unwrap(),
})
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use narwhal_test_utils::latest_protocol_version;
use narwhal_types::{Batch, BatchV1};
use narwhal_worker::TransactionValidator;
use sui_macros::sim_test;
use sui_types::{
crypto::Ed25519SuiSignature, messages_consensus::ConsensusTransaction, object::Object,
signature::GenericSignature,
};
use crate::{
authority::test_authority_builder::TestAuthorityBuilder,
checkpoints::CheckpointServiceNoop,
consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
};
#[sim_test]
async fn accept_valid_transaction() {
let mut objects = test_gas_objects();
objects.push(Object::shared_for_testing());
let latest_protocol_config = &latest_protocol_version();
let network_config =
sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
.with_objects(objects.clone())
.build();
let state = TestAuthorityBuilder::new()
.with_network_config(&network_config)
.build()
.await;
let name1 = state.name;
let certificates = test_certificates(&state).await;
let first_transaction = certificates[0].clone();
let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
&ConsensusTransaction::new_certificate_message(&name1, first_transaction),
)
.unwrap();
let metrics = SuiTxValidatorMetrics::new(&Default::default());
let validator = SuiTxValidator::new(
state.epoch_store_for_testing().clone(),
Arc::new(CheckpointServiceNoop {}),
state.transaction_manager().clone(),
metrics,
);
let res = validator.validate(&first_transaction_bytes);
assert!(res.is_ok(), "{res:?}");
let transaction_bytes: Vec<_> = certificates
.clone()
.into_iter()
.map(|cert| {
bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
})
.collect();
let batch = Batch::new(transaction_bytes, latest_protocol_config);
let res_batch = validator.validate_batch(&batch, latest_protocol_config);
assert!(res_batch.is_ok(), "{res_batch:?}");
let bogus_transaction_bytes: Vec<_> = certificates
.into_iter()
.map(|mut cert| {
cert.tx_signatures_mut_for_testing()[0] =
GenericSignature::Signature(sui_types::crypto::Signature::Ed25519SuiSignature(
Ed25519SuiSignature::default(),
));
bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
})
.collect();
let batch = Batch::new(bogus_transaction_bytes, latest_protocol_config);
let res_batch = validator.validate_batch(&batch, latest_protocol_config);
assert!(res_batch.is_err());
let batch_v1 = Batch::V1(BatchV1::new(vec![]));
let res_batch = validator.validate_batch(&batch_v1, latest_protocol_config);
assert!(res_batch.is_err());
let batch_v2 = Batch::new(vec![], latest_protocol_config);
let res_batch = validator.validate_batch(&batch_v2, latest_protocol_config);
assert!(res_batch.is_ok());
}
}