1use std::sync::Arc;
5
6use consensus_core::{TransactionVerifier, ValidationError};
7use consensus_types::block::{BlockRef, TransactionIndex};
8use fastcrypto_tbls::dkg_v1;
9use mysten_metrics::monitored_scope;
10use prometheus::{
11 IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
12 register_int_counter_with_registry,
13};
14use sui_macros::fail_point_arg;
15#[cfg(msim)]
16use sui_types::base_types::AuthorityName;
17use sui_types::{
18 error::{SuiError, SuiErrorKind, SuiResult},
19 messages_consensus::{ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind},
20 transaction::{TransactionDataAPI, TransactionWithAliases, WithAliases},
21};
22use tap::TapFallible;
23use tracing::{debug, info, instrument, warn};
24
25use crate::{
26 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
27 checkpoints::CheckpointServiceNotify,
28 consensus_adapter::{ConsensusOverloadChecker, NoopConsensusOverloadChecker},
29};
30
31#[derive(Clone)]
34pub struct SuiTxValidator {
35 authority_state: Arc<AuthorityState>,
36 consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
37 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
38 metrics: Arc<SuiTxValidatorMetrics>,
39}
40
41impl SuiTxValidator {
42 pub fn new(
43 authority_state: Arc<AuthorityState>,
44 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
45 metrics: Arc<SuiTxValidatorMetrics>,
46 ) -> Self {
47 let epoch_store = authority_state.load_epoch_store_one_call_per_task().clone();
48 info!(
49 "SuiTxValidator constructed for epoch {}",
50 epoch_store.epoch()
51 );
52 let consensus_overload_checker = Arc::new(NoopConsensusOverloadChecker {});
54 Self {
55 authority_state,
56 consensus_overload_checker,
57 checkpoint_service,
58 metrics,
59 }
60 }
61
62 fn validate_transactions(&self, txs: &[ConsensusTransactionKind]) -> Result<(), SuiError> {
63 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
64
65 let mut cert_batch = Vec::new();
66 let mut ckpt_messages = Vec::new();
67 let mut ckpt_batch = Vec::new();
68 for tx in txs.iter() {
69 match tx {
70 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
71 cert_batch.push(certificate.as_ref());
72 }
73 ConsensusTransactionKind::CheckpointSignature(signature) => {
74 ckpt_messages.push(signature.as_ref());
75 ckpt_batch.push(&signature.summary);
76 }
77 ConsensusTransactionKind::CheckpointSignatureV2(signature) => {
78 if !epoch_store
79 .protocol_config()
80 .consensus_checkpoint_signature_key_includes_digest()
81 {
82 return Err(SuiErrorKind::UnexpectedMessage(
83 "ConsensusTransactionKind::CheckpointSignatureV2 is unsupported"
84 .to_string(),
85 )
86 .into());
87 }
88 ckpt_messages.push(signature.as_ref());
89 ckpt_batch.push(&signature.summary);
90 }
91 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
92 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
93 warn!("batch verification error: DKG Message too large");
94 return Err(SuiErrorKind::InvalidDkgMessageSize.into());
95 }
96 }
97 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
98 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
99 warn!("batch verification error: DKG Confirmation too large");
100 return Err(SuiErrorKind::InvalidDkgMessageSize.into());
101 }
102 }
103
104 ConsensusTransactionKind::CapabilityNotification(_) => {}
105
106 ConsensusTransactionKind::EndOfPublish(_)
107 | ConsensusTransactionKind::NewJWKFetched(_, _, _)
108 | ConsensusTransactionKind::CapabilityNotificationV2(_)
109 | ConsensusTransactionKind::RandomnessStateUpdate(_, _) => {}
110
111 ConsensusTransactionKind::UserTransaction(_)
112 | ConsensusTransactionKind::UserTransactionV2(_) => {
113 if !epoch_store.protocol_config().mysticeti_fastpath() {
114 return Err(SuiErrorKind::UnexpectedMessage(
115 "ConsensusTransactionKind::UserTransaction is unsupported".to_string(),
116 )
117 .into());
118 }
119 }
122
123 ConsensusTransactionKind::ExecutionTimeObservation(obs) => {
124 if obs.estimates.len()
126 > epoch_store
127 .protocol_config()
128 .max_programmable_tx_commands()
129 .try_into()
130 .unwrap()
131 {
132 return Err(SuiErrorKind::UnexpectedMessage(format!(
133 "ExecutionTimeObservation contains too many estimates: {}",
134 obs.estimates.len()
135 ))
136 .into());
137 }
138 }
139 }
140 }
141
142 let cert_count = cert_batch.len();
144 let ckpt_count = ckpt_batch.len();
145
146 epoch_store
147 .signature_verifier
148 .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
149 .tap_err(|e| warn!("batch verification error: {}", e))?;
150
151 for ckpt in ckpt_messages {
153 self.checkpoint_service
154 .notify_checkpoint_signature(&epoch_store, ckpt)?;
155 }
156
157 self.metrics
158 .certificate_signatures_verified
159 .inc_by(cert_count as u64);
160 self.metrics
161 .checkpoint_signatures_verified
162 .inc_by(ckpt_count as u64);
163 Ok(())
164 }
165
166 #[instrument(level = "debug", skip_all, fields(block_ref))]
167 fn vote_transactions(
168 &self,
169 block_ref: &BlockRef,
170 txs: Vec<ConsensusTransactionKind>,
171 ) -> Vec<TransactionIndex> {
172 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
173 if !epoch_store.protocol_config().mysticeti_fastpath() {
174 return vec![];
175 }
176
177 let mut result = Vec::new();
178 for (i, tx) in txs.into_iter().enumerate() {
179 let tx = match tx {
180 ConsensusTransactionKind::UserTransaction(tx) => {
181 let no_aliases_allowed = tx
182 .intent_message()
183 .value
184 .required_signers()
185 .map(|s| (s, None));
186 WithAliases::new(*tx, no_aliases_allowed)
187 }
188 ConsensusTransactionKind::UserTransactionV2(tx) => *tx,
189 _ => continue,
190 };
191
192 let tx_digest = *tx.tx().digest();
193 if let Err(error) = self.vote_transaction(&epoch_store, tx) {
194 debug!(?tx_digest, "Voting to reject transaction: {error}");
195 self.metrics
196 .transaction_reject_votes
197 .with_label_values(&[error.to_variant_name()])
198 .inc();
199 result.push(i as TransactionIndex);
200 epoch_store.set_rejection_vote_reason(
202 ConsensusPosition {
203 epoch: epoch_store.epoch(),
204 block: *block_ref,
205 index: i as TransactionIndex,
206 },
207 &error,
208 );
209 } else {
210 debug!(?tx_digest, "Voting to accept transaction");
211 }
212 }
213
214 result
215 }
216
217 #[instrument(level = "debug", skip_all, err(level = "debug"), fields(tx_digest = ?tx.tx().digest()))]
218 fn vote_transaction(
219 &self,
220 epoch_store: &Arc<AuthorityPerEpochStore>,
221 tx: TransactionWithAliases,
222 ) -> SuiResult<()> {
223 let (tx, aliases) = tx.into_inner();
224
225 tx.validity_check(&epoch_store.tx_validity_check_context())?;
228
229 self.authority_state.check_system_overload(
230 &*self.consensus_overload_checker,
231 tx.data(),
232 self.authority_state.check_system_overload_at_signing(),
233 )?;
234
235 #[allow(unused_mut)]
236 let mut fail_point_always_report_aliases_changed = false;
237 fail_point_arg!(
238 "consensus-validator-always-report-aliases-changed",
239 |for_validators: Vec<AuthorityName>| {
240 if for_validators.contains(&self.authority_state.name) {
241 fail_point_always_report_aliases_changed = true;
243 }
244 }
245 );
246
247 let verified_tx = epoch_store.verify_transaction_with_current_aliases(tx)?;
248 if *verified_tx.aliases() != aliases || fail_point_always_report_aliases_changed {
249 return Err(SuiErrorKind::AliasesChanged.into());
250 }
251
252 self.authority_state
253 .handle_vote_transaction(epoch_store, verified_tx.into_tx())?;
254
255 Ok(())
256 }
257}
258
259fn tx_kind_from_bytes(tx: &[u8]) -> Result<ConsensusTransactionKind, ValidationError> {
260 bcs::from_bytes::<ConsensusTransaction>(tx)
261 .map_err(|e| {
262 ValidationError::InvalidTransaction(format!(
263 "Failed to parse transaction bytes: {:?}",
264 e
265 ))
266 })
267 .map(|tx| tx.kind)
268}
269
270impl TransactionVerifier for SuiTxValidator {
271 fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
272 let _scope = monitored_scope("ValidateBatch");
273
274 let txs: Vec<_> = batch
275 .iter()
276 .map(|tx| tx_kind_from_bytes(tx))
277 .collect::<Result<Vec<_>, _>>()?;
278
279 self.validate_transactions(&txs)
280 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
281 }
282
283 fn verify_and_vote_batch(
284 &self,
285 block_ref: &BlockRef,
286 batch: &[&[u8]],
287 ) -> Result<Vec<TransactionIndex>, ValidationError> {
288 let _scope = monitored_scope("VerifyAndVoteBatch");
289
290 let txs: Vec<_> = batch
291 .iter()
292 .map(|tx| tx_kind_from_bytes(tx))
293 .collect::<Result<Vec<_>, _>>()?;
294
295 self.validate_transactions(&txs)
296 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))?;
297
298 Ok(self.vote_transactions(block_ref, txs))
299 }
300}
301
302pub struct SuiTxValidatorMetrics {
303 certificate_signatures_verified: IntCounter,
304 checkpoint_signatures_verified: IntCounter,
305 transaction_reject_votes: IntCounterVec,
306}
307
308impl SuiTxValidatorMetrics {
309 pub fn new(registry: &Registry) -> Arc<Self> {
310 Arc::new(Self {
311 certificate_signatures_verified: register_int_counter_with_registry!(
312 "tx_validator_certificate_signatures_verified",
313 "Number of certificates verified in consensus batch verifier",
314 registry
315 )
316 .unwrap(),
317 checkpoint_signatures_verified: register_int_counter_with_registry!(
318 "tx_validator_checkpoint_signatures_verified",
319 "Number of checkpoint verified in consensus batch verifier",
320 registry
321 )
322 .unwrap(),
323 transaction_reject_votes: register_int_counter_vec_with_registry!(
324 "tx_validator_transaction_reject_votes",
325 "Number of reject transaction votes per reason",
326 &["reason"],
327 registry
328 )
329 .unwrap(),
330 })
331 }
332}
333
334#[cfg(test)]
335mod tests {
336 use std::num::NonZeroUsize;
337 use std::sync::Arc;
338
339 use consensus_core::TransactionVerifier as _;
340 use consensus_types::block::BlockRef;
341 use fastcrypto::traits::KeyPair;
342 use sui_config::transaction_deny_config::TransactionDenyConfigBuilder;
343 use sui_macros::sim_test;
344 use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
345 use sui_types::crypto::deterministic_random_account_key;
346 use sui_types::error::{SuiErrorKind, UserInputError};
347 use sui_types::executable_transaction::VerifiedExecutableTransaction;
348 use sui_types::messages_checkpoint::{
349 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary, SignedCheckpointSummary,
350 };
351 use sui_types::messages_consensus::ConsensusPosition;
352 use sui_types::{
353 base_types::{ExecutionDigests, ObjectID},
354 crypto::Ed25519SuiSignature,
355 effects::TransactionEffectsAPI as _,
356 messages_consensus::ConsensusTransaction,
357 object::Object,
358 signature::GenericSignature,
359 };
360
361 use crate::authority::ExecutionEnv;
362 use crate::{
363 authority::test_authority_builder::TestAuthorityBuilder,
364 checkpoints::CheckpointServiceNoop,
365 consensus_adapter::consensus_tests::{
366 test_certificates, test_gas_objects, test_user_transaction,
367 },
368 consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
369 };
370
371 #[sim_test]
372 async fn accept_valid_transaction() {
373 let mut objects = test_gas_objects();
376 let shared_object = Object::shared_for_testing();
377 objects.push(shared_object.clone());
378
379 let network_config =
380 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
381 .with_objects(objects.clone())
382 .build();
383
384 let state = TestAuthorityBuilder::new()
385 .with_network_config(&network_config, 0)
386 .build()
387 .await;
388 let name1 = state.name;
389 let certificates = test_certificates(&state, shared_object).await;
390
391 let first_transaction = certificates[0].clone();
392 let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
393 &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
394 )
395 .unwrap();
396
397 let metrics = SuiTxValidatorMetrics::new(&Default::default());
398 let validator =
399 SuiTxValidator::new(state.clone(), Arc::new(CheckpointServiceNoop {}), metrics);
400 let res = validator.verify_batch(&[&first_transaction_bytes]);
401 assert!(res.is_ok(), "{res:?}");
402
403 let transaction_bytes: Vec<_> = certificates
404 .clone()
405 .into_iter()
406 .map(|cert| {
407 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
408 })
409 .collect();
410
411 let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
412 let res_batch = validator.verify_batch(&batch);
413 assert!(res_batch.is_ok(), "{res_batch:?}");
414
415 let bogus_transaction_bytes: Vec<_> = certificates
416 .into_iter()
417 .map(|mut cert| {
418 cert.tx_signatures_mut_for_testing()[0] =
420 GenericSignature::Signature(sui_types::crypto::Signature::Ed25519SuiSignature(
421 Ed25519SuiSignature::default(),
422 ));
423 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
424 })
425 .collect();
426
427 let batch: Vec<_> = bogus_transaction_bytes
428 .iter()
429 .map(|t| t.as_slice())
430 .collect();
431 let res_batch = validator.verify_batch(&batch);
432 assert!(res_batch.is_err());
433 }
434
435 #[tokio::test]
436 async fn test_verify_and_vote_batch() {
437 let (sender, keypair) = deterministic_random_account_key();
439
440 let gas_objects: Vec<Object> = (0..8)
442 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
443 .collect();
444
445 let owned_objects: Vec<Object> = (0..2)
447 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
448 .collect();
449 let denied_object = owned_objects[1].clone();
450
451 let mut objects = gas_objects.clone();
452 objects.extend(owned_objects.clone());
453
454 let network_config =
455 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
456 .committee_size(NonZeroUsize::new(1).unwrap())
457 .with_objects(objects.clone())
458 .build();
459
460 let transaction_deny_config = TransactionDenyConfigBuilder::new()
462 .add_denied_object(denied_object.id())
463 .build();
464 let state = TestAuthorityBuilder::new()
465 .with_network_config(&network_config, 0)
466 .with_transaction_deny_config(transaction_deny_config)
467 .build()
468 .await;
469
470 let valid_transaction = test_user_transaction(
474 &state,
475 sender,
476 &keypair,
477 gas_objects[0].clone(),
478 vec![owned_objects[0].clone()],
479 )
480 .await;
481
482 let invalid_transaction = test_user_transaction(
484 &state,
485 sender,
486 &keypair,
487 gas_objects[1].clone(),
488 vec![denied_object.clone()],
489 )
490 .await;
491
492 let transactions = vec![valid_transaction, invalid_transaction];
494 let serialized_transactions: Vec<_> = transactions
495 .into_iter()
496 .map(|t| {
497 bcs::to_bytes(&ConsensusTransaction::new_user_transaction_v2_message(
498 &state.name,
499 t.into(),
500 ))
501 .unwrap()
502 })
503 .collect();
504 let batch: Vec<_> = serialized_transactions
505 .iter()
506 .map(|t| t.as_slice())
507 .collect();
508
509 let validator = SuiTxValidator::new(
510 state.clone(),
511 Arc::new(CheckpointServiceNoop {}),
512 SuiTxValidatorMetrics::new(&Default::default()),
513 );
514
515 let rejected_transactions = validator
517 .verify_and_vote_batch(&BlockRef::MAX, &batch)
518 .unwrap();
519
520 assert_eq!(rejected_transactions, vec![1]);
523
524 let epoch_store = state.load_epoch_store_one_call_per_task();
527 let reason = epoch_store
528 .get_rejection_vote_reason(ConsensusPosition {
529 epoch: state.load_epoch_store_one_call_per_task().epoch(),
530 block: BlockRef::MAX,
531 index: 1,
532 })
533 .expect("Rejection vote reason should be set");
534
535 assert_eq!(
536 reason,
537 SuiErrorKind::UserInputError {
538 error: UserInputError::TransactionDenied {
539 error: format!(
540 "Access to input object {:?} is temporarily disabled",
541 denied_object.id()
542 )
543 }
544 }
545 );
546 }
547
548 #[sim_test]
549 async fn reject_checkpoint_signature_v2_when_flag_disabled() {
550 let network_config =
552 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
553
554 let disabled_cfg =
555 ProtocolConfig::get_for_version(ProtocolVersion::new(92), Chain::Unknown);
556 let state = TestAuthorityBuilder::new()
557 .with_network_config(&network_config, 0)
558 .with_protocol_config(disabled_cfg)
559 .build()
560 .await;
561
562 let epoch_store = state.load_epoch_store_one_call_per_task();
563
564 let checkpoint_summary = CheckpointSummary::new(
566 &ProtocolConfig::get_for_max_version_UNSAFE(),
567 epoch_store.epoch(),
568 0,
569 0,
570 &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
571 None,
572 Default::default(),
573 None,
574 0,
575 Vec::new(),
576 Vec::new(),
577 );
578
579 let keypair = network_config.validator_configs()[0].protocol_key_pair();
580 let authority = keypair.public().into();
581 let signed = SignedCheckpointSummary::new(
582 epoch_store.epoch(),
583 checkpoint_summary,
584 keypair,
585 authority,
586 );
587 let message = CheckpointSignatureMessage { summary: signed };
588
589 let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
590 let bytes = bcs::to_bytes(&tx).unwrap();
591
592 let validator = SuiTxValidator::new(
593 state.clone(),
594 Arc::new(CheckpointServiceNoop {}),
595 SuiTxValidatorMetrics::new(&Default::default()),
596 );
597
598 let res = validator.verify_batch(&[&bytes]);
599 assert!(res.is_err());
600 }
601
602 #[sim_test]
603 async fn accept_checkpoint_signature_v2_when_flag_enabled() {
604 let network_config =
606 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
607
608 let enabled_cfg = ProtocolConfig::get_for_version(ProtocolVersion::new(93), Chain::Unknown);
609 let state = TestAuthorityBuilder::new()
610 .with_network_config(&network_config, 0)
611 .with_protocol_config(enabled_cfg)
612 .build()
613 .await;
614
615 let epoch_store = state.load_epoch_store_one_call_per_task();
616
617 let checkpoint_summary = CheckpointSummary::new(
619 &ProtocolConfig::get_for_max_version_UNSAFE(),
620 epoch_store.epoch(),
621 0,
622 0,
623 &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
624 None,
625 Default::default(),
626 None,
627 0,
628 Vec::new(),
629 Vec::new(),
630 );
631
632 let keypair = network_config.validator_configs()[0].protocol_key_pair();
633 let authority = keypair.public().into();
634 let signed = SignedCheckpointSummary::new(
635 epoch_store.epoch(),
636 checkpoint_summary,
637 keypair,
638 authority,
639 );
640 let message = CheckpointSignatureMessage { summary: signed };
641
642 let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
643 let bytes = bcs::to_bytes(&tx).unwrap();
644
645 let validator = SuiTxValidator::new(
646 state.clone(),
647 Arc::new(CheckpointServiceNoop {}),
648 SuiTxValidatorMetrics::new(&Default::default()),
649 );
650
651 let res = validator.verify_batch(&[&bytes]);
652 assert!(res.is_ok(), "{res:?}");
653 }
654
655 #[sim_test]
656 async fn accept_already_executed_transaction() {
657 let (sender, keypair) = deterministic_random_account_key();
658
659 let gas_object = Object::with_id_owner_for_testing(ObjectID::random(), sender);
660 let owned_object = Object::with_id_owner_for_testing(ObjectID::random(), sender);
661
662 let network_config =
663 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
664 .committee_size(NonZeroUsize::new(1).unwrap())
665 .with_objects(vec![gas_object.clone(), owned_object.clone()])
666 .build();
667
668 let state = TestAuthorityBuilder::new()
669 .with_network_config(&network_config, 0)
670 .build()
671 .await;
672
673 let epoch_store = state.load_epoch_store_one_call_per_task();
674
675 let transaction = test_user_transaction(
677 &state,
678 sender,
679 &keypair,
680 gas_object.clone(),
681 vec![owned_object.clone()],
682 )
683 .await
684 .into_tx();
685 let tx_digest = *transaction.digest();
686 let cert = VerifiedExecutableTransaction::new_from_quorum_execution(transaction.clone(), 0);
687 let (executed_effects, _) = state
688 .try_execute_immediately(&cert, ExecutionEnv::new(), &state.epoch_store_for_testing())
689 .await
690 .unwrap();
691
692 let read_effects = state
694 .get_transaction_cache_reader()
695 .get_executed_effects(&tx_digest)
696 .expect("Transaction should be executed");
697 assert_eq!(read_effects, executed_effects);
698 assert_eq!(read_effects.executed_epoch(), epoch_store.epoch());
699
700 let serialized_tx = bcs::to_bytes(&ConsensusTransaction::new_user_transaction_message(
702 &state.name,
703 transaction.into_inner().clone(),
704 ))
705 .unwrap();
706 let validator = SuiTxValidator::new(
707 state.clone(),
708 Arc::new(CheckpointServiceNoop {}),
709 SuiTxValidatorMetrics::new(&Default::default()),
710 );
711 let rejected_transactions = validator
712 .verify_and_vote_batch(&BlockRef::MAX, &[&serialized_tx])
713 .expect("Verify and vote should succeed");
714
715 assert!(rejected_transactions.is_empty());
717 }
718}