sui_core/
consensus_validator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use consensus_core::{TransactionVerifier, ValidationError};
7use consensus_types::block::{BlockRef, TransactionIndex};
8use fastcrypto_tbls::dkg_v1;
9use mysten_metrics::monitored_scope;
10use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
11use sui_types::{
12    error::{SuiError, SuiErrorKind, SuiResult},
13    messages_consensus::{ConsensusPosition, ConsensusTransaction, ConsensusTransactionKind},
14    transaction::Transaction,
15};
16use tap::TapFallible;
17use tracing::{info, instrument, warn};
18
19use crate::{
20    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
21    checkpoints::CheckpointServiceNotify,
22    consensus_adapter::ConsensusOverloadChecker,
23};
24
/// Allows verifying the validity of transactions
///
/// Checks batches of serialized consensus transactions and, for user
/// transactions, decides which ones this validator votes to reject.
#[derive(Clone)]
pub struct SuiTxValidator {
    /// Provides the current epoch store and performs the overload check,
    /// transaction verification hand-off and vote handling for user transactions.
    authority_state: Arc<AuthorityState>,
    /// Passed to `AuthorityState::check_system_overload` when voting on a user transaction.
    consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
    /// Notified of every checkpoint signature message whose batch passed signature verification.
    checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
    /// Counters for the certificate and checkpoint signatures verified in batches.
    metrics: Arc<SuiTxValidatorMetrics>,
}
33
34impl SuiTxValidator {
35    pub fn new(
36        authority_state: Arc<AuthorityState>,
37        consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
38        checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
39        metrics: Arc<SuiTxValidatorMetrics>,
40    ) -> Self {
41        let epoch_store = authority_state.load_epoch_store_one_call_per_task().clone();
42        info!(
43            "SuiTxValidator constructed for epoch {}",
44            epoch_store.epoch()
45        );
46        Self {
47            authority_state,
48            consensus_overload_checker,
49            checkpoint_service,
50            metrics,
51        }
52    }
53
54    fn validate_transactions(&self, txs: &[ConsensusTransactionKind]) -> Result<(), SuiError> {
55        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
56
57        let mut cert_batch = Vec::new();
58        let mut ckpt_messages = Vec::new();
59        let mut ckpt_batch = Vec::new();
60        for tx in txs.iter() {
61            match tx {
62                ConsensusTransactionKind::CertifiedTransaction(certificate) => {
63                    cert_batch.push(certificate.as_ref());
64                }
65                ConsensusTransactionKind::CheckpointSignature(signature) => {
66                    ckpt_messages.push(signature.as_ref());
67                    ckpt_batch.push(&signature.summary);
68                }
69                ConsensusTransactionKind::CheckpointSignatureV2(signature) => {
70                    if !epoch_store
71                        .protocol_config()
72                        .consensus_checkpoint_signature_key_includes_digest()
73                    {
74                        return Err(SuiErrorKind::UnexpectedMessage(
75                            "ConsensusTransactionKind::CheckpointSignatureV2 is unsupported"
76                                .to_string(),
77                        )
78                        .into());
79                    }
80                    ckpt_messages.push(signature.as_ref());
81                    ckpt_batch.push(&signature.summary);
82                }
83                ConsensusTransactionKind::RandomnessDkgMessage(_, bytes) => {
84                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
85                        warn!("batch verification error: DKG Message too large");
86                        return Err(SuiErrorKind::InvalidDkgMessageSize.into());
87                    }
88                }
89                ConsensusTransactionKind::RandomnessDkgConfirmation(_, bytes) => {
90                    if bytes.len() > dkg_v1::DKG_MESSAGES_MAX_SIZE {
91                        warn!("batch verification error: DKG Confirmation too large");
92                        return Err(SuiErrorKind::InvalidDkgMessageSize.into());
93                    }
94                }
95
96                ConsensusTransactionKind::CapabilityNotification(_) => {}
97
98                ConsensusTransactionKind::EndOfPublish(_)
99                | ConsensusTransactionKind::NewJWKFetched(_, _, _)
100                | ConsensusTransactionKind::CapabilityNotificationV2(_)
101                | ConsensusTransactionKind::RandomnessStateUpdate(_, _) => {}
102
103                ConsensusTransactionKind::UserTransaction(_tx) => {
104                    if !epoch_store.protocol_config().mysticeti_fastpath() {
105                        return Err(SuiErrorKind::UnexpectedMessage(
106                            "ConsensusTransactionKind::UserTransaction is unsupported".to_string(),
107                        )
108                        .into());
109                    }
110                    // TODO(fastpath): move deterministic verifications of user transactions here,
111                    // for example validity_check() and verify_transaction().
112                }
113
114                ConsensusTransactionKind::ExecutionTimeObservation(obs) => {
115                    // TODO: Use a separate limit for this that may truncate shared observations.
116                    if obs.estimates.len()
117                        > epoch_store
118                            .protocol_config()
119                            .max_programmable_tx_commands()
120                            .try_into()
121                            .unwrap()
122                    {
123                        return Err(SuiErrorKind::UnexpectedMessage(format!(
124                            "ExecutionTimeObservation contains too many estimates: {}",
125                            obs.estimates.len()
126                        ))
127                        .into());
128                    }
129                }
130            }
131        }
132
133        // verify the certificate signatures as a batch
134        let cert_count = cert_batch.len();
135        let ckpt_count = ckpt_batch.len();
136
137        epoch_store
138            .signature_verifier
139            .verify_certs_and_checkpoints(cert_batch, ckpt_batch)
140            .tap_err(|e| warn!("batch verification error: {}", e))?;
141
142        // All checkpoint sigs have been verified, forward them to the checkpoint service
143        for ckpt in ckpt_messages {
144            self.checkpoint_service
145                .notify_checkpoint_signature(&epoch_store, ckpt)?;
146        }
147
148        self.metrics
149            .certificate_signatures_verified
150            .inc_by(cert_count as u64);
151        self.metrics
152            .checkpoint_signatures_verified
153            .inc_by(ckpt_count as u64);
154        Ok(())
155    }
156
157    #[instrument(level = "debug", skip_all, fields(block_ref = ?block_ref))]
158    fn vote_transactions(
159        &self,
160        block_ref: &BlockRef,
161        txs: Vec<ConsensusTransactionKind>,
162    ) -> Vec<TransactionIndex> {
163        let epoch_store = self.authority_state.load_epoch_store_one_call_per_task();
164        if !epoch_store.protocol_config().mysticeti_fastpath() {
165            return vec![];
166        }
167
168        let mut result = Vec::new();
169        for (i, tx) in txs.into_iter().enumerate() {
170            let ConsensusTransactionKind::UserTransaction(tx) = tx else {
171                continue;
172            };
173
174            if let Err(error) = self.vote_transaction(&epoch_store, tx) {
175                result.push(i as TransactionIndex);
176
177                // Cache the rejection vote reason (error) for the transaction
178                epoch_store.set_rejection_vote_reason(
179                    ConsensusPosition {
180                        epoch: epoch_store.epoch(),
181                        block: *block_ref,
182                        index: i as TransactionIndex,
183                    },
184                    &error,
185                );
186            }
187        }
188
189        result
190    }
191
192    #[instrument(level = "debug", skip_all, err(level = "debug"), fields(tx_digest = ?tx.digest()))]
193    fn vote_transaction(
194        &self,
195        epoch_store: &Arc<AuthorityPerEpochStore>,
196        tx: Box<Transaction>,
197    ) -> SuiResult<()> {
198        // Currently validity_check() and verify_transaction() are not required to be consistent across validators,
199        // so they do not run in validate_transactions(). They can run there once we confirm it is safe.
200        tx.validity_check(&epoch_store.tx_validity_check_context())?;
201
202        self.authority_state.check_system_overload(
203            &*self.consensus_overload_checker,
204            tx.data(),
205            self.authority_state.check_system_overload_at_signing(),
206        )?;
207
208        let tx = epoch_store.verify_transaction(*tx)?;
209
210        self.authority_state
211            .handle_vote_transaction(epoch_store, tx)?;
212
213        Ok(())
214    }
215}
216
217fn tx_kind_from_bytes(tx: &[u8]) -> Result<ConsensusTransactionKind, ValidationError> {
218    bcs::from_bytes::<ConsensusTransaction>(tx)
219        .map_err(|e| {
220            ValidationError::InvalidTransaction(format!(
221                "Failed to parse transaction bytes: {:?}",
222                e
223            ))
224        })
225        .map(|tx| tx.kind)
226}
227
228impl TransactionVerifier for SuiTxValidator {
229    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
230        let _scope = monitored_scope("ValidateBatch");
231
232        let txs: Vec<_> = batch
233            .iter()
234            .map(|tx| tx_kind_from_bytes(tx))
235            .collect::<Result<Vec<_>, _>>()?;
236
237        self.validate_transactions(&txs)
238            .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))
239    }
240
241    fn verify_and_vote_batch(
242        &self,
243        block_ref: &BlockRef,
244        batch: &[&[u8]],
245    ) -> Result<Vec<TransactionIndex>, ValidationError> {
246        let _scope = monitored_scope("VerifyAndVoteBatch");
247
248        let txs: Vec<_> = batch
249            .iter()
250            .map(|tx| tx_kind_from_bytes(tx))
251            .collect::<Result<Vec<_>, _>>()?;
252
253        self.validate_transactions(&txs)
254            .map_err(|e| ValidationError::InvalidTransaction(e.to_string()))?;
255
256        Ok(self.vote_transactions(block_ref, txs))
257    }
258}
259
/// Counters updated by `SuiTxValidator` after a batch has been validated.
pub struct SuiTxValidatorMetrics {
    /// Incremented by the number of certificates in each successfully validated batch.
    certificate_signatures_verified: IntCounter,
    /// Incremented by the number of checkpoint signatures in each successfully validated batch.
    checkpoint_signatures_verified: IntCounter,
}
264
265impl SuiTxValidatorMetrics {
266    pub fn new(registry: &Registry) -> Arc<Self> {
267        Arc::new(Self {
268            certificate_signatures_verified: register_int_counter_with_registry!(
269                "certificate_signatures_verified",
270                "Number of certificates verified in consensus batch verifier",
271                registry
272            )
273            .unwrap(),
274            checkpoint_signatures_verified: register_int_counter_with_registry!(
275                "checkpoint_signatures_verified",
276                "Number of checkpoint verified in consensus batch verifier",
277                registry
278            )
279            .unwrap(),
280        })
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use std::num::NonZeroUsize;
287    use std::sync::Arc;
288
289    use consensus_core::TransactionVerifier as _;
290    use consensus_types::block::BlockRef;
291    use fastcrypto::traits::KeyPair;
292    use sui_config::transaction_deny_config::TransactionDenyConfigBuilder;
293    use sui_macros::sim_test;
294    use sui_protocol_config::{Chain, ProtocolConfig, ProtocolVersion};
295    use sui_types::crypto::deterministic_random_account_key;
296    use sui_types::error::{SuiErrorKind, UserInputError};
297    use sui_types::messages_checkpoint::{
298        CheckpointContents, CheckpointSignatureMessage, CheckpointSummary, SignedCheckpointSummary,
299    };
300    use sui_types::messages_consensus::ConsensusPosition;
301    use sui_types::{
302        base_types::{ExecutionDigests, ObjectID},
303        crypto::Ed25519SuiSignature,
304        messages_consensus::ConsensusTransaction,
305        object::Object,
306        signature::GenericSignature,
307    };
308
309    use crate::{
310        authority::test_authority_builder::TestAuthorityBuilder,
311        checkpoints::CheckpointServiceNoop,
312        consensus_adapter::{
313            NoopConsensusOverloadChecker,
314            consensus_tests::{test_certificates, test_gas_objects, test_user_transaction},
315        },
316        consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
317    };
318
319    #[sim_test]
320    async fn accept_valid_transaction() {
321        // Initialize an authority with a (owned) gas object and a shared object; then
322        // make a test certificate.
323        let mut objects = test_gas_objects();
324        let shared_object = Object::shared_for_testing();
325        objects.push(shared_object.clone());
326
327        let network_config =
328            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
329                .with_objects(objects.clone())
330                .build();
331
332        let state = TestAuthorityBuilder::new()
333            .with_network_config(&network_config, 0)
334            .build()
335            .await;
336        let name1 = state.name;
337        let certificates = test_certificates(&state, shared_object).await;
338
339        let first_transaction = certificates[0].clone();
340        let first_transaction_bytes: Vec<u8> = bcs::to_bytes(
341            &ConsensusTransaction::new_certificate_message(&name1, first_transaction),
342        )
343        .unwrap();
344
345        let metrics = SuiTxValidatorMetrics::new(&Default::default());
346        let validator = SuiTxValidator::new(
347            state.clone(),
348            Arc::new(NoopConsensusOverloadChecker {}),
349            Arc::new(CheckpointServiceNoop {}),
350            metrics,
351        );
352        let res = validator.verify_batch(&[&first_transaction_bytes]);
353        assert!(res.is_ok(), "{res:?}");
354
355        let transaction_bytes: Vec<_> = certificates
356            .clone()
357            .into_iter()
358            .map(|cert| {
359                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
360            })
361            .collect();
362
363        let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect();
364        let res_batch = validator.verify_batch(&batch);
365        assert!(res_batch.is_ok(), "{res_batch:?}");
366
367        let bogus_transaction_bytes: Vec<_> = certificates
368            .into_iter()
369            .map(|mut cert| {
370                // set it to an all-zero user signature
371                cert.tx_signatures_mut_for_testing()[0] =
372                    GenericSignature::Signature(sui_types::crypto::Signature::Ed25519SuiSignature(
373                        Ed25519SuiSignature::default(),
374                    ));
375                bcs::to_bytes(&ConsensusTransaction::new_certificate_message(&name1, cert)).unwrap()
376            })
377            .collect();
378
379        let batch: Vec<_> = bogus_transaction_bytes
380            .iter()
381            .map(|t| t.as_slice())
382            .collect();
383        let res_batch = validator.verify_batch(&batch);
384        assert!(res_batch.is_err());
385    }
386
387    #[tokio::test]
388    async fn test_verify_and_vote_batch() {
389        // 1 account keypair
390        let (sender, keypair) = deterministic_random_account_key();
391
392        // 8 gas objects.
393        let gas_objects: Vec<Object> = (0..8)
394            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
395            .collect();
396
397        // 2 owned objects.
398        let owned_objects: Vec<Object> = (0..2)
399            .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender))
400            .collect();
401        let denied_object = owned_objects[1].clone();
402
403        let mut objects = gas_objects.clone();
404        objects.extend(owned_objects.clone());
405
406        let network_config =
407            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir()
408                .committee_size(NonZeroUsize::new(1).unwrap())
409                .with_objects(objects.clone())
410                .build();
411
412        // Add the 2nd object in the deny list. Once we try to process/vote on the transaction that depends on this object, it will be rejected.
413        let transaction_deny_config = TransactionDenyConfigBuilder::new()
414            .add_denied_object(denied_object.id())
415            .build();
416        let state = TestAuthorityBuilder::new()
417            .with_network_config(&network_config, 0)
418            .with_transaction_deny_config(transaction_deny_config)
419            .build()
420            .await;
421
422        // Create two user transactions
423
424        // A valid transaction
425        let valid_transaction = test_user_transaction(
426            &state,
427            sender,
428            &keypair,
429            gas_objects[0].clone(),
430            vec![owned_objects[0].clone()],
431        )
432        .await;
433
434        // An invalid transaction where the input object is denied
435        let invalid_transaction = test_user_transaction(
436            &state,
437            sender,
438            &keypair,
439            gas_objects[1].clone(),
440            vec![denied_object.clone()],
441        )
442        .await;
443
444        // Now create the vector with the transactions and serialize them.
445        let transactions = vec![valid_transaction, invalid_transaction];
446        let serialized_transactions: Vec<_> = transactions
447            .into_iter()
448            .map(|t| {
449                bcs::to_bytes(&ConsensusTransaction::new_user_transaction_message(
450                    &state.name,
451                    t.inner().clone(),
452                ))
453                .unwrap()
454            })
455            .collect();
456        let batch: Vec<_> = serialized_transactions
457            .iter()
458            .map(|t| t.as_slice())
459            .collect();
460
461        let validator = SuiTxValidator::new(
462            state.clone(),
463            Arc::new(NoopConsensusOverloadChecker {}),
464            Arc::new(CheckpointServiceNoop {}),
465            SuiTxValidatorMetrics::new(&Default::default()),
466        );
467
468        // WHEN
469        let rejected_transactions = validator
470            .verify_and_vote_batch(&BlockRef::MAX, &batch)
471            .unwrap();
472
473        // THEN
474        // The 2nd transaction should be rejected
475        assert_eq!(rejected_transactions, vec![1]);
476
477        // AND
478        // The reject reason should get cached
479        let epoch_store = state.load_epoch_store_one_call_per_task();
480        let reason = epoch_store
481            .get_rejection_vote_reason(ConsensusPosition {
482                epoch: state.load_epoch_store_one_call_per_task().epoch(),
483                block: BlockRef::MAX,
484                index: 1,
485            })
486            .expect("Rejection vote reason should be set");
487
488        assert_eq!(
489            reason,
490            SuiErrorKind::UserInputError {
491                error: UserInputError::TransactionDenied {
492                    error: format!(
493                        "Access to input object {:?} is temporarily disabled",
494                        denied_object.id()
495                    )
496                }
497            }
498        );
499    }
500
501    #[sim_test]
502    async fn reject_checkpoint_signature_v2_when_flag_disabled() {
503        // Build a single-validator network and authority with protocol version < 93 (flag disabled)
504        let network_config =
505            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
506
507        let disabled_cfg =
508            ProtocolConfig::get_for_version(ProtocolVersion::new(92), Chain::Unknown);
509        let state = TestAuthorityBuilder::new()
510            .with_network_config(&network_config, 0)
511            .with_protocol_config(disabled_cfg)
512            .build()
513            .await;
514
515        let epoch_store = state.load_epoch_store_one_call_per_task();
516
517        // Create a minimal checkpoint summary and sign it with the validator's protocol key
518        let checkpoint_summary = CheckpointSummary::new(
519            &ProtocolConfig::get_for_max_version_UNSAFE(),
520            epoch_store.epoch(),
521            0,
522            0,
523            &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
524            None,
525            Default::default(),
526            None,
527            0,
528            Vec::new(),
529            Vec::new(),
530        );
531
532        let keypair = network_config.validator_configs()[0].protocol_key_pair();
533        let authority = keypair.public().into();
534        let signed = SignedCheckpointSummary::new(
535            epoch_store.epoch(),
536            checkpoint_summary,
537            keypair,
538            authority,
539        );
540        let message = CheckpointSignatureMessage { summary: signed };
541
542        let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
543        let bytes = bcs::to_bytes(&tx).unwrap();
544
545        let validator = SuiTxValidator::new(
546            state.clone(),
547            Arc::new(NoopConsensusOverloadChecker {}),
548            Arc::new(CheckpointServiceNoop {}),
549            SuiTxValidatorMetrics::new(&Default::default()),
550        );
551
552        let res = validator.verify_batch(&[&bytes]);
553        assert!(res.is_err());
554    }
555
556    #[sim_test]
557    async fn accept_checkpoint_signature_v2_when_flag_enabled() {
558        // Build a single-validator network and authority with protocol version >= 93 (flag enabled)
559        let network_config =
560            sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
561
562        let enabled_cfg = ProtocolConfig::get_for_version(ProtocolVersion::new(93), Chain::Unknown);
563        let state = TestAuthorityBuilder::new()
564            .with_network_config(&network_config, 0)
565            .with_protocol_config(enabled_cfg)
566            .build()
567            .await;
568
569        let epoch_store = state.load_epoch_store_one_call_per_task();
570
571        // Create a minimal checkpoint summary and sign it with the validator's protocol key
572        let checkpoint_summary = CheckpointSummary::new(
573            &ProtocolConfig::get_for_max_version_UNSAFE(),
574            epoch_store.epoch(),
575            0,
576            0,
577            &CheckpointContents::new_with_digests_only_for_tests([ExecutionDigests::random()]),
578            None,
579            Default::default(),
580            None,
581            0,
582            Vec::new(),
583            Vec::new(),
584        );
585
586        let keypair = network_config.validator_configs()[0].protocol_key_pair();
587        let authority = keypair.public().into();
588        let signed = SignedCheckpointSummary::new(
589            epoch_store.epoch(),
590            checkpoint_summary,
591            keypair,
592            authority,
593        );
594        let message = CheckpointSignatureMessage { summary: signed };
595
596        let tx = ConsensusTransaction::new_checkpoint_signature_message_v2(message);
597        let bytes = bcs::to_bytes(&tx).unwrap();
598
599        let validator = SuiTxValidator::new(
600            state.clone(),
601            Arc::new(NoopConsensusOverloadChecker {}),
602            Arc::new(CheckpointServiceNoop {}),
603            SuiTxValidatorMetrics::new(&Default::default()),
604        );
605
606        let res = validator.verify_batch(&[&bytes]);
607        assert!(res.is_ok(), "{res:?}");
608    }
609}