1use either::Either;
5use fastcrypto_zkp::bn254::zk_login::JwkId;
6use fastcrypto_zkp::bn254::zk_login::{JWK, OIDCProvider};
7use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
8use futures::pin_mut;
9use im::hashmap::HashMap as ImHashMap;
10use itertools::{Itertools as _, izip};
11use mysten_common::debug_fatal;
12use mysten_metrics::monitored_scope;
13use nonempty::NonEmpty;
14use parking_lot::{Mutex, MutexGuard, RwLock};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use shared_crypto::intent::Intent;
17use std::sync::Arc;
18use sui_types::address_alias;
19use sui_types::base_types::{SequenceNumber, SuiAddress};
20use sui_types::digests::SenderSignedDataDigest;
21use sui_types::digests::ZKLoginInputsDigest;
22use sui_types::signature_verification::{
23 VerifiedDigestCache, verify_sender_signed_data_message_signatures,
24};
25use sui_types::storage::ObjectStore;
26use sui_types::transaction::{SenderSignedData, TransactionDataAPI};
27use sui_types::{
28 committee::Committee,
29 crypto::{AuthoritySignInfoTrait, VerificationObligation},
30 digests::CertificateDigest,
31 error::{SuiErrorKind, SuiResult},
32 message_envelope::Message,
33 messages_checkpoint::SignedCheckpointSummary,
34 signature::VerifyParams,
35 transaction::{CertifiedTransaction, VerifiedCertificate},
36};
37use tap::TapFallible;
38use tokio::runtime::Handle;
39use tokio::{
40 sync::oneshot,
41 time::{Duration, timeout},
42};
43use tracing::debug;
44
45const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);
47
48const MAX_BATCH_SIZE: usize = 8;
53
54type Sender = oneshot::Sender<SuiResult<VerifiedCertificate>>;
55
56struct CertBuffer {
57 certs: Vec<CertifiedTransaction>,
58 senders: Vec<Sender>,
59 id: u64,
60}
61
62impl CertBuffer {
63 fn new(capacity: usize) -> Self {
64 Self {
65 certs: Vec::with_capacity(capacity),
66 senders: Vec::with_capacity(capacity),
67 id: 0,
68 }
69 }
70
71 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
73 let this = &mut *guard;
74 let mut new = CertBuffer::new(this.capacity());
75 new.id = this.id + 1;
76 std::mem::swap(&mut new, this);
77 new
78 }
79
80 fn capacity(&self) -> usize {
81 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
82 self.certs.capacity()
83 }
84
85 fn len(&self) -> usize {
86 debug_assert_eq!(self.certs.len(), self.senders.len());
87 self.certs.len()
88 }
89
90 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
91 self.senders.push(tx);
92 self.certs.push(cert);
93 }
94}
95
96pub struct SignatureVerifier {
100 committee: Arc<Committee>,
101 object_store: Arc<dyn ObjectStore + Send + Sync>,
102 certificate_cache: VerifiedDigestCache<CertificateDigest>,
103 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest, Vec<u8>>,
104 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
105
106 jwks: RwLock<ImHashMap<JwkId, JWK>>,
112
113 zk_login_params: ZkLoginParams,
115
116 enable_address_aliases: bool,
118
119 queue: Mutex<CertBuffer>,
120 pub metrics: Arc<SignatureVerifierMetrics>,
121}
122
123#[derive(Clone)]
125struct ZkLoginParams {
126 pub supported_providers: Vec<OIDCProvider>,
128 pub env: ZkLoginEnv,
130 pub verify_legacy_zklogin_address: bool,
132 pub accept_zklogin_in_multisig: bool,
134 pub accept_passkey_in_multisig: bool,
136 pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
138 pub additional_multisig_checks: bool,
140 pub validate_zklogin_public_identifier: bool,
142}
143
144impl SignatureVerifier {
145 pub fn new_with_batch_size(
146 committee: Arc<Committee>,
147 object_store: Arc<dyn ObjectStore + Send + Sync>,
148 batch_size: usize,
149 metrics: Arc<SignatureVerifierMetrics>,
150 supported_providers: Vec<OIDCProvider>,
151 env: ZkLoginEnv,
152 verify_legacy_zklogin_address: bool,
153 accept_zklogin_in_multisig: bool,
154 accept_passkey_in_multisig: bool,
155 zklogin_max_epoch_upper_bound_delta: Option<u64>,
156 additional_multisig_checks: bool,
157 validate_zklogin_public_identifier: bool,
158 enable_address_aliases: bool,
159 ) -> Self {
160 Self {
161 committee,
162 object_store,
163 certificate_cache: VerifiedDigestCache::new(
164 metrics.certificate_signatures_cache_hits.clone(),
165 metrics.certificate_signatures_cache_misses.clone(),
166 metrics.certificate_signatures_cache_evictions.clone(),
167 ),
168 signed_data_cache: VerifiedDigestCache::new(
169 metrics.signed_data_cache_hits.clone(),
170 metrics.signed_data_cache_misses.clone(),
171 metrics.signed_data_cache_evictions.clone(),
172 ),
173 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
174 metrics.zklogin_inputs_cache_hits.clone(),
175 metrics.zklogin_inputs_cache_misses.clone(),
176 metrics.zklogin_inputs_cache_evictions.clone(),
177 )),
178 jwks: Default::default(),
179 enable_address_aliases,
180 queue: Mutex::new(CertBuffer::new(batch_size)),
181 metrics,
182 zk_login_params: ZkLoginParams {
183 supported_providers,
184 env,
185 verify_legacy_zklogin_address,
186 accept_zklogin_in_multisig,
187 accept_passkey_in_multisig,
188 zklogin_max_epoch_upper_bound_delta,
189 additional_multisig_checks,
190 validate_zklogin_public_identifier,
191 },
192 }
193 }
194
195 pub fn new(
196 committee: Arc<Committee>,
197 object_store: Arc<dyn ObjectStore + Send + Sync>,
198 metrics: Arc<SignatureVerifierMetrics>,
199 supported_providers: Vec<OIDCProvider>,
200 zklogin_env: ZkLoginEnv,
201 verify_legacy_zklogin_address: bool,
202 accept_zklogin_in_multisig: bool,
203 accept_passkey_in_multisig: bool,
204 zklogin_max_epoch_upper_bound_delta: Option<u64>,
205 additional_multisig_checks: bool,
206 validate_zklogin_public_identifier: bool,
207 enable_address_aliases: bool,
208 ) -> Self {
209 Self::new_with_batch_size(
210 committee,
211 object_store,
212 MAX_BATCH_SIZE,
213 metrics,
214 supported_providers,
215 zklogin_env,
216 verify_legacy_zklogin_address,
217 accept_zklogin_in_multisig,
218 accept_passkey_in_multisig,
219 zklogin_max_epoch_upper_bound_delta,
220 additional_multisig_checks,
221 validate_zklogin_public_identifier,
222 enable_address_aliases,
223 )
224 }
225
226 pub fn verify_certs_and_checkpoints(
228 &self,
229 certs: Vec<&CertifiedTransaction>,
230 checkpoints: Vec<&SignedCheckpointSummary>,
231 ) -> SuiResult {
232 let certs: Vec<_> = certs
233 .into_iter()
234 .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
235 .collect();
236
237 for cert in &certs {
241 self.verify_tx_require_no_aliases(cert.data())?;
242 }
243 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
244 self.certificate_cache
245 .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
246 Ok(())
247 }
248
249 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> SuiResult<VerifiedCertificate> {
251 let cert_digest = cert.certificate_digest();
252 if self.certificate_cache.is_cached(&cert_digest) {
253 return Ok(VerifiedCertificate::new_unchecked(cert));
254 }
255 self.verify_tx_require_no_aliases(cert.data())?;
257 self.verify_cert_skip_cache(cert)
258 .await
259 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
260 }
261
262 pub async fn multi_verify_certs(
263 &self,
264 certs: Vec<CertifiedTransaction>,
265 ) -> Vec<SuiResult<VerifiedCertificate>> {
266 let mut futures = Vec::with_capacity(certs.len());
269 for cert in certs {
270 futures.push(self.verify_cert(cert));
271 }
272 futures::future::join_all(futures).await
273 }
274
275 pub async fn verify_cert_skip_cache(
277 &self,
278 cert: CertifiedTransaction,
279 ) -> SuiResult<VerifiedCertificate> {
280 if cert.auth_sig().epoch != self.committee.epoch() {
283 return Err(SuiErrorKind::WrongEpoch {
284 expected_epoch: self.committee.epoch(),
285 actual_epoch: cert.auth_sig().epoch,
286 }
287 .into());
288 }
289
290 self.verify_cert_inner(cert).await
291 }
292
293 async fn verify_cert_inner(
294 &self,
295 cert: CertifiedTransaction,
296 ) -> SuiResult<VerifiedCertificate> {
297 let (tx, rx) = oneshot::channel();
301 pin_mut!(rx);
302
303 let prev_id_or_buffer = {
304 let mut queue = self.queue.lock();
305 queue.push(tx, cert);
306 if queue.len() == queue.capacity() {
307 Either::Right(CertBuffer::take_and_replace(queue))
308 } else {
309 Either::Left(queue.id)
310 }
311 };
312 let prev_id = match prev_id_or_buffer {
313 Either::Left(prev_id) => prev_id,
314 Either::Right(buffer) => {
315 self.metrics.full_batches.inc();
316 self.process_queue(buffer).await;
317 return rx.try_recv().unwrap();
319 }
320 };
321
322 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
323 return res.unwrap();
325 }
326 self.metrics.timeouts.inc();
327
328 let buffer = {
329 let queue = self.queue.lock();
330 if prev_id == queue.id {
332 debug_assert_ne!(queue.len(), queue.capacity());
333 Some(CertBuffer::take_and_replace(queue))
334 } else {
335 None
336 }
337 };
338
339 if let Some(buffer) = buffer {
340 self.metrics.partial_batches.inc();
341 self.process_queue(buffer).await;
342 return rx.try_recv().unwrap();
344 }
345
346 rx.await.unwrap()
349 }
350
351 async fn process_queue(&self, buffer: CertBuffer) {
352 let committee = self.committee.clone();
353 let metrics = self.metrics.clone();
354 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
355 Handle::current()
356 .spawn_blocking(move || {
357 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
358 })
359 .await
360 .expect("Spawn blocking should not fail");
361 }
362
363 fn process_queue_sync(
364 committee: Arc<Committee>,
365 metrics: Arc<SignatureVerifierMetrics>,
366 buffer: CertBuffer,
367 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
368 ) {
369 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
370
371 let results = batch_verify_certificates(
372 &committee,
373 &buffer.certs.iter().collect_vec(),
374 zklogin_inputs_cache,
375 );
376 izip!(
377 results.into_iter(),
378 buffer.certs.into_iter(),
379 buffer.senders.into_iter(),
380 )
381 .for_each(|(result, cert, tx)| {
382 tx.send(match result {
383 Ok(()) => {
384 metrics.total_verified_certs.inc();
385 Ok(VerifiedCertificate::new_unchecked(cert))
386 }
387 Err(e) => {
388 metrics.total_failed_certs.inc();
389 Err(e)
390 }
391 })
392 .ok();
393 });
394 }
395
396 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
399 let mut jwks = self.jwks.write();
400 match jwks.entry(jwk_id.clone()) {
401 im::hashmap::Entry::Occupied(_) => {
402 debug!("JWK with kid {:?} already exists", jwk_id);
403 }
404 im::hashmap::Entry::Vacant(entry) => {
405 debug!("inserting JWK with kid: {:?}", jwk_id);
406 entry.insert(jwk.clone());
407 }
408 }
409 }
410
411 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
412 let jwks = self.jwks.read();
413 jwks.get(jwk_id) == Some(jwk)
414 }
415
416 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
417 self.jwks.read().clone()
418 }
419
420 pub fn verify_tx_with_current_aliases(
423 &self,
424 signed_tx: &SenderSignedData,
425 ) -> SuiResult<NonEmpty<(u8, Option<SequenceNumber>)>> {
426 let mut alias_versions_by_signer = Vec::new();
427 let mut aliases = Vec::new();
428
429 let signers = signed_tx.intent_message().value.required_signers();
431 for signer in signers {
432 if !self.enable_address_aliases {
433 alias_versions_by_signer.push((signer, None));
434 aliases.push((signer, NonEmpty::singleton(signer)));
435 } else {
436 let address_aliases =
438 address_alias::get_address_aliases_from_store(&self.object_store, signer)?;
439
440 alias_versions_by_signer.push((signer, address_aliases.as_ref().map(|(_, v)| *v)));
441 aliases.push((
442 signer,
443 address_aliases
444 .map(|(aliases, _)| {
445 NonEmpty::from_vec(aliases.aliases.contents.clone()).unwrap_or_else(
446 || {
447 debug_fatal!(
448 "AddressAliases struct has empty aliases field for signer {}",
449 signer
450 );
451 NonEmpty::singleton(signer)
452 },
453 )
454 })
455 .unwrap_or(NonEmpty::singleton(signer)),
456 ));
457 }
458 }
459
460 let sig_indices = self.verify_tx(signed_tx, &alias_versions_by_signer, aliases)?;
462
463 let result: Vec<(u8, Option<SequenceNumber>)> = sig_indices
465 .into_iter()
466 .zip_eq(alias_versions_by_signer.into_iter().map(|(_, seq)| seq))
467 .collect();
468
469 Ok(NonEmpty::from_vec(result).expect("must have at least one required_signer"))
470 }
471
472 pub fn verify_tx_require_no_aliases(&self, signed_tx: &SenderSignedData) -> SuiResult {
473 let current_aliases = self.verify_tx_with_current_aliases(signed_tx)?;
474 for (_, version) in current_aliases {
475 if version.is_some() {
476 return Err(SuiErrorKind::AliasesChanged.into());
477 }
478 }
479 Ok(())
480 }
481
482 fn verify_tx(
483 &self,
484 signed_tx: &SenderSignedData,
485 alias_versions: &Vec<(SuiAddress, Option<SequenceNumber>)>,
486 aliased_addresses: Vec<(SuiAddress, NonEmpty<SuiAddress>)>,
487 ) -> SuiResult<Vec<u8>> {
488 let digest = signed_tx.full_message_digest_with_alias_versions(alias_versions);
489
490 if let Some(indices) = self.signed_data_cache.get_cached(&digest) {
491 return Ok(indices);
492 }
493
494 let jwks = self.jwks.read().clone();
495 let verify_params = VerifyParams::new(
496 jwks,
497 self.zk_login_params.supported_providers.clone(),
498 self.zk_login_params.env,
499 self.zk_login_params.verify_legacy_zklogin_address,
500 self.zk_login_params.accept_zklogin_in_multisig,
501 self.zk_login_params.accept_passkey_in_multisig,
502 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
503 self.zk_login_params.additional_multisig_checks,
504 self.zk_login_params.validate_zklogin_public_identifier,
505 );
506 let indices = verify_sender_signed_data_message_signatures(
507 signed_tx,
508 self.committee.epoch(),
509 &verify_params,
510 self.zklogin_inputs_cache.clone(),
511 aliased_addresses,
512 )?;
513
514 self.signed_data_cache
515 .cache_with_value(digest, indices.clone());
516 Ok(indices)
517 }
518
519 pub fn clear_signature_cache(&self) {
520 self.certificate_cache.clear();
521 self.signed_data_cache.clear();
522 self.zklogin_inputs_cache.clear();
523 }
524}
525
526pub struct SignatureVerifierMetrics {
527 pub certificate_signatures_cache_hits: IntCounter,
528 pub certificate_signatures_cache_misses: IntCounter,
529 pub certificate_signatures_cache_evictions: IntCounter,
530 pub signed_data_cache_hits: IntCounter,
531 pub signed_data_cache_misses: IntCounter,
532 pub signed_data_cache_evictions: IntCounter,
533 pub zklogin_inputs_cache_hits: IntCounter,
534 pub zklogin_inputs_cache_misses: IntCounter,
535 pub zklogin_inputs_cache_evictions: IntCounter,
536 timeouts: IntCounter,
537 full_batches: IntCounter,
538 partial_batches: IntCounter,
539 total_verified_certs: IntCounter,
540 total_failed_certs: IntCounter,
541}
542
543impl SignatureVerifierMetrics {
544 pub fn new(registry: &Registry) -> Arc<Self> {
545 Arc::new(Self {
546 certificate_signatures_cache_hits: register_int_counter_with_registry!(
547 "certificate_signatures_cache_hits",
548 "Number of certificates which were known to be verified because of signature cache.",
549 registry
550 )
551 .unwrap(),
552 certificate_signatures_cache_misses: register_int_counter_with_registry!(
553 "certificate_signatures_cache_misses",
554 "Number of certificates which missed the signature cache",
555 registry
556 )
557 .unwrap(),
558 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
559 "certificate_signatures_cache_evictions",
560 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
561 registry
562 )
563 .unwrap(),
564 signed_data_cache_hits: register_int_counter_with_registry!(
565 "signed_data_cache_hits",
566 "Number of signed data which were known to be verified because of signature cache.",
567 registry
568 )
569 .unwrap(),
570 signed_data_cache_misses: register_int_counter_with_registry!(
571 "signed_data_cache_misses",
572 "Number of signed data which missed the signature cache.",
573 registry
574 )
575 .unwrap(),
576 signed_data_cache_evictions: register_int_counter_with_registry!(
577 "signed_data_cache_evictions",
578 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
579 registry
580 )
581 .unwrap(),
582 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
583 "zklogin_inputs_cache_hits",
584 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
585 registry
586 )
587 .unwrap(),
588 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
589 "zklogin_inputs_cache_misses",
590 "Number of zklogin signatures which missed the zklogin inputs cache.",
591 registry
592 )
593 .unwrap(),
594 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
595 "zklogin_inputs_cache_evictions",
596 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
597 registry
598 )
599 .unwrap(),
600 timeouts: register_int_counter_with_registry!(
601 "async_batch_verifier_timeouts",
602 "Number of times batch verifier times out and verifies a partial batch",
603 registry
604 )
605 .unwrap(),
606 full_batches: register_int_counter_with_registry!(
607 "async_batch_verifier_full_batches",
608 "Number of times batch verifier verifies a full batch",
609 registry
610 )
611 .unwrap(),
612 partial_batches: register_int_counter_with_registry!(
613 "async_batch_verifier_partial_batches",
614 "Number of times batch verifier verifies a partial batch",
615 registry
616 )
617 .unwrap(),
618 total_verified_certs: register_int_counter_with_registry!(
619 "async_batch_verifier_total_verified_certs",
620 "Total number of certs batch verifier has verified",
621 registry
622 )
623 .unwrap(),
624 total_failed_certs: register_int_counter_with_registry!(
625 "async_batch_verifier_total_failed_certs",
626 "Total number of certs batch verifier has rejected",
627 registry
628 )
629 .unwrap(),
630 })
631 }
632}
633
634pub fn batch_verify_all_certificates_and_checkpoints(
636 committee: &Committee,
637 certs: &[&CertifiedTransaction],
638 checkpoints: &[&SignedCheckpointSummary],
639) -> SuiResult {
640 for ckpt in checkpoints {
643 ckpt.data().verify_epoch(committee.epoch())?;
644 }
645
646 batch_verify(committee, certs, checkpoints)
647}
648
649pub fn batch_verify_certificates(
651 committee: &Committee,
652 certs: &[&CertifiedTransaction],
653 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
654) -> Vec<SuiResult> {
655 let verify_params = VerifyParams::default();
657 match batch_verify(committee, certs, &[]) {
658 Ok(_) => vec![Ok(()); certs.len()],
659
660 Err(_) if certs.len() > 1 => certs
662 .iter()
663 .map(|c| {
666 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
667 })
668 .collect(),
669
670 Err(e) => vec![Err(e)],
671 }
672}
673
674fn batch_verify(
675 committee: &Committee,
676 certs: &[&CertifiedTransaction],
677 checkpoints: &[&SignedCheckpointSummary],
678) -> SuiResult {
679 let mut obligation = VerificationObligation::default();
680
681 for cert in certs {
682 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::sui_app(cert.scope()));
683 cert.auth_sig()
684 .add_to_verification_obligation(committee, &mut obligation, idx)?;
685 }
686
687 for ckpt in checkpoints {
688 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::sui_app(ckpt.scope()));
689 ckpt.auth_sig()
690 .add_to_verification_obligation(committee, &mut obligation, idx)?;
691 }
692
693 obligation.verify_all()
694}