sui_bridge/client/
bridge_authority_aggregator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! BridgeAuthorityAggregator aggregates signatures from BridgeCommittee.
5
6use crate::client::bridge_client::BridgeClient;
7use crate::crypto::BridgeAuthorityPublicKeyBytes;
8use crate::crypto::BridgeAuthoritySignInfo;
9use crate::error::{BridgeError, BridgeResult};
10use crate::metrics::BridgeMetrics;
11use crate::types::BridgeCommitteeValiditySignInfo;
12use crate::types::{
13    BridgeAction, BridgeCommittee, CertifiedBridgeAction, VerifiedCertifiedBridgeAction,
14    VerifiedSignedBridgeAction,
15};
16use std::collections::BTreeMap;
17use std::collections::BTreeSet;
18use std::collections::btree_map::Entry;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_authority_aggregation::ReduceOutput;
22use sui_authority_aggregation::{SigRequestPrefs, quorum_map_then_reduce_with_timeout_and_prefs};
23use sui_types::base_types::ConciseableName;
24use sui_types::committee::StakeUnit;
25use sui_types::committee::TOTAL_VOTING_POWER;
26use tracing::{error, info, warn};
27
28const TOTAL_TIMEOUT_MS: u64 = 5_000;
29const PREFETCH_TIMEOUT_MS: u64 = 1_500;
30const RETRY_INTERVAL_MS: u64 = 500;
31
32pub struct BridgeAuthorityAggregator {
33    pub committee: Arc<BridgeCommittee>,
34    pub clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
35    pub metrics: Arc<BridgeMetrics>,
36    pub committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
37}
38
39impl BridgeAuthorityAggregator {
40    pub fn new(
41        committee: Arc<BridgeCommittee>,
42        metrics: Arc<BridgeMetrics>,
43        committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
44    ) -> Self {
45        let clients: BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>> = committee
46            .members()
47            .iter()
48            .filter_map(|(name, authority)| {
49                if authority.is_blocklisted {
50                    warn!("Ignored blocklisted authority {:?} (stake: {}) when creating BridgeAuthorityAggregator", name.concise(), authority.voting_power);
51                    return None;
52                }
53                // TODO: we could also record bad stakes here and use in signature aggregation
54                match BridgeClient::new(
55                    name.clone(),
56                    committee.clone(),
57                ) {
58                    Ok(client) => Some((name.clone(), Arc::new(client))),
59                    Err(e) => {
60                        error!(
61                            "Failed to create BridgeClient for {:?}: {:?}",
62                            name.concise(),
63                            e
64                        );
65                        None
66                    }
67                }
68            })
69            .collect::<BTreeMap<_, _>>();
70        Self {
71            committee,
72            clients: Arc::new(clients),
73            metrics,
74            committee_keys_to_names,
75        }
76    }
77
78    #[cfg(test)]
79    pub fn new_for_testing(committee: Arc<BridgeCommittee>) -> Self {
80        Self::new(
81            committee,
82            Arc::new(BridgeMetrics::new_for_testing()),
83            Arc::new(BTreeMap::new()),
84        )
85    }
86
87    pub async fn request_committee_signatures(
88        &self,
89        action: BridgeAction,
90    ) -> BridgeResult<VerifiedCertifiedBridgeAction> {
91        let state = GetSigsState::new(
92            action.approval_threshold(),
93            self.committee.clone(),
94            self.metrics.clone(),
95            self.committee_keys_to_names.clone(),
96        );
97        request_sign_bridge_action_into_certification(
98            action,
99            self.committee.clone(),
100            self.clients.clone(),
101            state,
102            Duration::from_millis(PREFETCH_TIMEOUT_MS),
103        )
104        .await
105    }
106}
107
108#[derive(Debug)]
109struct GetSigsState {
110    total_bad_stake: StakeUnit,
111    total_ok_stake: StakeUnit,
112    sigs: BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthoritySignInfo>,
113    validity_threshold: StakeUnit,
114    committee: Arc<BridgeCommittee>,
115    metrics: Arc<BridgeMetrics>,
116    committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
117}
118
119impl GetSigsState {
120    fn new(
121        validity_threshold: StakeUnit,
122        committee: Arc<BridgeCommittee>,
123        metrics: Arc<BridgeMetrics>,
124        committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
125    ) -> Self {
126        Self {
127            committee,
128            total_bad_stake: 0,
129            total_ok_stake: 0,
130            sigs: BTreeMap::new(),
131            validity_threshold,
132            metrics,
133            committee_keys_to_names,
134        }
135    }
136
137    fn handle_verified_signed_action(
138        &mut self,
139        name: BridgeAuthorityPublicKeyBytes,
140        stake: StakeUnit,
141        signed_action: VerifiedSignedBridgeAction,
142    ) -> BridgeResult<Option<VerifiedCertifiedBridgeAction>> {
143        info!("Got signatures from {}, stake: {}", name.concise(), stake);
144        if !self.committee.is_active_member(&name) {
145            return Err(BridgeError::InvalidBridgeAuthority(name));
146        }
147
148        // safeguard here to assert passed in stake matches the stake in committee
149        // unwrap safe: if name is an active member then it must be in committee set
150        assert_eq!(stake, self.committee.member(&name).unwrap().voting_power);
151
152        match self.sigs.entry(name.clone()) {
153            Entry::Vacant(e) => {
154                e.insert(signed_action.auth_sig().clone());
155                self.add_ok_stake(stake, &name);
156            }
157            Entry::Occupied(_e) => {
158                return Err(BridgeError::AuthoritySignatureDuplication(format!(
159                    "Got signatures for the same authority twice: {}",
160                    name.concise()
161                )));
162            }
163        }
164        if self.total_ok_stake >= self.validity_threshold {
165            info!(
166                "Got enough signatures from {} validators with total_ok_stake {}",
167                self.sigs.len(),
168                self.total_ok_stake
169            );
170            let signatures = self
171                .sigs
172                .iter()
173                .map(|(k, v)| (k.clone(), v.signature.clone()))
174                .collect::<BTreeMap<_, _>>();
175            let sig_info = BridgeCommitteeValiditySignInfo { signatures };
176            let certified_action: sui_types::message_envelope::Envelope<
177                BridgeAction,
178                BridgeCommitteeValiditySignInfo,
179            > = CertifiedBridgeAction::new_from_data_and_sig(
180                signed_action.into_inner().into_data(),
181                sig_info,
182            );
183            // `BridgeClient` already verified individual signatures
184            Ok(Some(VerifiedCertifiedBridgeAction::new_from_verified(
185                certified_action,
186            )))
187        } else {
188            Ok(None)
189        }
190    }
191
192    fn add_ok_stake(&mut self, ok_stake: StakeUnit, name: &BridgeAuthorityPublicKeyBytes) {
193        if let Some(host_name) = self.committee_keys_to_names.get(name) {
194            self.metrics
195                .auth_agg_ok_responses
196                .with_label_values(&[host_name])
197                .inc();
198        }
199        self.total_ok_stake += ok_stake;
200    }
201
202    fn add_bad_stake(&mut self, bad_stake: StakeUnit, name: &BridgeAuthorityPublicKeyBytes) {
203        if let Some(host_name) = self.committee_keys_to_names.get(name) {
204            self.metrics
205                .auth_agg_bad_responses
206                .with_label_values(&[host_name])
207                .inc();
208        }
209        self.total_bad_stake += bad_stake;
210    }
211
212    fn is_too_many_error(&self) -> bool {
213        TOTAL_VOTING_POWER - self.total_bad_stake - self.committee.total_blocklisted_stake()
214            < self.validity_threshold
215    }
216}
217
218async fn request_sign_bridge_action_into_certification(
219    action: BridgeAction,
220    committee: Arc<BridgeCommittee>,
221    clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
222    state: GetSigsState,
223    prefetch_timeout: Duration,
224) -> BridgeResult<VerifiedCertifiedBridgeAction> {
225    // `preferences` is used as a trick here to influence the order of validators to be requested.
226    // * if `Some(_)`, then we will request validators in the order of the voting power.
227    // * if `None`, we still refer to voting power, but they are shuffled by randomness.
228    // Because ethereum gas price is not negligible, when the signatures are to be verified on ethereum,
229    // we pass in `Some` to make sure the validators with higher voting power are requested first
230    // to save gas cost.
231    let preference = match action {
232        BridgeAction::SuiToEthBridgeAction(_)
233        | BridgeAction::SuiToEthTokenTransfer(_)
234        | BridgeAction::SuiToEthTokenTransferV2(_) => Some(SigRequestPrefs {
235            ordering_pref: BTreeSet::new(),
236            prefetch_timeout,
237        }),
238        BridgeAction::EthToSuiBridgeAction(_) | BridgeAction::EthToSuiTokenTransferV2(_) => None,
239        _ => {
240            if action.chain_id().is_sui_chain() {
241                None
242            } else {
243                Some(SigRequestPrefs {
244                    ordering_pref: BTreeSet::new(),
245                    prefetch_timeout,
246                })
247            }
248        }
249    };
250    let (result, _) = quorum_map_then_reduce_with_timeout_and_prefs(
251        committee,
252        clients,
253        preference,
254        state,
255        |name, client| {
256            Box::pin(async move {
257                let start = std::time::Instant::now();
258                let timeout = Duration::from_millis(TOTAL_TIMEOUT_MS);
259                let retry_interval = Duration::from_millis(RETRY_INTERVAL_MS);
260                while start.elapsed() < timeout {
261                    match client.request_sign_bridge_action(action.clone()).await {
262                        Ok(result) => {
263                            return Ok(result);
264                        }
265                        // retryable errors
266                        Err(BridgeError::TxNotFinalized) => {
267                            warn!("Bridge authority {} observing transaction not yet finalized, retrying in {:?}", name.concise(), retry_interval);
268                            tokio::time::sleep(retry_interval).await;
269                        }
270                        // non-retryable errors
271                        Err(e) => {
272                            return Err(e);
273                        }
274                    }
275                }
276                Err(BridgeError::TransientProviderError(format!("Bridge authority {} did not observe finalized transaction after {:?}", name.concise(), timeout)))
277            })
278        },
279        |mut state, name, stake, result| {
280            Box::pin(async move {
281                match result {
282                    Ok(verified_signed_action) => {
283                        match state.handle_verified_signed_action(
284                            name.clone(),
285                            stake,
286                            verified_signed_action,
287                        ) {
288                            Ok(Some(certified_action)) => {
289                                return ReduceOutput::Success(certified_action)
290                            }
291                            Ok(None) => (),
292                            Err(e) => {
293                                error!(
294                                    "Failed to handle verified signed action from {}: {:?}",
295                                    name.concise(),
296                                    e
297                                );
298                                state.add_bad_stake(stake, &name);
299                            }
300                        }
301                    }
302                    Err(e) => {
303                        warn!(
304                            "Failed to get signature from {:?}. Error: {:?}",
305                            name.concise(),
306                            e
307                        );
308                        state.add_bad_stake(stake, &name);
309                    }
310                };
311
312                // If bad stake (including blocklisted stake) is too high to reach validity threshold, return error
313                if state.is_too_many_error() {
314                    ReduceOutput::Failed(state)
315                } else {
316                    ReduceOutput::Continue(state)
317                }
318            })
319        },
320        Duration::from_millis(TOTAL_TIMEOUT_MS),
321    )
322    .await
323    .map_err(|state| {
324        error!(
325            "Failed to get enough signatures, bad stake: {}, blocklisted stake: {}, good stake: {}, validity threshold: {}",
326            state.total_bad_stake,
327            state.committee.total_blocklisted_stake(),
328            state.total_ok_stake,
329            state.validity_threshold,
330        );
331        BridgeError::AuthoritySignatureAggregationTooManyError(format!(
332            "Failed to get enough signatures, bad stake: {}, blocklisted stake: {}, good stake: {}, validity threshold: {}",
333            state.total_bad_stake,
334            state.committee.total_blocklisted_stake(),
335            state.total_ok_stake,
336            state.validity_threshold,
337        ))
338    })?;
339    Ok(result)
340}
341
342#[cfg(test)]
343mod tests {
344    use std::collections::BTreeSet;
345
346    use fastcrypto::traits::ToFromBytes;
347    use sui_types::committee::VALIDITY_THRESHOLD;
348    use sui_types::digests::TransactionDigest;
349
350    use crate::crypto::BridgeAuthorityPublicKey;
351    use crate::server::mock_handler::BridgeRequestMockHandler;
352
353    use super::*;
354    use crate::test_utils::{
355        get_test_authorities_and_run_mock_bridge_server, get_test_authority_and_key,
356        get_test_sui_to_eth_bridge_action, sign_action_with_key,
357    };
358    use crate::types::BridgeCommittee;
359
360    #[tokio::test]
361    async fn test_bridge_auth_agg_construction() {
362        telemetry_subscribers::init_for_testing();
363
364        let mut authorities = vec![];
365        for _i in 0..4 {
366            let (authority, _, _) = get_test_authority_and_key(2500, 12345);
367            authorities.push(authority);
368        }
369        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
370
371        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
372        assert_eq!(
373            agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
374            BTreeSet::from_iter(vec![
375                authorities[0].pubkey_bytes(),
376                authorities[1].pubkey_bytes(),
377                authorities[2].pubkey_bytes(),
378                authorities[3].pubkey_bytes()
379            ])
380        );
381
382        // authority 2 is blocklisted
383        authorities[2].is_blocklisted = true;
384        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
385        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
386        assert_eq!(
387            agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
388            BTreeSet::from_iter(vec![
389                authorities[0].pubkey_bytes(),
390                authorities[1].pubkey_bytes(),
391                authorities[3].pubkey_bytes()
392            ])
393        );
394
395        // authority 3 has bad url
396        authorities[3].base_url = "".into();
397        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
398        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
399        assert_eq!(
400            agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
401            BTreeSet::from_iter(vec![
402                authorities[0].pubkey_bytes(),
403                authorities[1].pubkey_bytes(),
404                authorities[3].pubkey_bytes()
405            ])
406        );
407    }
408
409    #[tokio::test]
410    async fn test_bridge_auth_agg_ok() {
411        telemetry_subscribers::init_for_testing();
412
413        let mock0 = BridgeRequestMockHandler::new();
414        let mock1 = BridgeRequestMockHandler::new();
415        let mock2 = BridgeRequestMockHandler::new();
416        let mock3 = BridgeRequestMockHandler::new();
417
418        // start servers
419        let (_handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
420            vec![2500, 2500, 2500, 2500],
421            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
422        );
423
424        let committee = BridgeCommittee::new(authorities).unwrap();
425
426        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
427
428        let sui_tx_digest = TransactionDigest::random();
429        let sui_tx_event_index = 0;
430        let nonce = 0;
431        let amount = 1000;
432        let action = get_test_sui_to_eth_bridge_action(
433            Some(sui_tx_digest),
434            Some(sui_tx_event_index),
435            Some(nonce),
436            Some(amount),
437            None,
438            None,
439            None,
440        );
441
442        // All authorities return signatures
443        mock0.add_sui_event_response(
444            sui_tx_digest,
445            sui_tx_event_index,
446            Ok(sign_action_with_key(&action, &secrets[0])),
447            None,
448        );
449        mock1.add_sui_event_response(
450            sui_tx_digest,
451            sui_tx_event_index,
452            Ok(sign_action_with_key(&action, &secrets[1])),
453            None,
454        );
455        mock2.add_sui_event_response(
456            sui_tx_digest,
457            sui_tx_event_index,
458            Ok(sign_action_with_key(&action, &secrets[2])),
459            None,
460        );
461        mock3.add_sui_event_response(
462            sui_tx_digest,
463            sui_tx_event_index,
464            Ok(sign_action_with_key(&action, &secrets[3])),
465            None,
466        );
467        agg.request_committee_signatures(action.clone())
468            .await
469            .unwrap();
470
471        // 1 out of 4 authorities returns error
472        mock3.add_sui_event_response(
473            sui_tx_digest,
474            sui_tx_event_index,
475            Err(BridgeError::RestAPIError("".into())),
476            None,
477        );
478        agg.request_committee_signatures(action.clone())
479            .await
480            .unwrap();
481
482        // 2 out of 4 authorities returns error
483        mock2.add_sui_event_response(
484            sui_tx_digest,
485            sui_tx_event_index,
486            Err(BridgeError::RestAPIError("".into())),
487            None,
488        );
489        agg.request_committee_signatures(action.clone())
490            .await
491            .unwrap();
492
493        // 3 out of 4 authorities returns error - good stake below valdiity threshold
494        mock1.add_sui_event_response(
495            sui_tx_digest,
496            sui_tx_event_index,
497            Err(BridgeError::RestAPIError("".into())),
498            None,
499        );
500        let err = agg
501            .request_committee_signatures(action.clone())
502            .await
503            .unwrap_err();
504        assert!(matches!(
505            err,
506            BridgeError::AuthoritySignatureAggregationTooManyError(_)
507        ));
508    }
509
510    #[tokio::test]
511    async fn test_bridge_auth_agg_optimized() {
512        telemetry_subscribers::init_for_testing();
513
514        let mock0 = BridgeRequestMockHandler::new();
515        let mock1 = BridgeRequestMockHandler::new();
516        let mock2 = BridgeRequestMockHandler::new();
517        let mock3 = BridgeRequestMockHandler::new();
518        let mock4 = BridgeRequestMockHandler::new();
519        let mock5 = BridgeRequestMockHandler::new();
520        let mock6 = BridgeRequestMockHandler::new();
521        let mock7 = BridgeRequestMockHandler::new();
522        let mock8 = BridgeRequestMockHandler::new();
523
524        // start servers - there is only one permutation of size 2 (1112, 2222) that will achieve quorum
525        let (_handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
526            vec![666, 1000, 900, 900, 900, 900, 900, 1612, 2222],
527            vec![
528                mock0.clone(),
529                mock1.clone(),
530                mock2.clone(),
531                mock3.clone(),
532                mock4.clone(),
533                mock5.clone(),
534                mock6.clone(),
535                mock7.clone(),
536                mock8.clone(),
537            ],
538        );
539
540        let authorities_clone = authorities.clone();
541        let committee = Arc::new(BridgeCommittee::new(authorities_clone).unwrap());
542
543        let agg = BridgeAuthorityAggregator::new_for_testing(committee.clone());
544
545        let sui_tx_digest = TransactionDigest::random();
546        let sui_tx_event_index = 0;
547        let nonce = 0;
548        let amount = 1000;
549        let action = get_test_sui_to_eth_bridge_action(
550            Some(sui_tx_digest),
551            Some(sui_tx_event_index),
552            Some(nonce),
553            Some(amount),
554            None,
555            None,
556            None,
557        );
558
559        // All authorities return signatures
560        mock0.add_sui_event_response(
561            sui_tx_digest,
562            sui_tx_event_index,
563            Ok(sign_action_with_key(&action, &secrets[0])),
564            Some(Duration::from_millis(200)),
565        );
566        mock1.add_sui_event_response(
567            sui_tx_digest,
568            sui_tx_event_index,
569            Ok(sign_action_with_key(&action, &secrets[1])),
570            Some(Duration::from_millis(200)),
571        );
572        mock2.add_sui_event_response(
573            sui_tx_digest,
574            sui_tx_event_index,
575            Ok(sign_action_with_key(&action, &secrets[2])),
576            Some(Duration::from_millis(700)),
577        );
578        mock3.add_sui_event_response(
579            sui_tx_digest,
580            sui_tx_event_index,
581            Ok(sign_action_with_key(&action, &secrets[3])),
582            Some(Duration::from_millis(700)),
583        );
584        mock4.add_sui_event_response(
585            sui_tx_digest,
586            sui_tx_event_index,
587            Ok(sign_action_with_key(&action, &secrets[4])),
588            Some(Duration::from_millis(700)),
589        );
590        mock5.add_sui_event_response(
591            sui_tx_digest,
592            sui_tx_event_index,
593            Ok(sign_action_with_key(&action, &secrets[5])),
594            Some(Duration::from_millis(700)),
595        );
596        mock6.add_sui_event_response(
597            sui_tx_digest,
598            sui_tx_event_index,
599            Ok(sign_action_with_key(&action, &secrets[6])),
600            Some(Duration::from_millis(700)),
601        );
602        mock7.add_sui_event_response(
603            sui_tx_digest,
604            sui_tx_event_index,
605            Ok(sign_action_with_key(&action, &secrets[7])),
606            Some(Duration::from_millis(900)),
607        );
608        mock8.add_sui_event_response(
609            sui_tx_digest,
610            sui_tx_event_index,
611            Ok(sign_action_with_key(&action, &secrets[8])),
612            Some(Duration::from_millis(1_500)),
613        );
614
615        // we should receive all signatures in time, but only aggregate 2 authorities
616        // to achieve quorum
617        let metrics = Arc::new(BridgeMetrics::new_for_testing());
618        let state = GetSigsState::new(
619            action.approval_threshold(),
620            committee.clone(),
621            metrics.clone(),
622            Arc::new(BTreeMap::new()),
623        );
624        let resp = request_sign_bridge_action_into_certification(
625            action.clone(),
626            agg.committee.clone(),
627            agg.clients.clone(),
628            state,
629            Duration::from_millis(2_000),
630        )
631        .await
632        .unwrap();
633        let sig_keys = resp.auth_sig().signatures.keys().collect::<BTreeSet<_>>();
634        assert_eq!(sig_keys.len(), 2);
635        assert!(sig_keys.contains(&authorities[7].pubkey_bytes()));
636        assert!(sig_keys.contains(&authorities[8].pubkey_bytes()));
637
638        // we should receive all but the highest stake signatures in time, but still be able to
639        // achieve quorum with 3 sigs
640        let state = GetSigsState::new(
641            action.approval_threshold(),
642            committee.clone(),
643            metrics.clone(),
644            Arc::new(BTreeMap::new()),
645        );
646        let resp = request_sign_bridge_action_into_certification(
647            action.clone(),
648            agg.committee.clone(),
649            agg.clients.clone(),
650            state,
651            Duration::from_millis(1_200),
652        )
653        .await
654        .unwrap();
655        let sig_keys = resp.auth_sig().signatures.keys().collect::<BTreeSet<_>>();
656        assert_eq!(sig_keys.len(), 3);
657        assert!(sig_keys.contains(&authorities[7].pubkey_bytes()));
658        // this should not have come in time
659        assert!(!sig_keys.contains(&authorities[8].pubkey_bytes()));
660
661        // we should have fallen back to arrival order given that we timeout before we reach quorum
662        let state = GetSigsState::new(
663            action.approval_threshold(),
664            committee.clone(),
665            metrics.clone(),
666            Arc::new(BTreeMap::new()),
667        );
668        let start = std::time::Instant::now();
669        let resp = request_sign_bridge_action_into_certification(
670            action.clone(),
671            agg.committee.clone(),
672            agg.clients.clone(),
673            state,
674            Duration::from_millis(500),
675        )
676        .await
677        .unwrap();
678        let elapsed = start.elapsed();
679        assert!(
680            elapsed >= Duration::from_millis(700),
681            "Expected to have to wait at least 700ms to fallback to arrival order and achieve quorum, but was {:?}",
682            elapsed
683        );
684        let sig_keys = resp.auth_sig().signatures.keys().collect::<BTreeSet<_>>();
685        assert_eq!(sig_keys.len(), 4);
686        // These two do not make it on time initially, and then we should be able
687        // to achieve quorum before these ultimately arrive
688        assert!(!sig_keys.contains(&authorities[7].pubkey_bytes()));
689        assert!(!sig_keys.contains(&authorities[8].pubkey_bytes()));
690        // These were the first two to respond, and should be immediately
691        // included once we fallback to arrival order
692        assert!(sig_keys.contains(&authorities[0].pubkey_bytes()));
693        assert!(sig_keys.contains(&authorities[1].pubkey_bytes()));
694    }
695
696    #[tokio::test]
697    async fn test_bridge_auth_agg_more_cases() {
698        telemetry_subscribers::init_for_testing();
699
700        let mock0 = BridgeRequestMockHandler::new();
701        let mock1 = BridgeRequestMockHandler::new();
702        let mock2 = BridgeRequestMockHandler::new();
703        let mock3 = BridgeRequestMockHandler::new();
704
705        // start servers
706        let (_handles, mut authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
707            vec![2500, 2500, 2500, 2500],
708            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
709        );
710        // 0 and 1 are blocklisted
711        authorities[0].is_blocklisted = true;
712        authorities[1].is_blocklisted = true;
713
714        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
715
716        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
717
718        let sui_tx_digest = TransactionDigest::random();
719        let sui_tx_event_index = 0;
720        let nonce = 0;
721        let amount = 1000;
722        let action = get_test_sui_to_eth_bridge_action(
723            Some(sui_tx_digest),
724            Some(sui_tx_event_index),
725            Some(nonce),
726            Some(amount),
727            None,
728            None,
729            None,
730        );
731
732        // Only mock authority 2 and 3 to return signatures, such that if BridgeAuthorityAggregator
733        // requests to authority 0 and 1 (which should not happen) it will panic.
734        mock2.add_sui_event_response(
735            sui_tx_digest,
736            sui_tx_event_index,
737            Ok(sign_action_with_key(&action, &secrets[2])),
738            None,
739        );
740        mock3.add_sui_event_response(
741            sui_tx_digest,
742            sui_tx_event_index,
743            Ok(sign_action_with_key(&action, &secrets[3])),
744            None,
745        );
746        let certified = agg
747            .request_committee_signatures(action.clone())
748            .await
749            .unwrap();
750        let signers = certified
751            .auth_sig()
752            .signatures
753            .keys()
754            .cloned()
755            .collect::<BTreeSet<_>>();
756        assert_eq!(
757            signers,
758            BTreeSet::from_iter(vec![
759                authorities[2].pubkey_bytes(),
760                authorities[3].pubkey_bytes()
761            ])
762        );
763
764        // if mock 3 returns error, then it won't reach validity threshold
765        mock3.add_sui_event_response(
766            sui_tx_digest,
767            sui_tx_event_index,
768            Err(BridgeError::RestAPIError("".into())),
769            None,
770        );
771        let err = agg
772            .request_committee_signatures(action.clone())
773            .await
774            .unwrap_err();
775        assert!(matches!(
776            err,
777            BridgeError::AuthoritySignatureAggregationTooManyError(_)
778        ));
779
780        // if mock 3 returns duplicated signature (by authority 2), `BridgeClient` will catch this
781        mock3.add_sui_event_response(
782            sui_tx_digest,
783            sui_tx_event_index,
784            Ok(sign_action_with_key(&action, &secrets[2])),
785            None,
786        );
787        let err = agg
788            .request_committee_signatures(action.clone())
789            .await
790            .unwrap_err();
791        assert!(matches!(
792            err,
793            BridgeError::AuthoritySignatureAggregationTooManyError(_)
794        ));
795    }
796
797    #[test]
798    fn test_get_sigs_state() {
799        telemetry_subscribers::init_for_testing();
800
801        let mut authorities = vec![];
802        let mut secrets = vec![];
803        for _i in 0..4 {
804            let (authority, _, secret) = get_test_authority_and_key(2500, 12345);
805            authorities.push(authority);
806            secrets.push(secret);
807        }
808
809        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
810
811        let threshold = VALIDITY_THRESHOLD;
812        let metrics = Arc::new(BridgeMetrics::new_for_testing());
813        let mut state = GetSigsState::new(
814            threshold,
815            Arc::new(committee),
816            metrics.clone(),
817            Arc::new(BTreeMap::new()),
818        );
819
820        assert!(!state.is_too_many_error());
821        let dummy = authorities[0].pubkey_bytes();
822        // bad stake: 2500
823        state.add_bad_stake(2500, &dummy);
824        assert!(!state.is_too_many_error());
825
826        // bad stake ; 5000
827        state.add_bad_stake(2500, &dummy);
828        assert!(!state.is_too_many_error());
829
830        // bad stake : 6666
831        state.add_bad_stake(1666, &dummy);
832        assert!(!state.is_too_many_error());
833
834        // bad stake : 6667 - too many errors
835        state.add_bad_stake(1, &dummy);
836        assert!(state.is_too_many_error());
837
838        // Authority 0 is blocklisted, we lose 2500 stake
839        authorities[0].is_blocklisted = true;
840        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
841        let threshold = VALIDITY_THRESHOLD;
842        let metrics = Arc::new(BridgeMetrics::new_for_testing());
843        let mut state = GetSigsState::new(
844            threshold,
845            Arc::new(committee),
846            metrics.clone(),
847            Arc::new(BTreeMap::new()),
848        );
849
850        assert!(!state.is_too_many_error());
851
852        // bad stake: 2500 + 2500
853        state.add_bad_stake(2500, &dummy);
854        assert!(!state.is_too_many_error());
855
856        // bad stake: 5000 + 2500 - too many errors
857        state.add_bad_stake(2500, &dummy);
858        assert!(state.is_too_many_error());
859
860        // Below we test `handle_verified_signed_action`
861        authorities[0].is_blocklisted = false;
862        authorities[1].voting_power = 1; // set authority's voting power to minimal
863        authorities[2].voting_power = 4999;
864        authorities[3].is_blocklisted = true; // blocklist authority 3
865        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
866        let threshold = VALIDITY_THRESHOLD;
867        let mut state = GetSigsState::new(
868            threshold,
869            Arc::new(committee.clone()),
870            metrics.clone(),
871            Arc::new(BTreeMap::new()),
872        );
873
874        let sui_tx_digest = TransactionDigest::random();
875        let sui_tx_event_index = 0;
876        let nonce = 0;
877        let amount = 1000;
878        let action = get_test_sui_to_eth_bridge_action(
879            Some(sui_tx_digest),
880            Some(sui_tx_event_index),
881            Some(nonce),
882            Some(amount),
883            None,
884            None,
885            None,
886        );
887
888        let sig_0 = sign_action_with_key(&action, &secrets[0]);
889        // returns Ok(None)
890        assert!(
891            state
892                .handle_verified_signed_action(
893                    authorities[0].pubkey_bytes().clone(),
894                    authorities[0].voting_power,
895                    VerifiedSignedBridgeAction::new_from_verified(sig_0.clone())
896                )
897                .unwrap()
898                .is_none()
899        );
900        assert_eq!(state.total_ok_stake, 2500);
901
902        // Handling a sig from an already signed authority would fail
903        let new_sig_0 = sign_action_with_key(&action, &secrets[0]);
904        // returns Err(BridgeError::AuthoritySignatureDuplication)
905        let err = state
906            .handle_verified_signed_action(
907                authorities[0].pubkey_bytes().clone(),
908                authorities[0].voting_power,
909                VerifiedSignedBridgeAction::new_from_verified(new_sig_0.clone()),
910            )
911            .unwrap_err();
912        assert!(matches!(err, BridgeError::AuthoritySignatureDuplication(_)));
913        assert_eq!(state.total_ok_stake, 2500);
914
915        // Handling a sig from an authority not in committee would fail
916        let (unknown_authority, _, kp) = get_test_authority_and_key(2500, 12345);
917        let unknown_sig = sign_action_with_key(&action, &kp);
918        // returns Err(BridgeError::InvalidBridgeAuthority)
919        let err = state
920            .handle_verified_signed_action(
921                unknown_authority.pubkey_bytes().clone(),
922                authorities[0].voting_power,
923                VerifiedSignedBridgeAction::new_from_verified(unknown_sig.clone()),
924            )
925            .unwrap_err();
926        assert!(matches!(err, BridgeError::InvalidBridgeAuthority(_)));
927        assert_eq!(state.total_ok_stake, 2500);
928
929        // Handling a blocklisted authority would fail
930        let sig_3 = sign_action_with_key(&action, &secrets[3]);
931        // returns Err(BridgeError::InvalidBridgeAuthority)
932        let err = state
933            .handle_verified_signed_action(
934                authorities[3].pubkey_bytes().clone(),
935                authorities[3].voting_power,
936                VerifiedSignedBridgeAction::new_from_verified(sig_3.clone()),
937            )
938            .unwrap_err();
939        assert!(matches!(err, BridgeError::InvalidBridgeAuthority(_)));
940        assert_eq!(state.total_ok_stake, 2500);
941
942        // Collect signtuare from authority 1 (voting power = 1)
943        let sig_1 = sign_action_with_key(&action, &secrets[1]);
944        // returns Ok(None)
945        assert!(
946            state
947                .handle_verified_signed_action(
948                    authorities[1].pubkey_bytes().clone(),
949                    authorities[1].voting_power,
950                    VerifiedSignedBridgeAction::new_from_verified(sig_1.clone())
951                )
952                .unwrap()
953                .is_none()
954        );
955        assert_eq!(state.total_ok_stake, 2501);
956
957        // Collect signature from authority 2 - reach validity threshold
958        let sig_2 = sign_action_with_key(&action, &secrets[2]);
959        // returns Ok(None)
960        let certificate = state
961            .handle_verified_signed_action(
962                authorities[2].pubkey_bytes().clone(),
963                authorities[2].voting_power,
964                VerifiedSignedBridgeAction::new_from_verified(sig_2.clone()),
965            )
966            .unwrap()
967            .unwrap();
968        assert_eq!(state.total_ok_stake, 7500);
969
970        assert_eq!(certificate.data(), &action);
971        let signers = certificate
972            .auth_sig()
973            .signatures
974            .keys()
975            .cloned()
976            .collect::<BTreeSet<_>>();
977        assert_eq!(
978            signers,
979            BTreeSet::from_iter(vec![
980                authorities[0].pubkey_bytes(),
981                authorities[1].pubkey_bytes(),
982                authorities[2].pubkey_bytes()
983            ])
984        );
985
986        for (pubkey, sig) in &certificate.auth_sig().signatures {
987            let sign_info = BridgeAuthoritySignInfo {
988                authority_pub_key: BridgeAuthorityPublicKey::from_bytes(pubkey.as_ref()).unwrap(),
989                signature: sig.clone(),
990            };
991            assert!(sign_info.verify(&action, &committee).is_ok());
992        }
993    }
994}