1use std::sync::Arc;
5
6use consensus_core::{TransactionVerifier, ValidationError};
7use consensus_types::block::{BlockRef, TransactionIndex};
8use fastcrypto_tbls::dkg_v1;
9use mysten_metrics::monitored_scope;
10use prometheus::{
11 IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
12 register_int_counter_with_registry,
13};
14use sui_types::{
15 error::{SuiError, SuiErrorKind, SuiResult},
16 messages_consensus::{ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind},
17 transaction::Transaction,
18};
19use tap::TapFallible;
20use tracing::{debug, info, instrument, warn};
21
22use crate::{
23 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
24 checkpoints::CheckpointServiceNotify,
25 consensus_adapter::ConsensusOverloadChecker,
26};
27
/// Verifies batches of serialized consensus transactions and votes on the
/// user transactions they contain.
///
/// Every field is an `Arc` handle, so cloning the validator only bumps
/// reference counts.
#[derive(Clone)]
pub struct SuiTxValidator {
    /// Used to load the current epoch store and to run the per-transaction
    /// voting checks.
    authority_state: Arc<AuthorityState>,
    /// Handed to `AuthorityState::check_system_overload` when voting on a
    /// user transaction.
    consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
    /// Notified of every checkpoint signature message once the batch's
    /// signatures have been verified.
    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
    /// Counters for verified signatures and reject votes.
    metrics: Arc<SuiTxValidatorMetrics>,
}
36
37impl SuiTxValidator {
38 pub fn new(
39 authority_state: Arc<AuthorityState>,
40 consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
41 checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
42 metrics: Arc<SuiTxValidatorMetrics>,
43 ) -> Self {
44 let epoch_store = authority_state.load_epoch_store_one_call_per_task().clone();
45 info!(
46 "SuiTxValidator constructed for epoch {}",
47 epoch_store.epoch()
48 );
49 Self {
50 authority_state,
51 consensus_overload_checker,
52 checkpoint_service,
53 metrics,
54 }
55 }
56
57 fn validate_transactions(&self, txs: &[ConsensusTransactionKind]) -> Result<(), SuiError> {
58 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
59
60 let mut cert_batch = Vec::new();
61 let mut ckpt_messages = Vec::new();
62 let mut ckpt_batch = Vec::new();
63 for tx in txs.iter() {
64 match tx {
65 ConsensusTransactionKind::CertifiedTransaction(certificate) => {
66 cert_batch.push(certificate.as_ref());
67 }
68 ConsensusTransactionKind::CheckpointSignature(signature) => {
69 ckpt_messages.push(signature.as_ref());
70 ckpt_batch.push(&signature.summary);
71 }
72 ConsensusTransactionKind::CheckpointSignatureV2(signature) => {
73 if !epoch_store
74 .protocol_config()
75 .consensus_checkpoint_signature_key_includes_digest()
76 {
77 return Err(SuiErrorKind::UnexpectedMessage(
78 "ConsensusTransactionKind::CheckpointSignatureV2 is unsupported"
79 .to_string(),
80 )
81 .into());
82 }
83 ckpt_messages.push(signature.as_ref());
84 ckpt_batch.push(&signature.summary);
85 }
86 ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
87 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
88 warn!("batch verification error: DKG Message too large");
89 return Err(SuiErrorKind::InvalidDkgMessageSize.into());
90 }
91 }
92 ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
93 if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
94 warn!("batch verification error: DKG Confirmation too large");
95 return Err(SuiErrorKind::InvalidDkgMessageSize.into());
96 }
97 }
98
99 ConsensusTransactionKind::CapabilityNotification(_) => {}
100
101 ConsensusTransactionKind::EndOfPublish(_)
102 | ConsensusTransactionKind::NewJWKFetched(_, _, _)
103 | ConsensusTransactionKind::CapabilityNotificationV2(_)
104 | ConsensusTransactionKind::RandomnessStateUpdate(_, _) => {}
105
106 ConsensusTransactionKind::UserTransaction(_tx) => {
107 if !epoch_store.protocol_config().mysticeti_fastpath() {
108 return Err(SuiErrorKind::UnexpectedMessage(
109 "ConsensusTransactionKind::UserTransaction is unsupported".to_string(),
110 )
111 .into());
112 }
113 }
116
117 ConsensusTransactionKind::ExecutionTimeObservation(obs) => {
118 if obs.estimates.len()
120 > epoch_store
121 .protocol_config()
122 .max_programmable_tx_commands()
123 .try_into()
124 .unwrap()
125 {
126 return Err(SuiErrorKind::UnexpectedMessage(format!(
127 "ExecutionTimeObservation contains too many estimates: {}",
128 obs.estimates.len()
129 ))
130 .into());
131 }
132 }
133 }
134 }
135
136 let cert_count = cert_batch.len();
138 let ckpt_count = ckpt_batch.len();
139
140 epoch_store
141 .signature_verifier
142 .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
143 .tap_err(|e| warn!("batch verification error: {}", e))?;
144
145 for ckpt in ckpt_messages {
147 self.checkpoint_service
148 .notify_checkpoint_signature(&epoch_store, ckpt)?;
149 }
150
151 self.metrics
152 .certificate_signatures_verified
153 .inc_by(cert_count as u64);
154 self.metrics
155 .checkpoint_signatures_verified
156 .inc_by(ckpt_count as u64);
157 Ok(())
158 }
159
160 #[instrument(level = "debug", skip_all, fields(block_ref))]
161 fn vote_transactions(
162 &self,
163 block_ref: &BlockRef,
164 txs: Vec<ConsensusTransactionKind>,
165 ) -> Vec<TransactionIndex> {
166 let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
167 if !epoch_store.protocol_config().mysticeti_fastpath() {
168 return vec![];
169 }
170
171 let mut result = Vec::new();
172 for (i, tx) in txs.into_iter().enumerate() {
173 let ConsensusTransactionKind::UserTransaction(tx) = tx else {
174 continue;
175 };
176
177 let tx_digest = *tx.digest();
178 if let Err(error) = self.vote_transaction(&epoch_store, tx) {
179 debug!(?tx_digest, "Voting to reject transaction: {error}");
180 self.metrics
181 .transaction_reject_votes
182 .with_label_values(&[error.to_variant_name()])
183 .inc();
184 result.push(i as TransactionIndex);
185 epoch_store.set_rejection_vote_reason(
187 ConsensusPosition {
188 epoch: epoch_store.epoch(),
189 block: *block_ref,
190 index: i as TransactionIndex,
191 },
192 &error,
193 );
194 } else {
195 debug!(?tx_digest, "Voting to accept transaction");
196 }
197 }
198
199 result
200 }
201
202 #[instrument(level = "debug", skip_all, err(level = "debug"), fields(tx_digest = ?tx.digest()))]
203 fn vote_transaction(
204 &self,
205 epoch_store: &Arc<AuthorityPerEpochStore>,
206 tx: Box<Transaction>,
207 ) -> SuiResult<()> {
208 tx.validity_check(&epoch_store.tx_validity_check_context())?;
211
212 self.authority_state.check_system_overload(
213 &*self.consensus_overload_checker,
214 tx.data(),
215 self.authority_state.check_system_overload_at_signing(),
216 )?;
217
218 let tx = epoch_store.verify_transaction(*tx)?;
219
220 self.authority_state
221 .handle_vote_transaction(epoch_store, tx)?;
222
223 Ok(())
224 }
225}
226
227fn tx_kind_from_bytes(tx: &[u8]) -> Result<ConsensusTransactionKind, ValidationError> {
228 bcs::from_bytes::<ConsensusTransaction>(tx)
229 .map_err(|e| {
230 ValidationError::InvalidTransaction(format!(
231 "Failed to parse transaction bytes: {:?}",
232 e
233 ))
234 })
235 .map(|tx| tx.kind)
236}
237
238impl TransactionVerifier for SuiTxValidator {
239 fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
240 let _scope = monitored_scope("ValidateBatch");
241
242 let txs: Vec<_> = batch
243 .iter()
244 .map(|tx| tx_kind_from_bytes(tx))
245 .collect::<Result<Vec<_>, _>>()?;
246
247 self.validate_transactions(&txs)
248 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
249 }
250
251 fn verify_and_vote_batch(
252 &self,
253 block_ref: &BlockRef,
254 batch: &[&[u8]],
255 ) -> Result<Vec<TransactionIndex>, ValidationError> {
256 let _scope = monitored_scope("VerifyAndVoteBatch");
257
258 let txs: Vec<_> = batch
259 .iter()
260 .map(|tx| tx_kind_from_bytes(tx))
261 .collect::<Result<Vec<_>, _>>()?;
262
263 self.validate_transactions(&txs)
264 .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))?;
265
266 Ok(self.vote_transactions(block_ref, txs))
267 }
268}
269
/// Prometheus counters maintained by `SuiTxValidator`.
pub struct SuiTxValidatorMetrics {
    /// Incremented by the number of certificates in each successfully
    /// validated batch.
    certificate_signatures_verified: IntCounter,
    /// Incremented by the number of checkpoint signatures in each successfully
    /// validated batch.
    checkpoint_signatures_verified: IntCounter,
    /// Incremented once per reject vote, labelled by `reason`.
    transaction_reject_votes: IntCounterVec,
}
275
276impl SuiTxValidatorMetrics {
277 pub fn new(registry: &Registry) -> Arc<Self> {
278 Arc::new(Self {
279 certificate_signatures_verified: register_int_counter_with_registry!(
280 "tx_validator_certificate_signatures_verified",
281 "Number of certificates verified in consensus batch verifier",
282 registry
283 )
284 .unwrap(),
285 checkpoint_signatures_verified: register_int_counter_with_registry!(
286 "tx_validator_checkpoint_signatures_verified",
287 "Number of checkpoint verified in consensus batch verifier",
288 registry
289 )
290 .unwrap(),
291 transaction_reject_votes: register_int_counter_vec_with_registry!(
292 "tx_validator_transaction_reject_votes",
293 "Number of reject transaction votes per reason",
294 &["reason"],
295 registry
296 )
297 .unwrap(),
298 })
299 }
300}
301
302#[cfg(test)]
303mod tests {
304 use std::num::NonZeroUsize;
305 use std::sync::Arc;
306
307 use consensus_core::TransactionVerifier as _;
308 use consensus_types::block::BlockRef;
309 use fastcrypto::traits::KeyPair;
310 use sui_config::transaction_deny_config::TransactionDenyConfigBuilder;
311 use sui_macros::sim_test;
312 use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
313 use sui_types::crypto::deterministic_random_account_key;
314 use sui_types::error::{SuiErrorKind, UserInputError};
315 use sui_types::messages_checkpoint::{
316 CheckpointContents, CheckpointSignatureMessage, CheckpointSummary, SignedCheckpointSummary,
317 };
318 use sui_types::messages_consensus::ConsensusPosition;
319 use sui_types::{
320 base_types::{ExecutionDigests, ObjectID},
321 crypto::Ed25519SuiSignature,
322 messages_consensus::ConsensusTransaction,
323 object::Object,
324 signature::GenericSignature,
325 };
326
327 use crate::{
328 authority::test_authority_builder::TestAuthorityBuilder,
329 checkpoints::CheckpointServiceNoop,
330 consensus_adapter::{
331 NoopConsensusOverloadChecker,
332 consensus_tests::{test_certificates, test_gas_objects, test_user_transaction},
333 },
334 consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
335 };
336
337 #[sim_test]
338 async fn accept_valid_transaction() {
339 let mut objects = test_gas_objects();
342 let shared_object = Object::shared_for_testing();
343 objects.push(shared_object.clone());
344
345 let network_config =
346 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
347 .with_objects(objects.clone())
348 .build();
349
350 let state = TestAuthorityBuilder::new()
351 .with_network_config(&network_config, 0)
352 .build()
353 .await;
354 let name1 = state.name;
355 let certificates = test_certificates(&state, shared_object).await;
356
357 let first_transaction = certificates[0].clone();
358 let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
359 &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
360 )
361 .unwrap();
362
363 let metrics = SuiTxValidatorMetrics::new(&Default::default());
364 let validator = SuiTxValidator::new(
365 state.clone(),
366 Arc::new(NoopConsensusOverloadChecker {}),
367 Arc::new(CheckpointServiceNoop {}),
368 metrics,
369 );
370 let res = validator.verify_batch(&[&first_transaction_bytes]);
371 assert!(res.is_ok(), "{res:?}");
372
373 let transaction_bytes: Vec<_> = certificates
374 .clone()
375 .into_iter()
376 .map(|cert| {
377 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
378 })
379 .collect();
380
381 let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
382 let res_batch = validator.verify_batch(&batch);
383 assert!(res_batch.is_ok(), "{res_batch:?}");
384
385 let bogus_transaction_bytes: Vec<_> = certificates
386 .into_iter()
387 .map(|mut cert| {
388 cert.tx_signatures_mut_for_testing()[0] =
390 GenericSignature::Signature(sui_types::crypto::Signature::Ed25519SuiSignature(
391 Ed25519SuiSignature::default(),
392 ));
393 bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
394 })
395 .collect();
396
397 let batch: Vec<_> = bogus_transaction_bytes
398 .iter()
399 .map(|t| t.as_slice())
400 .collect();
401 let res_batch = validator.verify_batch(&batch);
402 assert!(res_batch.is_err());
403 }
404
405 #[tokio::test]
406 async fn test_verify_and_vote_batch() {
407 let (sender, keypair) = deterministic_random_account_key();
409
410 let gas_objects: Vec<Object> = (0..8)
412 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
413 .collect();
414
415 let owned_objects: Vec<Object> = (0..2)
417 .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
418 .collect();
419 let denied_object = owned_objects[1].clone();
420
421 let mut objects = gas_objects.clone();
422 objects.extend(owned_objects.clone());
423
424 let network_config =
425 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
426 .committee_size(NonZeroUsize::new(1).unwrap())
427 .with_objects(objects.clone())
428 .build();
429
430 let transaction_deny_config = TransactionDenyConfigBuilder::new()
432 .add_denied_object(denied_object.id())
433 .build();
434 let state = TestAuthorityBuilder::new()
435 .with_network_config(&network_config, 0)
436 .with_transaction_deny_config(transaction_deny_config)
437 .build()
438 .await;
439
440 let valid_transaction = test_user_transaction(
444 &state,
445 sender,
446 &keypair,
447 gas_objects[0].clone(),
448 vec![owned_objects[0].clone()],
449 )
450 .await;
451
452 let invalid_transaction = test_user_transaction(
454 &state,
455 sender,
456 &keypair,
457 gas_objects[1].clone(),
458 vec![denied_object.clone()],
459 )
460 .await;
461
462 let transactions = vec![valid_transaction, invalid_transaction];
464 let serialized_transactions: Vec<_> = transactions
465 .into_iter()
466 .map(|t| {
467 bcs::to_bytes(&ConsensusTransaction::new_user_transaction_message(
468 &state.name,
469 t.inner().clone(),
470 ))
471 .unwrap()
472 })
473 .collect();
474 let batch: Vec<_> = serialized_transactions
475 .iter()
476 .map(|t| t.as_slice())
477 .collect();
478
479 let validator = SuiTxValidator::new(
480 state.clone(),
481 Arc::new(NoopConsensusOverloadChecker {}),
482 Arc::new(CheckpointServiceNoop {}),
483 SuiTxValidatorMetrics::new(&Default::default()),
484 );
485
486 let rejected_transactions = validator
488 .verify_and_vote_batch(&BlockRef::MAX, &batch)
489 .unwrap();
490
491 assert_eq!(rejected_transactions, vec![1]);
494
495 let epoch_store = state.load_epoch_store_one_call_per_task();
498 let reason = epoch_store
499 .get_rejection_vote_reason(ConsensusPosition {
500 epoch: state.load_epoch_store_one_call_per_task().epoch(),
501 block: BlockRef::MAX,
502 index: 1,
503 })
504 .expect("Rejection vote reason should be set");
505
506 assert_eq!(
507 reason,
508 SuiErrorKind::UserInputError {
509 error: UserInputError::TransactionDenied {
510 error: format!(
511 "Access to input object {:?} is temporarily disabled",
512 denied_object.id()
513 )
514 }
515 }
516 );
517 }
518
519 #[sim_test]
520 async fn reject_checkpoint_signature_v2_when_flag_disabled() {
521 let network_config =
523 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
524
525 let disabled_cfg =
526 ProtocolConfig::get_for_version(ProtocolVersion::new(92), Chain::Unknown);
527 let state = TestAuthorityBuilder::new()
528 .with_network_config(&network_config, 0)
529 .with_protocol_config(disabled_cfg)
530 .build()
531 .await;
532
533 let epoch_store = state.load_epoch_store_one_call_per_task();
534
535 let checkpoint_summary = CheckpointSummary::new(
537 &ProtocolConfig::get_for_max_version_UNSAFE(),
538 epoch_store.epoch(),
539 0,
540 0,
541 &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
542 None,
543 Default::default(),
544 None,
545 0,
546 Vec::new(),
547 Vec::new(),
548 );
549
550 let keypair = network_config.validator_configs()[0].protocol_key_pair();
551 let authority = keypair.public().into();
552 let signed = SignedCheckpointSummary::new(
553 epoch_store.epoch(),
554 checkpoint_summary,
555 keypair,
556 authority,
557 );
558 let message = CheckpointSignatureMessage { summary: signed };
559
560 let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
561 let bytes = bcs::to_bytes(&tx).unwrap();
562
563 let validator = SuiTxValidator::new(
564 state.clone(),
565 Arc::new(NoopConsensusOverloadChecker {}),
566 Arc::new(CheckpointServiceNoop {}),
567 SuiTxValidatorMetrics::new(&Default::default()),
568 );
569
570 let res = validator.verify_batch(&[&bytes]);
571 assert!(res.is_err());
572 }
573
574 #[sim_test]
575 async fn accept_checkpoint_signature_v2_when_flag_enabled() {
576 let network_config =
578 sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
579
580 let enabled_cfg = ProtocolConfig::get_for_version(ProtocolVersion::new(93), Chain::Unknown);
581 let state = TestAuthorityBuilder::new()
582 .with_network_config(&network_config, 0)
583 .with_protocol_config(enabled_cfg)
584 .build()
585 .await;
586
587 let epoch_store = state.load_epoch_store_one_call_per_task();
588
589 let checkpoint_summary = CheckpointSummary::new(
591 &ProtocolConfig::get_for_max_version_UNSAFE(),
592 epoch_store.epoch(),
593 0,
594 0,
595 &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
596 None,
597 Default::default(),
598 None,
599 0,
600 Vec::new(),
601 Vec::new(),
602 );
603
604 let keypair = network_config.validator_configs()[0].protocol_key_pair();
605 let authority = keypair.public().into();
606 let signed = SignedCheckpointSummary::new(
607 epoch_store.epoch(),
608 checkpoint_summary,
609 keypair,
610 authority,
611 );
612 let message = CheckpointSignatureMessage { summary: signed };
613
614 let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
615 let bytes = bcs::to_bytes(&tx).unwrap();
616
617 let validator = SuiTxValidator::new(
618 state.clone(),
619 Arc::new(NoopConsensusOverloadChecker {}),
620 Arc::new(CheckpointServiceNoop {}),
621 SuiTxValidatorMetrics::new(&Default::default()),
622 );
623
624 let res = validator.verify_batch(&[&bytes]);
625 assert!(res.is_ok(), "{res:?}");
626 }
627}