1use either::Either;
5use fastcrypto_zkp::bn254::zk_login::JwkId;
6use fastcrypto_zkp::bn254::zk_login::{JWK, OIDCProvider};
7use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv;
8use futures::pin_mut;
9use im::hashmap::HashMap as ImHashMap;
10use itertools::{Itertools as _, izip};
11use mysten_common::debug_fatal;
12use mysten_metrics::monitored_scope;
13use nonempty::NonEmpty;
14use parking_lot::{Mutex, MutexGuard, RwLock};
15use prometheus::{IntCounter, Registry, register_int_counter_with_registry};
16use shared_crypto::intent::Intent;
17use std::sync::Arc;
18use sui_types::address_alias;
19use sui_types::base_types::{SequenceNumber, SuiAddress};
20use sui_types::digests::SenderSignedDataDigest;
21use sui_types::digests::ZKLoginInputsDigest;
22use sui_types::signature_verification::{
23 VerifiedDigestCache, verify_sender_signed_data_message_signatures,
24};
25use sui_types::storage::ObjectStore;
26use sui_types::transaction::{SenderSignedData, TransactionDataAPI};
27use sui_types::{
28 committee::Committee,
29 crypto::{AuthoritySignInfoTrait, VerificationObligation},
30 digests::CertificateDigest,
31 error::{SuiErrorKind, SuiResult},
32 message_envelope::Message,
33 messages_checkpoint::SignedCheckpointSummary,
34 signature::VerifyParams,
35 transaction::{CertifiedTransaction, VerifiedCertificate},
36};
37use tap::TapFallible;
38use tokio::runtime::Handle;
39use tokio::{
40 sync::oneshot,
41 time::{Duration, timeout},
42};
43use tracing::debug;
44
/// How long `SignatureVerifier::verify_cert_inner` waits for its result to be
/// delivered before it attempts to flush the pending batch itself.
///
/// Despite the `_MS` suffix this is a [`Duration`] (10 milliseconds), not a raw
/// millisecond count.
const BATCH_TIMEOUT_MS: Duration = Duration::from_millis(10);
47
/// Batch size that `SignatureVerifier::new` passes to
/// `SignatureVerifier::new_with_batch_size`, where it becomes the capacity of
/// the pending-certificate queue.
const MAX_BATCH_SIZE: usize = 8;
53
54type Sender = oneshot::Sender<SuiResult<VerifiedCertificate>>;
55
56struct CertBuffer {
57 certs: Vec<CertifiedTransaction>,
58 senders: Vec<Sender>,
59 id: u64,
60}
61
62impl CertBuffer {
63 fn new(capacity: usize) -> Self {
64 Self {
65 certs: Vec::with_capacity(capacity),
66 senders: Vec::with_capacity(capacity),
67 id: 0,
68 }
69 }
70
71 fn take_and_replace(mut guard: MutexGuard<'_, Self>) -> Self {
73 let this = &mut *guard;
74 let mut new = CertBuffer::new(this.capacity());
75 new.id = this.id + 1;
76 std::mem::swap(&mut new, this);
77 new
78 }
79
80 fn capacity(&self) -> usize {
81 debug_assert_eq!(self.certs.capacity(), self.senders.capacity());
82 self.certs.capacity()
83 }
84
85 fn len(&self) -> usize {
86 debug_assert_eq!(self.certs.len(), self.senders.len());
87 self.certs.len()
88 }
89
90 fn push(&mut self, tx: Sender, cert: CertifiedTransaction) {
91 self.senders.push(tx);
92 self.certs.push(cert);
93 }
94}
95
96pub struct SignatureVerifier {
100 committee: Arc<Committee>,
101 object_store: Arc<dyn ObjectStore + Send + Sync>,
102 certificate_cache: VerifiedDigestCache<CertificateDigest>,
103 signed_data_cache: VerifiedDigestCache<SenderSignedDataDigest>,
104 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
105
106 jwks: RwLock<ImHashMap<JwkId, JWK>>,
112
113 zk_login_params: ZkLoginParams,
115
116 enable_address_aliases: bool,
118
119 queue: Mutex<CertBuffer>,
120 pub metrics: Arc<SignatureVerifierMetrics>,
121}
122
123#[derive(Clone)]
125struct ZkLoginParams {
126 pub supported_providers: Vec<OIDCProvider>,
128 pub env: ZkLoginEnv,
130 pub verify_legacy_zklogin_address: bool,
132 pub accept_zklogin_in_multisig: bool,
134 pub accept_passkey_in_multisig: bool,
136 pub zklogin_max_epoch_upper_bound_delta: Option<u64>,
138 pub additional_multisig_checks: bool,
140}
141
142impl SignatureVerifier {
143 pub fn new_with_batch_size(
144 committee: Arc<Committee>,
145 object_store: Arc<dyn ObjectStore + Send + Sync>,
146 batch_size: usize,
147 metrics: Arc<SignatureVerifierMetrics>,
148 supported_providers: Vec<OIDCProvider>,
149 env: ZkLoginEnv,
150 verify_legacy_zklogin_address: bool,
151 accept_zklogin_in_multisig: bool,
152 accept_passkey_in_multisig: bool,
153 zklogin_max_epoch_upper_bound_delta: Option<u64>,
154 additional_multisig_checks: bool,
155 enable_address_aliases: bool,
156 ) -> Self {
157 Self {
158 committee,
159 object_store,
160 certificate_cache: VerifiedDigestCache::new(
161 metrics.certificate_signatures_cache_hits.clone(),
162 metrics.certificate_signatures_cache_misses.clone(),
163 metrics.certificate_signatures_cache_evictions.clone(),
164 ),
165 signed_data_cache: VerifiedDigestCache::new(
166 metrics.signed_data_cache_hits.clone(),
167 metrics.signed_data_cache_misses.clone(),
168 metrics.signed_data_cache_evictions.clone(),
169 ),
170 zklogin_inputs_cache: Arc::new(VerifiedDigestCache::new(
171 metrics.zklogin_inputs_cache_hits.clone(),
172 metrics.zklogin_inputs_cache_misses.clone(),
173 metrics.zklogin_inputs_cache_evictions.clone(),
174 )),
175 jwks: Default::default(),
176 enable_address_aliases,
177 queue: Mutex::new(CertBuffer::new(batch_size)),
178 metrics,
179 zk_login_params: ZkLoginParams {
180 supported_providers,
181 env,
182 verify_legacy_zklogin_address,
183 accept_zklogin_in_multisig,
184 accept_passkey_in_multisig,
185 zklogin_max_epoch_upper_bound_delta,
186 additional_multisig_checks,
187 },
188 }
189 }
190
191 pub fn new(
192 committee: Arc<Committee>,
193 object_store: Arc<dyn ObjectStore + Send + Sync>,
194 metrics: Arc<SignatureVerifierMetrics>,
195 supported_providers: Vec<OIDCProvider>,
196 zklogin_env: ZkLoginEnv,
197 verify_legacy_zklogin_address: bool,
198 accept_zklogin_in_multisig: bool,
199 accept_passkey_in_multisig: bool,
200 zklogin_max_epoch_upper_bound_delta: Option<u64>,
201 additional_multisig_checks: bool,
202 enable_address_aliases: bool,
203 ) -> Self {
204 Self::new_with_batch_size(
205 committee,
206 object_store,
207 MAX_BATCH_SIZE,
208 metrics,
209 supported_providers,
210 zklogin_env,
211 verify_legacy_zklogin_address,
212 accept_zklogin_in_multisig,
213 accept_passkey_in_multisig,
214 zklogin_max_epoch_upper_bound_delta,
215 additional_multisig_checks,
216 enable_address_aliases,
217 )
218 }
219
220 pub fn verify_certs_and_checkpoints(
222 &self,
223 certs: Vec<&CertifiedTransaction>,
224 checkpoints: Vec<&SignedCheckpointSummary>,
225 ) -> SuiResult {
226 let certs: Vec<_> = certs
227 .into_iter()
228 .filter(|cert| !self.certificate_cache.is_cached(&cert.certificate_digest()))
229 .collect();
230
231 for cert in &certs {
235 self.verify_tx_require_no_aliases(cert.data())?;
236 }
237 batch_verify_all_certificates_and_checkpoints(&self.committee, &certs, &checkpoints)?;
238 self.certificate_cache
239 .cache_digests(certs.into_iter().map(|c| c.certificate_digest()).collect());
240 Ok(())
241 }
242
243 pub async fn verify_cert(&self, cert: CertifiedTransaction) -> SuiResult<VerifiedCertificate> {
245 let cert_digest = cert.certificate_digest();
246 if self.certificate_cache.is_cached(&cert_digest) {
247 return Ok(VerifiedCertificate::new_unchecked(cert));
248 }
249 self.verify_tx_require_no_aliases(cert.data())?;
251 self.verify_cert_skip_cache(cert)
252 .await
253 .tap_ok(|_| self.certificate_cache.cache_digest(cert_digest))
254 }
255
256 pub async fn multi_verify_certs(
257 &self,
258 certs: Vec<CertifiedTransaction>,
259 ) -> Vec<SuiResult<VerifiedCertificate>> {
260 let mut futures = Vec::with_capacity(certs.len());
263 for cert in certs {
264 futures.push(self.verify_cert(cert));
265 }
266 futures::future::join_all(futures).await
267 }
268
269 pub async fn verify_cert_skip_cache(
271 &self,
272 cert: CertifiedTransaction,
273 ) -> SuiResult<VerifiedCertificate> {
274 if cert.auth_sig().epoch != self.committee.epoch() {
277 return Err(SuiErrorKind::WrongEpoch {
278 expected_epoch: self.committee.epoch(),
279 actual_epoch: cert.auth_sig().epoch,
280 }
281 .into());
282 }
283
284 self.verify_cert_inner(cert).await
285 }
286
287 async fn verify_cert_inner(
288 &self,
289 cert: CertifiedTransaction,
290 ) -> SuiResult<VerifiedCertificate> {
291 let (tx, rx) = oneshot::channel();
295 pin_mut!(rx);
296
297 let prev_id_or_buffer = {
298 let mut queue = self.queue.lock();
299 queue.push(tx, cert);
300 if queue.len() == queue.capacity() {
301 Either::Right(CertBuffer::take_and_replace(queue))
302 } else {
303 Either::Left(queue.id)
304 }
305 };
306 let prev_id = match prev_id_or_buffer {
307 Either::Left(prev_id) => prev_id,
308 Either::Right(buffer) => {
309 self.metrics.full_batches.inc();
310 self.process_queue(buffer).await;
311 return rx.try_recv().unwrap();
313 }
314 };
315
316 if let Ok(res) = timeout(BATCH_TIMEOUT_MS, &mut rx).await {
317 return res.unwrap();
319 }
320 self.metrics.timeouts.inc();
321
322 let buffer = {
323 let queue = self.queue.lock();
324 if prev_id == queue.id {
326 debug_assert_ne!(queue.len(), queue.capacity());
327 Some(CertBuffer::take_and_replace(queue))
328 } else {
329 None
330 }
331 };
332
333 if let Some(buffer) = buffer {
334 self.metrics.partial_batches.inc();
335 self.process_queue(buffer).await;
336 return rx.try_recv().unwrap();
338 }
339
340 rx.await.unwrap()
343 }
344
345 async fn process_queue(&self, buffer: CertBuffer) {
346 let committee = self.committee.clone();
347 let metrics = self.metrics.clone();
348 let zklogin_inputs_cache = self.zklogin_inputs_cache.clone();
349 Handle::current()
350 .spawn_blocking(move || {
351 Self::process_queue_sync(committee, metrics, buffer, zklogin_inputs_cache)
352 })
353 .await
354 .expect("Spawn blocking should not fail");
355 }
356
357 fn process_queue_sync(
358 committee: Arc<Committee>,
359 metrics: Arc<SignatureVerifierMetrics>,
360 buffer: CertBuffer,
361 zklogin_inputs_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
362 ) {
363 let _scope = monitored_scope("BatchCertificateVerifier::process_queue");
364
365 let results = batch_verify_certificates(
366 &committee,
367 &buffer.certs.iter().collect_vec(),
368 zklogin_inputs_cache,
369 );
370 izip!(
371 results.into_iter(),
372 buffer.certs.into_iter(),
373 buffer.senders.into_iter(),
374 )
375 .for_each(|(result, cert, tx)| {
376 tx.send(match result {
377 Ok(()) => {
378 metrics.total_verified_certs.inc();
379 Ok(VerifiedCertificate::new_unchecked(cert))
380 }
381 Err(e) => {
382 metrics.total_failed_certs.inc();
383 Err(e)
384 }
385 })
386 .ok();
387 });
388 }
389
390 pub(crate) fn insert_jwk(&self, jwk_id: &JwkId, jwk: &JWK) {
393 let mut jwks = self.jwks.write();
394 match jwks.entry(jwk_id.clone()) {
395 im::hashmap::Entry::Occupied(_) => {
396 debug!("JWK with kid {:?} already exists", jwk_id);
397 }
398 im::hashmap::Entry::Vacant(entry) => {
399 debug!("inserting JWK with kid: {:?}", jwk_id);
400 entry.insert(jwk.clone());
401 }
402 }
403 }
404
405 pub fn has_jwk(&self, jwk_id: &JwkId, jwk: &JWK) -> bool {
406 let jwks = self.jwks.read();
407 jwks.get(jwk_id) == Some(jwk)
408 }
409
410 pub fn get_jwks(&self) -> ImHashMap<JwkId, JWK> {
411 self.jwks.read().clone()
412 }
413
414 pub fn verify_tx_with_current_aliases(
415 &self,
416 signed_tx: &SenderSignedData,
417 ) -> SuiResult<NonEmpty<(SuiAddress, Option<SequenceNumber>)>> {
418 let mut versions = Vec::new();
419 let mut aliases = Vec::new();
420
421 let signers = signed_tx.intent_message().value.required_signers();
423 for signer in signers {
424 if !self.enable_address_aliases {
425 versions.push((signer, None));
426 aliases.push((signer, NonEmpty::singleton(signer)));
427 } else {
428 let address_aliases =
430 address_alias::get_address_aliases_from_store(&self.object_store, signer)?;
431
432 versions.push((signer, address_aliases.as_ref().map(|(_, v)| *v)));
433 aliases.push((
434 signer,
435 address_aliases
436 .map(|(aliases, _)| {
437 NonEmpty::from_vec(aliases.aliases.contents.clone()).unwrap_or_else(
438 || {
439 debug_fatal!(
440 "AddressAliases struct has empty aliases field for signer {}",
441 signer
442 );
443 NonEmpty::singleton(signer)
444 },
445 )
446 })
447 .unwrap_or(NonEmpty::singleton(signer)),
448 ));
449 }
450 }
451
452 self.verify_tx(signed_tx, aliases)?;
453 Ok(NonEmpty::from_vec(versions).expect("must have at least one required_signer"))
454 }
455
456 pub fn verify_tx_require_no_aliases(&self, signed_tx: &SenderSignedData) -> SuiResult {
457 let current_aliases = self.verify_tx_with_current_aliases(signed_tx)?;
458 for (_, version) in current_aliases {
459 if version.is_some() {
460 return Err(SuiErrorKind::AliasesChanged.into());
461 }
462 }
463 Ok(())
464 }
465
466 fn verify_tx(
467 &self,
468 signed_tx: &SenderSignedData,
469 aliased_addresses: Vec<(SuiAddress, NonEmpty<SuiAddress>)>,
470 ) -> SuiResult {
471 self.signed_data_cache.is_verified(
472 signed_tx.full_message_digest(),
473 || {
474 let jwks = self.jwks.read().clone();
475 let verify_params = VerifyParams::new(
476 jwks,
477 self.zk_login_params.supported_providers.clone(),
478 self.zk_login_params.env,
479 self.zk_login_params.verify_legacy_zklogin_address,
480 self.zk_login_params.accept_zklogin_in_multisig,
481 self.zk_login_params.accept_passkey_in_multisig,
482 self.zk_login_params.zklogin_max_epoch_upper_bound_delta,
483 self.zk_login_params.additional_multisig_checks,
484 );
485 verify_sender_signed_data_message_signatures(
486 signed_tx,
487 self.committee.epoch(),
488 &verify_params,
489 self.zklogin_inputs_cache.clone(),
490 aliased_addresses,
491 )
492 },
493 || Ok(()),
494 )
495 }
496
497 pub fn clear_signature_cache(&self) {
498 self.certificate_cache.clear();
499 self.signed_data_cache.clear();
500 self.zklogin_inputs_cache.clear();
501 }
502}
503
504pub struct SignatureVerifierMetrics {
505 pub certificate_signatures_cache_hits: IntCounter,
506 pub certificate_signatures_cache_misses: IntCounter,
507 pub certificate_signatures_cache_evictions: IntCounter,
508 pub signed_data_cache_hits: IntCounter,
509 pub signed_data_cache_misses: IntCounter,
510 pub signed_data_cache_evictions: IntCounter,
511 pub zklogin_inputs_cache_hits: IntCounter,
512 pub zklogin_inputs_cache_misses: IntCounter,
513 pub zklogin_inputs_cache_evictions: IntCounter,
514 timeouts: IntCounter,
515 full_batches: IntCounter,
516 partial_batches: IntCounter,
517 total_verified_certs: IntCounter,
518 total_failed_certs: IntCounter,
519}
520
521impl SignatureVerifierMetrics {
522 pub fn new(registry: &Registry) -> Arc<Self> {
523 Arc::new(Self {
524 certificate_signatures_cache_hits: register_int_counter_with_registry!(
525 "certificate_signatures_cache_hits",
526 "Number of certificates which were known to be verified because of signature cache.",
527 registry
528 )
529 .unwrap(),
530 certificate_signatures_cache_misses: register_int_counter_with_registry!(
531 "certificate_signatures_cache_misses",
532 "Number of certificates which missed the signature cache",
533 registry
534 )
535 .unwrap(),
536 certificate_signatures_cache_evictions: register_int_counter_with_registry!(
537 "certificate_signatures_cache_evictions",
538 "Number of times we evict a pre-existing key were known to be verified because of signature cache.",
539 registry
540 )
541 .unwrap(),
542 signed_data_cache_hits: register_int_counter_with_registry!(
543 "signed_data_cache_hits",
544 "Number of signed data which were known to be verified because of signature cache.",
545 registry
546 )
547 .unwrap(),
548 signed_data_cache_misses: register_int_counter_with_registry!(
549 "signed_data_cache_misses",
550 "Number of signed data which missed the signature cache.",
551 registry
552 )
553 .unwrap(),
554 signed_data_cache_evictions: register_int_counter_with_registry!(
555 "signed_data_cache_evictions",
556 "Number of times we evict a pre-existing signed data were known to be verified because of signature cache.",
557 registry
558 )
559 .unwrap(),
560 zklogin_inputs_cache_hits: register_int_counter_with_registry!(
561 "zklogin_inputs_cache_hits",
562 "Number of zklogin signature which were known to be partially verified because of zklogin inputs cache.",
563 registry
564 )
565 .unwrap(),
566 zklogin_inputs_cache_misses: register_int_counter_with_registry!(
567 "zklogin_inputs_cache_misses",
568 "Number of zklogin signatures which missed the zklogin inputs cache.",
569 registry
570 )
571 .unwrap(),
572 zklogin_inputs_cache_evictions: register_int_counter_with_registry!(
573 "zklogin_inputs_cache_evictions",
574 "Number of times we evict a pre-existing zklogin inputs digest that was known to be verified because of zklogin inputs cache.",
575 registry
576 )
577 .unwrap(),
578 timeouts: register_int_counter_with_registry!(
579 "async_batch_verifier_timeouts",
580 "Number of times batch verifier times out and verifies a partial batch",
581 registry
582 )
583 .unwrap(),
584 full_batches: register_int_counter_with_registry!(
585 "async_batch_verifier_full_batches",
586 "Number of times batch verifier verifies a full batch",
587 registry
588 )
589 .unwrap(),
590 partial_batches: register_int_counter_with_registry!(
591 "async_batch_verifier_partial_batches",
592 "Number of times batch verifier verifies a partial batch",
593 registry
594 )
595 .unwrap(),
596 total_verified_certs: register_int_counter_with_registry!(
597 "async_batch_verifier_total_verified_certs",
598 "Total number of certs batch verifier has verified",
599 registry
600 )
601 .unwrap(),
602 total_failed_certs: register_int_counter_with_registry!(
603 "async_batch_verifier_total_failed_certs",
604 "Total number of certs batch verifier has rejected",
605 registry
606 )
607 .unwrap(),
608 })
609 }
610}
611
612pub fn batch_verify_all_certificates_and_checkpoints(
614 committee: &Committee,
615 certs: &[&CertifiedTransaction],
616 checkpoints: &[&SignedCheckpointSummary],
617) -> SuiResult {
618 for ckpt in checkpoints {
621 ckpt.data().verify_epoch(committee.epoch())?;
622 }
623
624 batch_verify(committee, certs, checkpoints)
625}
626
627pub fn batch_verify_certificates(
629 committee: &Committee,
630 certs: &[&CertifiedTransaction],
631 zk_login_cache: Arc<VerifiedDigestCache<ZKLoginInputsDigest>>,
632) -> Vec<SuiResult> {
633 let verify_params = VerifyParams::default();
635 match batch_verify(committee, certs, &[]) {
636 Ok(_) => vec![Ok(()); certs.len()],
637
638 Err(_) if certs.len() > 1 => certs
640 .iter()
641 .map(|c| {
644 c.verify_signatures_authenticated(committee, &verify_params, zk_login_cache.clone())
645 })
646 .collect(),
647
648 Err(e) => vec![Err(e)],
649 }
650}
651
652fn batch_verify(
653 committee: &Committee,
654 certs: &[&CertifiedTransaction],
655 checkpoints: &[&SignedCheckpointSummary],
656) -> SuiResult {
657 let mut obligation = VerificationObligation::default();
658
659 for cert in certs {
660 let idx = obligation.add_message(cert.data(), cert.epoch(), Intent::sui_app(cert.scope()));
661 cert.auth_sig()
662 .add_to_verification_obligation(committee, &mut obligation, idx)?;
663 }
664
665 for ckpt in checkpoints {
666 let idx = obligation.add_message(ckpt.data(), ckpt.epoch(), Intent::sui_app(ckpt.scope()));
667 ckpt.auth_sig()
668 .add_to_verification_obligation(committee, &mut obligation, idx)?;
669 }
670
671 obligation.verify_all()
672}