sui_core/
consensus_validator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use consensus_core::{TransactionVerifier, ValidationError};
7use consensus_types::block::{BlockRef, TransactionIndex};
8use fastcrypto_tbls::dkg_v1;
9use mysten_metrics::monitored_scope;
10use prometheus::{
11    IntCounter, IntCounterVec, Registry, register_int_counter_vec_with_registry,
12    register_int_counter_with_registry,
13};
14use sui_types::{
15    error::{SuiError, SuiErrorKind, SuiResult},
16    messages_consensus::{ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind},
17    transaction::Transaction,
18};
19use tap::TapFallible;
20use tracing::{debug, info, instrument, warn};
21
22use crate::{
23    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
24    checkpoints::CheckpointServiceNotify,
25    consensus_adapter::ConsensusOverloadChecker,
26};
27
/// Verifies batches of consensus transactions and votes on the user transactions in them.
///
/// The type is cheap to clone: every field is an `Arc`.
#[derive(Clone)]
pub struct SuiTxValidator {
    /// Provides the current epoch store, performs the system overload check and
    /// handles the vote for each user transaction.
    authority_state: Arc<AuthorityState>,
    /// Passed to the authority state's system overload check when voting.
    consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
    /// Notified of each checkpoint signature after the batch signature verification succeeds.
    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
    /// Counters for verified signatures and reject votes.
    metrics: Arc<SuiTxValidatorMetrics>,
}
36
37impl SuiTxValidator {
38    pub fn new(
39        authority_state: Arc<AuthorityState>,
40        consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
41        checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
42        metrics: Arc<SuiTxValidatorMetrics>,
43    ) -> Self {
44        let epoch_store = authority_state.load_epoch_store_one_call_per_task().clone();
45        info!(
46            "SuiTxValidator constructed for epoch {}",
47            epoch_store.epoch()
48        );
49        Self {
50            authority_state,
51            consensus_overload_checker,
52            checkpoint_service,
53            metrics,
54        }
55    }
56
57    fn validate_transactions(&self, txs: &[ConsensusTransactionKind]) -> Result<(), SuiError> {
58        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
59
60        let mut cert_batch = Vec::new();
61        let mut ckpt_messages = Vec::new();
62        let mut ckpt_batch = Vec::new();
63        for tx in txs.iter() {
64            match tx {
65                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
66                    cert_batch.push(certificate.as_ref());
67                }
68                ConsensusTransactionKind::CheckpointSignature(signature) => {
69                    ckpt_messages.push(signature.as_ref());
70                    ckpt_batch.push(&signature.summary);
71                }
72                ConsensusTransactionKind::CheckpointSignatureV2(signature) => {
73                    if !epoch_store
74                        .protocol_config()
75                        .consensus_checkpoint_signature_key_includes_digest()
76                    {
77                        return Err(SuiErrorKind::UnexpectedMessage(
78                            "ConsensusTransactionKind::CheckpointSignatureV2 is unsupported"
79                                .to_string(),
80                        )
81                        .into());
82                    }
83                    ckpt_messages.push(signature.as_ref());
84                    ckpt_batch.push(&signature.summary);
85                }
86                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
87                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
88                        warn!("batch verification error: DKG Message too large");
89                        return Err(SuiErrorKind::InvalidDkgMessageSize.into());
90                    }
91                }
92                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
93                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
94                        warn!("batch verification error: DKG Confirmation too large");
95                        return Err(SuiErrorKind::InvalidDkgMessageSize.into());
96                    }
97                }
98
99                ConsensusTransactionKind::CapabilityNotification(_) => {}
100
101                ConsensusTransactionKind::EndOfPublish(_)
102                | ConsensusTransactionKind::NewJWKFetched(_, _, _)
103                | ConsensusTransactionKind::CapabilityNotificationV2(_)
104                | ConsensusTransactionKind::RandomnessStateUpdate(_, _) => {}
105
106                ConsensusTransactionKind::UserTransaction(_tx) => {
107                    if !epoch_store.protocol_config().mysticeti_fastpath() {
108                        return Err(SuiErrorKind::UnexpectedMessage(
109                            "ConsensusTransactionKind::UserTransaction is unsupported".to_string(),
110                        )
111                        .into());
112                    }
113                    // TODO(fastpath): move deterministic verifications of user transactions here,
114                    // for example validity_check() and verify_transaction().
115                }
116
117                ConsensusTransactionKind::ExecutionTimeObservation(obs) => {
118                    // TODO: Use a separate limit for this that may truncate shared observations.
119                    if obs.estimates.len()
120                        > epoch_store
121                            .protocol_config()
122                            .max_programmable_tx_commands()
123                            .try_into()
124                            .unwrap()
125                    {
126                        return Err(SuiErrorKind::UnexpectedMessage(format!(
127                            "ExecutionTimeObservation contains too many estimates: {}",
128                            obs.estimates.len()
129                        ))
130                        .into());
131                    }
132                }
133            }
134        }
135
136        // verify the certificate signatures as a batch
137        let cert_count = cert_batch.len();
138        let ckpt_count = ckpt_batch.len();
139
140        epoch_store
141            .signature_verifier
142            .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
143            .tap_err(|e| warn!("batch verification error: {}", e))?;
144
145        // All checkpoint sigs have been verified, forward them to the checkpoint service
146        for ckpt in ckpt_messages {
147            self.checkpoint_service
148                .notify_checkpoint_signature(&epoch_store, ckpt)?;
149        }
150
151        self.metrics
152            .certificate_signatures_verified
153            .inc_by(cert_count as u64);
154        self.metrics
155            .checkpoint_signatures_verified
156            .inc_by(ckpt_count as u64);
157        Ok(())
158    }
159
160    #[instrument(level = "debug", skip_all, fields(block_ref))]
161    fn vote_transactions(
162        &self,
163        block_ref: &BlockRef,
164        txs: Vec<ConsensusTransactionKind>,
165    ) -> Vec<TransactionIndex> {
166        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
167        if !epoch_store.protocol_config().mysticeti_fastpath() {
168            return vec![];
169        }
170
171        let mut result = Vec::new();
172        for (i, tx) in txs.into_iter().enumerate() {
173            let ConsensusTransactionKind::UserTransaction(tx) = tx else {
174                continue;
175            };
176
177            let tx_digest = *tx.digest();
178            if let Err(error) = self.vote_transaction(&epoch_store, tx) {
179                debug!(?tx_digest, "Voting to reject transaction: {error}");
180                self.metrics
181                    .transaction_reject_votes
182                    .with_label_values(&[error.to_variant_name()])
183                    .inc();
184                result.push(i as TransactionIndex);
185                // Cache the rejection vote reason (error) for the transaction
186                epoch_store.set_rejection_vote_reason(
187                    ConsensusPosition {
188                        epoch: epoch_store.epoch(),
189                        block: *block_ref,
190                        index: i as TransactionIndex,
191                    },
192                    &error,
193                );
194            } else {
195                debug!(?tx_digest, "Voting to accept transaction");
196            }
197        }
198
199        result
200    }
201
202    #[instrument(level = "debug", skip_all, err(level = "debug"), fields(tx_digest = ?tx.digest()))]
203    fn vote_transaction(
204        &self,
205        epoch_store: &Arc<AuthorityPerEpochStore>,
206        tx: Box<Transaction>,
207    ) -> SuiResult<()> {
208        // Currently validity_check() and verify_transaction() are not required to be consistent across validators,
209        // so they do not run in validate_transactions(). They can run there once we confirm it is safe.
210        tx.validity_check(&epoch_store.tx_validity_check_context())?;
211
212        self.authority_state.check_system_overload(
213            &*self.consensus_overload_checker,
214            tx.data(),
215            self.authority_state.check_system_overload_at_signing(),
216        )?;
217
218        let tx = epoch_store.verify_transaction(*tx)?;
219
220        self.authority_state
221            .handle_vote_transaction(epoch_store, tx)?;
222
223        Ok(())
224    }
225}
226
227fn tx_kind_from_bytes(tx: &[u8]) -> Result<ConsensusTransactionKind, ValidationError> {
228    bcs::from_bytes::<ConsensusTransaction>(tx)
229        .map_err(|e| {
230            ValidationError::InvalidTransaction(format!(
231                "Failed to parse transaction bytes: {:?}",
232                e
233            ))
234        })
235        .map(|tx| tx.kind)
236}
237
238impl TransactionVerifier for SuiTxValidator {
239    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
240        let _scope = monitored_scope("ValidateBatch");
241
242        let txs: Vec<_> = batch
243            .iter()
244            .map(|tx| tx_kind_from_bytes(tx))
245            .collect::<Result<Vec<_>, _>>()?;
246
247        self.validate_transactions(&txs)
248            .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
249    }
250
251    fn verify_and_vote_batch(
252        &self,
253        block_ref: &BlockRef,
254        batch: &[&[u8]],
255    ) -> Result<Vec<TransactionIndex>, ValidationError> {
256        let _scope = monitored_scope("VerifyAndVoteBatch");
257
258        let txs: Vec<_> = batch
259            .iter()
260            .map(|tx| tx_kind_from_bytes(tx))
261            .collect::<Result<Vec<_>, _>>()?;
262
263        self.validate_transactions(&txs)
264            .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))?;
265
266        Ok(self.vote_transactions(block_ref, txs))
267    }
268}
269
/// Prometheus counters maintained by `SuiTxValidator`.
pub struct SuiTxValidatorMetrics {
    /// Incremented by the number of certificates in each batch that passed validation.
    certificate_signatures_verified: IntCounter,
    /// Incremented by the number of checkpoint signatures in each batch that passed validation.
    checkpoint_signatures_verified: IntCounter,
    /// Reject votes, labelled by `reason` (the variant name of the rejection error).
    transaction_reject_votes: IntCounterVec,
}
275
276impl SuiTxValidatorMetrics {
277    pub fn new(registry: &Registry) -> Arc<Self> {
278        Arc::new(Self {
279            certificate_signatures_verified: register_int_counter_with_registry!(
280                "tx_validator_certificate_signatures_verified",
281                "Number of certificates verified in consensus batch verifier",
282                registry
283            )
284            .unwrap(),
285            checkpoint_signatures_verified: register_int_counter_with_registry!(
286                "tx_validator_checkpoint_signatures_verified",
287                "Number of checkpoint verified in consensus batch verifier",
288                registry
289            )
290            .unwrap(),
291            transaction_reject_votes: register_int_counter_vec_with_registry!(
292                "tx_validator_transaction_reject_votes",
293                "Number of reject transaction votes per reason",
294                &["reason"],
295                registry
296            )
297            .unwrap(),
298        })
299    }
300}
301
302#[cfg(test)]
303mod tests {
304    use std::num::NonZeroUsize;
305    use std::sync::Arc;
306
307    use consensus_core::TransactionVerifier as _;
308    use consensus_types::block::BlockRef;
309    use fastcrypto::traits::KeyPair;
310    use sui_config::transaction_deny_config::TransactionDenyConfigBuilder;
311    use sui_macros::sim_test;
312    use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
313    use sui_types::crypto::deterministic_random_account_key;
314    use sui_types::error::{SuiErrorKind, UserInputError};
315    use sui_types::messages_checkpoint::{
316        CheckpointContents, CheckpointSignatureMessage, CheckpointSummary, SignedCheckpointSummary,
317    };
318    use sui_types::messages_consensus::ConsensusPosition;
319    use sui_types::{
320        base_types::{ExecutionDigests, ObjectID},
321        crypto::Ed25519SuiSignature,
322        messages_consensus::ConsensusTransaction,
323        object::Object,
324        signature::GenericSignature,
325    };
326
327    use crate::{
328        authority::test_authority_builder::TestAuthorityBuilder,
329        checkpoints::CheckpointServiceNoop,
330        consensus_adapter::{
331            NoopConsensusOverloadChecker,
332            consensus_tests::{test_certificates, test_gas_objects, test_user_transaction},
333        },
334        consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
335    };
336
337    #[sim_test]
338    async fn accept_valid_transaction() {
339        // Initialize an authority with a (owned) gas object and a shared object; then
340        // make a test certificate.
341        let mut objects = test_gas_objects();
342        let shared_object = Object::shared_for_testing();
343        objects.push(shared_object.clone());
344
345        let network_config =
346            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
347                .with_objects(objects.clone())
348                .build();
349
350        let state = TestAuthorityBuilder::new()
351            .with_network_config(&network_config, 0)
352            .build()
353            .await;
354        let name1 = state.name;
355        let certificates = test_certificates(&state, shared_object).await;
356
357        let first_transaction = certificates[0].clone();
358        let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
359            &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
360        )
361        .unwrap();
362
363        let metrics = SuiTxValidatorMetrics::new(&Default::default());
364        let validator = SuiTxValidator::new(
365            state.clone(),
366            Arc::new(NoopConsensusOverloadChecker {}),
367            Arc::new(CheckpointServiceNoop {}),
368            metrics,
369        );
370        let res = validator.verify_batch(&[&first_transaction_bytes]);
371        assert!(res.is_ok(), "{res:?}");
372
373        let transaction_bytes: Vec<_> = certificates
374            .clone()
375            .into_iter()
376            .map(|cert| {
377                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
378            })
379            .collect();
380
381        let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
382        let res_batch = validator.verify_batch(&batch);
383        assert!(res_batch.is_ok(), "{res_batch:?}");
384
385        let bogus_transaction_bytes: Vec<_> = certificates
386            .into_iter()
387            .map(|mut cert| {
388                // set it to an all-zero user signature
389                cert.tx_signatures_mut_for_testing()[0] =
390                    GenericSignature::Signature(sui_types::crypto::Signature::Ed25519SuiSignature(
391                        Ed25519SuiSignature::default(),
392                    ));
393                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
394            })
395            .collect();
396
397        let batch: Vec<_> = bogus_transaction_bytes
398            .iter()
399            .map(|t| t.as_slice())
400            .collect();
401        let res_batch = validator.verify_batch(&batch);
402        assert!(res_batch.is_err());
403    }
404
405    #[tokio::test]
406    async fn test_verify_and_vote_batch() {
407        // 1 account keypair
408        let (sender, keypair) = deterministic_random_account_key();
409
410        // 8 gas objects.
411        let gas_objects: Vec<Object> = (0..8)
412            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
413            .collect();
414
415        // 2 owned objects.
416        let owned_objects: Vec<Object> = (0..2)
417            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
418            .collect();
419        let denied_object = owned_objects[1].clone();
420
421        let mut objects = gas_objects.clone();
422        objects.extend(owned_objects.clone());
423
424        let network_config =
425            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
426                .committee_size(NonZeroUsize::new(1).unwrap())
427                .with_objects(objects.clone())
428                .build();
429
430        // Add the 2nd object in the deny list. Once we try to process/vote on the transaction that depends on this object, it will be rejected.
431        let transaction_deny_config = TransactionDenyConfigBuilder::new()
432            .add_denied_object(denied_object.id())
433            .build();
434        let state = TestAuthorityBuilder::new()
435            .with_network_config(&network_config, 0)
436            .with_transaction_deny_config(transaction_deny_config)
437            .build()
438            .await;
439
440        // Create two user transactions
441
442        // A valid transaction
443        let valid_transaction = test_user_transaction(
444            &state,
445            sender,
446            &keypair,
447            gas_objects[0].clone(),
448            vec![owned_objects[0].clone()],
449        )
450        .await;
451
452        // An invalid transaction where the input object is denied
453        let invalid_transaction = test_user_transaction(
454            &state,
455            sender,
456            &keypair,
457            gas_objects[1].clone(),
458            vec![denied_object.clone()],
459        )
460        .await;
461
462        // Now create the vector with the transactions and serialize them.
463        let transactions = vec![valid_transaction, invalid_transaction];
464        let serialized_transactions: Vec<_> = transactions
465            .into_iter()
466            .map(|t| {
467                bcs::to_bytes(&ConsensusTransaction::new_user_transaction_message(
468                    &state.name,
469                    t.inner().clone(),
470                ))
471                .unwrap()
472            })
473            .collect();
474        let batch: Vec<_> = serialized_transactions
475            .iter()
476            .map(|t| t.as_slice())
477            .collect();
478
479        let validator = SuiTxValidator::new(
480            state.clone(),
481            Arc::new(NoopConsensusOverloadChecker {}),
482            Arc::new(CheckpointServiceNoop {}),
483            SuiTxValidatorMetrics::new(&Default::default()),
484        );
485
486        // WHEN
487        let rejected_transactions = validator
488            .verify_and_vote_batch(&BlockRef::MAX, &batch)
489            .unwrap();
490
491        // THEN
492        // The 2nd transaction should be rejected
493        assert_eq!(rejected_transactions, vec![1]);
494
495        // AND
496        // The reject reason should get cached
497        let epoch_store = state.load_epoch_store_one_call_per_task();
498        let reason = epoch_store
499            .get_rejection_vote_reason(ConsensusPosition {
500                epoch: state.load_epoch_store_one_call_per_task().epoch(),
501                block: BlockRef::MAX,
502                index: 1,
503            })
504            .expect("Rejection vote reason should be set");
505
506        assert_eq!(
507            reason,
508            SuiErrorKind::UserInputError {
509                error: UserInputError::TransactionDenied {
510                    error: format!(
511                        "Access to input object {:?} is temporarily disabled",
512                        denied_object.id()
513                    )
514                }
515            }
516        );
517    }
518
519    #[sim_test]
520    async fn reject_checkpoint_signature_v2_when_flag_disabled() {
521        // Build a single-validator network and authority with protocol version < 93 (flag disabled)
522        let network_config =
523            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
524
525        let disabled_cfg =
526            ProtocolConfig::get_for_version(ProtocolVersion::new(92), Chain::Unknown);
527        let state = TestAuthorityBuilder::new()
528            .with_network_config(&network_config, 0)
529            .with_protocol_config(disabled_cfg)
530            .build()
531            .await;
532
533        let epoch_store = state.load_epoch_store_one_call_per_task();
534
535        // Create a minimal checkpoint summary and sign it with the validator's protocol key
536        let checkpoint_summary = CheckpointSummary::new(
537            &ProtocolConfig::get_for_max_version_UNSAFE(),
538            epoch_store.epoch(),
539            0,
540            0,
541            &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
542            None,
543            Default::default(),
544            None,
545            0,
546            Vec::new(),
547            Vec::new(),
548        );
549
550        let keypair = network_config.validator_configs()[0].protocol_key_pair();
551        let authority = keypair.public().into();
552        let signed = SignedCheckpointSummary::new(
553            epoch_store.epoch(),
554            checkpoint_summary,
555            keypair,
556            authority,
557        );
558        let message = CheckpointSignatureMessage { summary: signed };
559
560        let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
561        let bytes = bcs::to_bytes(&tx).unwrap();
562
563        let validator = SuiTxValidator::new(
564            state.clone(),
565            Arc::new(NoopConsensusOverloadChecker {}),
566            Arc::new(CheckpointServiceNoop {}),
567            SuiTxValidatorMetrics::new(&Default::default()),
568        );
569
570        let res = validator.verify_batch(&[&bytes]);
571        assert!(res.is_err());
572    }
573
574    #[sim_test]
575    async fn accept_checkpoint_signature_v2_when_flag_enabled() {
576        // Build a single-validator network and authority with protocol version >= 93 (flag enabled)
577        let network_config =
578            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
579
580        let enabled_cfg = ProtocolConfig::get_for_version(ProtocolVersion::new(93), Chain::Unknown);
581        let state = TestAuthorityBuilder::new()
582            .with_network_config(&network_config, 0)
583            .with_protocol_config(enabled_cfg)
584            .build()
585            .await;
586
587        let epoch_store = state.load_epoch_store_one_call_per_task();
588
589        // Create a minimal checkpoint summary and sign it with the validator's protocol key
590        let checkpoint_summary = CheckpointSummary::new(
591            &ProtocolConfig::get_for_max_version_UNSAFE(),
592            epoch_store.epoch(),
593            0,
594            0,
595            &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
596            None,
597            Default::default(),
598            None,
599            0,
600            Vec::new(),
601            Vec::new(),
602        );
603
604        let keypair = network_config.validator_configs()[0].protocol_key_pair();
605        let authority = keypair.public().into();
606        let signed = SignedCheckpointSummary::new(
607            epoch_store.epoch(),
608            checkpoint_summary,
609            keypair,
610            authority,
611        );
612        let message = CheckpointSignatureMessage { summary: signed };
613
614        let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
615        let bytes = bcs::to_bytes(&tx).unwrap();
616
617        let validator = SuiTxValidator::new(
618            state.clone(),
619            Arc::new(NoopConsensusOverloadChecker {}),
620            Arc::new(CheckpointServiceNoop {}),
621            SuiTxValidatorMetrics::new(&Default::default()),
622        );
623
624        let res = validator.verify_batch(&[&bytes]);
625        assert!(res.is_ok(), "{res:?}");
626    }
627}