sui_bridge/client/
bridge_authority_aggregator.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! BridgeAuthorityAggregator aggregates signatures from BridgeCommittee.
5
6use crate::client::bridge_client::BridgeClient;
7use crate::crypto::BridgeAuthorityPublicKeyBytes;
8use crate::crypto::BridgeAuthoritySignInfo;
9use crate::error::{BridgeError, BridgeResult};
10use crate::metrics::BridgeMetrics;
11use crate::types::BridgeCommitteeValiditySignInfo;
12use crate::types::{
13    BridgeAction, BridgeCommittee, CertifiedBridgeAction, VerifiedCertifiedBridgeAction,
14    VerifiedSignedBridgeAction,
15};
16use std::collections::BTreeMap;
17use std::collections::BTreeSet;
18use std::collections::btree_map::Entry;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_authority_aggregation::ReduceOutput;
22use sui_authority_aggregation::{SigRequestPrefs, quorum_map_then_reduce_with_timeout_and_prefs};
23use sui_types::base_types::ConciseableName;
24use sui_types::committee::StakeUnit;
25use sui_types::committee::TOTAL_VOTING_POWER;
26use tracing::{error, info, warn};
27
28const TOTAL_TIMEOUT_MS: u64 = 5_000;
29const PREFETCH_TIMEOUT_MS: u64 = 1_500;
30const RETRY_INTERVAL_MS: u64 = 500;
31
32pub struct BridgeAuthorityAggregator {
33    pub committee: Arc<BridgeCommittee>,
34    pub clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
35    pub metrics: Arc<BridgeMetrics>,
36    pub committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
37}
38
39impl BridgeAuthorityAggregator {
40    pub fn new(
41        committee: Arc<BridgeCommittee>,
42        metrics: Arc<BridgeMetrics>,
43        committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
44    ) -> Self {
45        let clients: BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>> = committee
46            .members()
47            .iter()
48            .filter_map(|(name, authority)| {
49                if authority.is_blocklisted {
50                    warn!("Ignored blocklisted authority {:?} (stake: {}) when creating BridgeAuthorityAggregator", name.concise(), authority.voting_power);
51                    return None;
52                }
53                // TODO: we could also record bad stakes here and use in signature aggregation
54                match BridgeClient::new(
55                    name.clone(),
56                    committee.clone(),
57                ) {
58                    Ok(client) => Some((name.clone(), Arc::new(client))),
59                    Err(e) => {
60                        error!(
61                            "Failed to create BridgeClient for {:?}: {:?}",
62                            name.concise(),
63                            e
64                        );
65                        None
66                    }
67                }
68            })
69            .collect::<BTreeMap<_, _>>();
70        Self {
71            committee,
72            clients: Arc::new(clients),
73            metrics,
74            committee_keys_to_names,
75        }
76    }
77
78    #[cfg(test)]
79    pub fn new_for_testing(committee: Arc<BridgeCommittee>) -> Self {
80        Self::new(
81            committee,
82            Arc::new(BridgeMetrics::new_for_testing()),
83            Arc::new(BTreeMap::new()),
84        )
85    }
86
87    pub async fn request_committee_signatures(
88        &self,
89        action: BridgeAction,
90    ) -> BridgeResult<VerifiedCertifiedBridgeAction> {
91        let state = GetSigsState::new(
92            action.approval_threshold(),
93            self.committee.clone(),
94            self.metrics.clone(),
95            self.committee_keys_to_names.clone(),
96        );
97        request_sign_bridge_action_into_certification(
98            action,
99            self.committee.clone(),
100            self.clients.clone(),
101            state,
102            Duration::from_millis(PREFETCH_TIMEOUT_MS),
103        )
104        .await
105    }
106}
107
108#[derive(Debug)]
109struct GetSigsState {
110    total_bad_stake: StakeUnit,
111    total_ok_stake: StakeUnit,
112    sigs: BTreeMap<BridgeAuthorityPublicKeyBytes, BridgeAuthoritySignInfo>,
113    validity_threshold: StakeUnit,
114    committee: Arc<BridgeCommittee>,
115    metrics: Arc<BridgeMetrics>,
116    committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
117}
118
119impl GetSigsState {
120    fn new(
121        validity_threshold: StakeUnit,
122        committee: Arc<BridgeCommittee>,
123        metrics: Arc<BridgeMetrics>,
124        committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
125    ) -> Self {
126        Self {
127            committee,
128            total_bad_stake: 0,
129            total_ok_stake: 0,
130            sigs: BTreeMap::new(),
131            validity_threshold,
132            metrics,
133            committee_keys_to_names,
134        }
135    }
136
137    fn handle_verified_signed_action(
138        &mut self,
139        name: BridgeAuthorityPublicKeyBytes,
140        stake: StakeUnit,
141        signed_action: VerifiedSignedBridgeAction,
142    ) -> BridgeResult<Option<VerifiedCertifiedBridgeAction>> {
143        info!("Got signatures from {}, stake: {}", name.concise(), stake);
144        if !self.committee.is_active_member(&name) {
145            return Err(BridgeError::InvalidBridgeAuthority(name));
146        }
147
148        // safeguard here to assert passed in stake matches the stake in committee
149        // unwrap safe: if name is an active member then it must be in committee set
150        assert_eq!(stake, self.committee.member(&name).unwrap().voting_power);
151
152        match self.sigs.entry(name.clone()) {
153            Entry::Vacant(e) => {
154                e.insert(signed_action.auth_sig().clone());
155                self.add_ok_stake(stake, &name);
156            }
157            Entry::Occupied(_e) => {
158                return Err(BridgeError::AuthoritySignatureDuplication(format!(
159                    "Got signatures for the same authority twice: {}",
160                    name.concise()
161                )));
162            }
163        }
164        if self.total_ok_stake >= self.validity_threshold {
165            info!(
166                "Got enough signatures from {} validators with total_ok_stake {}",
167                self.sigs.len(),
168                self.total_ok_stake
169            );
170            let signatures = self
171                .sigs
172                .iter()
173                .map(|(k, v)| (k.clone(), v.signature.clone()))
174                .collect::<BTreeMap<_, _>>();
175            let sig_info = BridgeCommitteeValiditySignInfo { signatures };
176            let certified_action: sui_types::message_envelope::Envelope<
177                BridgeAction,
178                BridgeCommitteeValiditySignInfo,
179            > = CertifiedBridgeAction::new_from_data_and_sig(
180                signed_action.into_inner().into_data(),
181                sig_info,
182            );
183            // `BridgeClient` already verified individual signatures
184            Ok(Some(VerifiedCertifiedBridgeAction::new_from_verified(
185                certified_action,
186            )))
187        } else {
188            Ok(None)
189        }
190    }
191
192    fn add_ok_stake(&mut self, ok_stake: StakeUnit, name: &BridgeAuthorityPublicKeyBytes) {
193        if let Some(host_name) = self.committee_keys_to_names.get(name) {
194            self.metrics
195                .auth_agg_ok_responses
196                .with_label_values(&[host_name])
197                .inc();
198        }
199        self.total_ok_stake += ok_stake;
200    }
201
202    fn add_bad_stake(&mut self, bad_stake: StakeUnit, name: &BridgeAuthorityPublicKeyBytes) {
203        if let Some(host_name) = self.committee_keys_to_names.get(name) {
204            self.metrics
205                .auth_agg_bad_responses
206                .with_label_values(&[host_name])
207                .inc();
208        }
209        self.total_bad_stake += bad_stake;
210    }
211
212    fn is_too_many_error(&self) -> bool {
213        TOTAL_VOTING_POWER - self.total_bad_stake - self.committee.total_blocklisted_stake()
214            < self.validity_threshold
215    }
216}
217
218async fn request_sign_bridge_action_into_certification(
219    action: BridgeAction,
220    committee: Arc<BridgeCommittee>,
221    clients: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, Arc<BridgeClient>>>,
222    state: GetSigsState,
223    prefetch_timeout: Duration,
224) -> BridgeResult<VerifiedCertifiedBridgeAction> {
225    // `preferences` is used as a trick here to influence the order of validators to be requested.
226    // * if `Some(_)`, then we will request validators in the order of the voting power.
227    // * if `None`, we still refer to voting power, but they are shuffled by randomness.
228    // Because ethereum gas price is not negligible, when the signatures are to be verified on ethereum,
229    // we pass in `Some` to make sure the validators with higher voting power are requested first
230    // to save gas cost.
231    let preference = match action {
232        BridgeAction::SuiToEthBridgeAction(_) => Some(SigRequestPrefs {
233            ordering_pref: BTreeSet::new(),
234            prefetch_timeout,
235        }),
236        BridgeAction::EthToSuiBridgeAction(_) => None,
237        _ => {
238            if action.chain_id().is_sui_chain() {
239                None
240            } else {
241                Some(SigRequestPrefs {
242                    ordering_pref: BTreeSet::new(),
243                    prefetch_timeout,
244                })
245            }
246        }
247    };
248    let (result, _) = quorum_map_then_reduce_with_timeout_and_prefs(
249        committee,
250        clients,
251        preference,
252        state,
253        |name, client| {
254            Box::pin(async move {
255                let start = std::time::Instant::now();
256                let timeout = Duration::from_millis(TOTAL_TIMEOUT_MS);
257                let retry_interval = Duration::from_millis(RETRY_INTERVAL_MS);
258                while start.elapsed() < timeout {
259                    match client.request_sign_bridge_action(action.clone()).await {
260                        Ok(result) => {
261                            return Ok(result);
262                        }
263                        // retryable errors
264                        Err(BridgeError::TxNotFinalized) => {
265                            warn!("Bridge authority {} observing transaction not yet finalized, retrying in {:?}", name.concise(), retry_interval);
266                            tokio::time::sleep(retry_interval).await;
267                        }
268                        // non-retryable errors
269                        Err(e) => {
270                            return Err(e);
271                        }
272                    }
273                }
274                Err(BridgeError::TransientProviderError(format!("Bridge authority {} did not observe finalized transaction after {:?}", name.concise(), timeout)))
275            })
276        },
277        |mut state, name, stake, result| {
278            Box::pin(async move {
279                match result {
280                    Ok(verified_signed_action) => {
281                        match state.handle_verified_signed_action(
282                            name.clone(),
283                            stake,
284                            verified_signed_action,
285                        ) {
286                            Ok(Some(certified_action)) => {
287                                return ReduceOutput::Success(certified_action)
288                            }
289                            Ok(None) => (),
290                            Err(e) => {
291                                error!(
292                                    "Failed to handle verified signed action from {}: {:?}",
293                                    name.concise(),
294                                    e
295                                );
296                                state.add_bad_stake(stake, &name);
297                            }
298                        }
299                    }
300                    Err(e) => {
301                        warn!(
302                            "Failed to get signature from {:?}. Error: {:?}",
303                            name.concise(),
304                            e
305                        );
306                        state.add_bad_stake(stake, &name);
307                    }
308                };
309
310                // If bad stake (including blocklisted stake) is too high to reach validity threshold, return error
311                if state.is_too_many_error() {
312                    ReduceOutput::Failed(state)
313                } else {
314                    ReduceOutput::Continue(state)
315                }
316            })
317        },
318        Duration::from_millis(TOTAL_TIMEOUT_MS),
319    )
320    .await
321    .map_err(|state| {
322        error!(
323            "Failed to get enough signatures, bad stake: {}, blocklisted stake: {}, good stake: {}, validity threshold: {}",
324            state.total_bad_stake,
325            state.committee.total_blocklisted_stake(),
326            state.total_ok_stake,
327            state.validity_threshold,
328        );
329        BridgeError::AuthoritySignatureAggregationTooManyError(format!(
330            "Failed to get enough signatures, bad stake: {}, blocklisted stake: {}, good stake: {}, validity threshold: {}",
331            state.total_bad_stake,
332            state.committee.total_blocklisted_stake(),
333            state.total_ok_stake,
334            state.validity_threshold,
335        ))
336    })?;
337    Ok(result)
338}
339
340#[cfg(test)]
341mod tests {
342    use std::collections::BTreeSet;
343
344    use fastcrypto::traits::ToFromBytes;
345    use sui_types::committee::VALIDITY_THRESHOLD;
346    use sui_types::digests::TransactionDigest;
347
348    use crate::crypto::BridgeAuthorityPublicKey;
349    use crate::server::mock_handler::BridgeRequestMockHandler;
350
351    use super::*;
352    use crate::test_utils::{
353        get_test_authorities_and_run_mock_bridge_server, get_test_authority_and_key,
354        get_test_sui_to_eth_bridge_action, sign_action_with_key,
355    };
356    use crate::types::BridgeCommittee;
357
358    #[tokio::test]
359    async fn test_bridge_auth_agg_construction() {
360        telemetry_subscribers::init_for_testing();
361
362        let mut authorities = vec![];
363        for _i in 0..4 {
364            let (authority, _, _) = get_test_authority_and_key(2500, 12345);
365            authorities.push(authority);
366        }
367        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
368
369        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
370        assert_eq!(
371            agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
372            BTreeSet::from_iter(vec![
373                authorities[0].pubkey_bytes(),
374                authorities[1].pubkey_bytes(),
375                authorities[2].pubkey_bytes(),
376                authorities[3].pubkey_bytes()
377            ])
378        );
379
380        // authority 2 is blocklisted
381        authorities[2].is_blocklisted = true;
382        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
383        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
384        assert_eq!(
385            agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
386            BTreeSet::from_iter(vec![
387                authorities[0].pubkey_bytes(),
388                authorities[1].pubkey_bytes(),
389                authorities[3].pubkey_bytes()
390            ])
391        );
392
393        // authority 3 has bad url
394        authorities[3].base_url = "".into();
395        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
396        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
397        assert_eq!(
398            agg.clients.keys().cloned().collect::<BTreeSet<_>>(),
399            BTreeSet::from_iter(vec![
400                authorities[0].pubkey_bytes(),
401                authorities[1].pubkey_bytes(),
402                authorities[3].pubkey_bytes()
403            ])
404        );
405    }
406
407    #[tokio::test]
408    async fn test_bridge_auth_agg_ok() {
409        telemetry_subscribers::init_for_testing();
410
411        let mock0 = BridgeRequestMockHandler::new();
412        let mock1 = BridgeRequestMockHandler::new();
413        let mock2 = BridgeRequestMockHandler::new();
414        let mock3 = BridgeRequestMockHandler::new();
415
416        // start servers
417        let (_handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
418            vec![2500, 2500, 2500, 2500],
419            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
420        );
421
422        let committee = BridgeCommittee::new(authorities).unwrap();
423
424        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
425
426        let sui_tx_digest = TransactionDigest::random();
427        let sui_tx_event_index = 0;
428        let nonce = 0;
429        let amount = 1000;
430        let action = get_test_sui_to_eth_bridge_action(
431            Some(sui_tx_digest),
432            Some(sui_tx_event_index),
433            Some(nonce),
434            Some(amount),
435            None,
436            None,
437            None,
438        );
439
440        // All authorities return signatures
441        mock0.add_sui_event_response(
442            sui_tx_digest,
443            sui_tx_event_index,
444            Ok(sign_action_with_key(&action, &secrets[0])),
445            None,
446        );
447        mock1.add_sui_event_response(
448            sui_tx_digest,
449            sui_tx_event_index,
450            Ok(sign_action_with_key(&action, &secrets[1])),
451            None,
452        );
453        mock2.add_sui_event_response(
454            sui_tx_digest,
455            sui_tx_event_index,
456            Ok(sign_action_with_key(&action, &secrets[2])),
457            None,
458        );
459        mock3.add_sui_event_response(
460            sui_tx_digest,
461            sui_tx_event_index,
462            Ok(sign_action_with_key(&action, &secrets[3])),
463            None,
464        );
465        agg.request_committee_signatures(action.clone())
466            .await
467            .unwrap();
468
469        // 1 out of 4 authorities returns error
470        mock3.add_sui_event_response(
471            sui_tx_digest,
472            sui_tx_event_index,
473            Err(BridgeError::RestAPIError("".into())),
474            None,
475        );
476        agg.request_committee_signatures(action.clone())
477            .await
478            .unwrap();
479
480        // 2 out of 4 authorities returns error
481        mock2.add_sui_event_response(
482            sui_tx_digest,
483            sui_tx_event_index,
484            Err(BridgeError::RestAPIError("".into())),
485            None,
486        );
487        agg.request_committee_signatures(action.clone())
488            .await
489            .unwrap();
490
491        // 3 out of 4 authorities returns error - good stake below valdiity threshold
492        mock1.add_sui_event_response(
493            sui_tx_digest,
494            sui_tx_event_index,
495            Err(BridgeError::RestAPIError("".into())),
496            None,
497        );
498        let err = agg
499            .request_committee_signatures(action.clone())
500            .await
501            .unwrap_err();
502        assert!(matches!(
503            err,
504            BridgeError::AuthoritySignatureAggregationTooManyError(_)
505        ));
506    }
507
508    #[tokio::test]
509    async fn test_bridge_auth_agg_optimized() {
510        telemetry_subscribers::init_for_testing();
511
512        let mock0 = BridgeRequestMockHandler::new();
513        let mock1 = BridgeRequestMockHandler::new();
514        let mock2 = BridgeRequestMockHandler::new();
515        let mock3 = BridgeRequestMockHandler::new();
516        let mock4 = BridgeRequestMockHandler::new();
517        let mock5 = BridgeRequestMockHandler::new();
518        let mock6 = BridgeRequestMockHandler::new();
519        let mock7 = BridgeRequestMockHandler::new();
520        let mock8 = BridgeRequestMockHandler::new();
521
522        // start servers - there is only one permutation of size 2 (1112, 2222) that will achieve quorum
523        let (_handles, authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
524            vec![666, 1000, 900, 900, 900, 900, 900, 1612, 2222],
525            vec![
526                mock0.clone(),
527                mock1.clone(),
528                mock2.clone(),
529                mock3.clone(),
530                mock4.clone(),
531                mock5.clone(),
532                mock6.clone(),
533                mock7.clone(),
534                mock8.clone(),
535            ],
536        );
537
538        let authorities_clone = authorities.clone();
539        let committee = Arc::new(BridgeCommittee::new(authorities_clone).unwrap());
540
541        let agg = BridgeAuthorityAggregator::new_for_testing(committee.clone());
542
543        let sui_tx_digest = TransactionDigest::random();
544        let sui_tx_event_index = 0;
545        let nonce = 0;
546        let amount = 1000;
547        let action = get_test_sui_to_eth_bridge_action(
548            Some(sui_tx_digest),
549            Some(sui_tx_event_index),
550            Some(nonce),
551            Some(amount),
552            None,
553            None,
554            None,
555        );
556
557        // All authorities return signatures
558        mock0.add_sui_event_response(
559            sui_tx_digest,
560            sui_tx_event_index,
561            Ok(sign_action_with_key(&action, &secrets[0])),
562            Some(Duration::from_millis(200)),
563        );
564        mock1.add_sui_event_response(
565            sui_tx_digest,
566            sui_tx_event_index,
567            Ok(sign_action_with_key(&action, &secrets[1])),
568            Some(Duration::from_millis(200)),
569        );
570        mock2.add_sui_event_response(
571            sui_tx_digest,
572            sui_tx_event_index,
573            Ok(sign_action_with_key(&action, &secrets[2])),
574            Some(Duration::from_millis(700)),
575        );
576        mock3.add_sui_event_response(
577            sui_tx_digest,
578            sui_tx_event_index,
579            Ok(sign_action_with_key(&action, &secrets[3])),
580            Some(Duration::from_millis(700)),
581        );
582        mock4.add_sui_event_response(
583            sui_tx_digest,
584            sui_tx_event_index,
585            Ok(sign_action_with_key(&action, &secrets[4])),
586            Some(Duration::from_millis(700)),
587        );
588        mock5.add_sui_event_response(
589            sui_tx_digest,
590            sui_tx_event_index,
591            Ok(sign_action_with_key(&action, &secrets[5])),
592            Some(Duration::from_millis(700)),
593        );
594        mock6.add_sui_event_response(
595            sui_tx_digest,
596            sui_tx_event_index,
597            Ok(sign_action_with_key(&action, &secrets[6])),
598            Some(Duration::from_millis(700)),
599        );
600        mock7.add_sui_event_response(
601            sui_tx_digest,
602            sui_tx_event_index,
603            Ok(sign_action_with_key(&action, &secrets[7])),
604            Some(Duration::from_millis(900)),
605        );
606        mock8.add_sui_event_response(
607            sui_tx_digest,
608            sui_tx_event_index,
609            Ok(sign_action_with_key(&action, &secrets[8])),
610            Some(Duration::from_millis(1_500)),
611        );
612
613        // we should receive all signatures in time, but only aggregate 2 authorities
614        // to achieve quorum
615        let metrics = Arc::new(BridgeMetrics::new_for_testing());
616        let state = GetSigsState::new(
617            action.approval_threshold(),
618            committee.clone(),
619            metrics.clone(),
620            Arc::new(BTreeMap::new()),
621        );
622        let resp = request_sign_bridge_action_into_certification(
623            action.clone(),
624            agg.committee.clone(),
625            agg.clients.clone(),
626            state,
627            Duration::from_millis(2_000),
628        )
629        .await
630        .unwrap();
631        let sig_keys = resp.auth_sig().signatures.keys().collect::<BTreeSet<_>>();
632        assert_eq!(sig_keys.len(), 2);
633        assert!(sig_keys.contains(&authorities[7].pubkey_bytes()));
634        assert!(sig_keys.contains(&authorities[8].pubkey_bytes()));
635
636        // we should receive all but the highest stake signatures in time, but still be able to
637        // achieve quorum with 3 sigs
638        let state = GetSigsState::new(
639            action.approval_threshold(),
640            committee.clone(),
641            metrics.clone(),
642            Arc::new(BTreeMap::new()),
643        );
644        let resp = request_sign_bridge_action_into_certification(
645            action.clone(),
646            agg.committee.clone(),
647            agg.clients.clone(),
648            state,
649            Duration::from_millis(1_200),
650        )
651        .await
652        .unwrap();
653        let sig_keys = resp.auth_sig().signatures.keys().collect::<BTreeSet<_>>();
654        assert_eq!(sig_keys.len(), 3);
655        assert!(sig_keys.contains(&authorities[7].pubkey_bytes()));
656        // this should not have come in time
657        assert!(!sig_keys.contains(&authorities[8].pubkey_bytes()));
658
659        // we should have fallen back to arrival order given that we timeout before we reach quorum
660        let state = GetSigsState::new(
661            action.approval_threshold(),
662            committee.clone(),
663            metrics.clone(),
664            Arc::new(BTreeMap::new()),
665        );
666        let start = std::time::Instant::now();
667        let resp = request_sign_bridge_action_into_certification(
668            action.clone(),
669            agg.committee.clone(),
670            agg.clients.clone(),
671            state,
672            Duration::from_millis(500),
673        )
674        .await
675        .unwrap();
676        let elapsed = start.elapsed();
677        assert!(
678            elapsed >= Duration::from_millis(700),
679            "Expected to have to wait at least 700ms to fallback to arrival order and achieve quorum, but was {:?}",
680            elapsed
681        );
682        let sig_keys = resp.auth_sig().signatures.keys().collect::<BTreeSet<_>>();
683        assert_eq!(sig_keys.len(), 4);
684        // These two do not make it on time initially, and then we should be able
685        // to achieve quorum before these ultimately arrive
686        assert!(!sig_keys.contains(&authorities[7].pubkey_bytes()));
687        assert!(!sig_keys.contains(&authorities[8].pubkey_bytes()));
688        // These were the first two to respond, and should be immediately
689        // included once we fallback to arrival order
690        assert!(sig_keys.contains(&authorities[0].pubkey_bytes()));
691        assert!(sig_keys.contains(&authorities[1].pubkey_bytes()));
692    }
693
694    #[tokio::test]
695    async fn test_bridge_auth_agg_more_cases() {
696        telemetry_subscribers::init_for_testing();
697
698        let mock0 = BridgeRequestMockHandler::new();
699        let mock1 = BridgeRequestMockHandler::new();
700        let mock2 = BridgeRequestMockHandler::new();
701        let mock3 = BridgeRequestMockHandler::new();
702
703        // start servers
704        let (_handles, mut authorities, secrets) = get_test_authorities_and_run_mock_bridge_server(
705            vec![2500, 2500, 2500, 2500],
706            vec![mock0.clone(), mock1.clone(), mock2.clone(), mock3.clone()],
707        );
708        // 0 and 1 are blocklisted
709        authorities[0].is_blocklisted = true;
710        authorities[1].is_blocklisted = true;
711
712        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
713
714        let agg = BridgeAuthorityAggregator::new_for_testing(Arc::new(committee));
715
716        let sui_tx_digest = TransactionDigest::random();
717        let sui_tx_event_index = 0;
718        let nonce = 0;
719        let amount = 1000;
720        let action = get_test_sui_to_eth_bridge_action(
721            Some(sui_tx_digest),
722            Some(sui_tx_event_index),
723            Some(nonce),
724            Some(amount),
725            None,
726            None,
727            None,
728        );
729
730        // Only mock authority 2 and 3 to return signatures, such that if BridgeAuthorityAggregator
731        // requests to authority 0 and 1 (which should not happen) it will panic.
732        mock2.add_sui_event_response(
733            sui_tx_digest,
734            sui_tx_event_index,
735            Ok(sign_action_with_key(&action, &secrets[2])),
736            None,
737        );
738        mock3.add_sui_event_response(
739            sui_tx_digest,
740            sui_tx_event_index,
741            Ok(sign_action_with_key(&action, &secrets[3])),
742            None,
743        );
744        let certified = agg
745            .request_committee_signatures(action.clone())
746            .await
747            .unwrap();
748        let signers = certified
749            .auth_sig()
750            .signatures
751            .keys()
752            .cloned()
753            .collect::<BTreeSet<_>>();
754        assert_eq!(
755            signers,
756            BTreeSet::from_iter(vec![
757                authorities[2].pubkey_bytes(),
758                authorities[3].pubkey_bytes()
759            ])
760        );
761
762        // if mock 3 returns error, then it won't reach validity threshold
763        mock3.add_sui_event_response(
764            sui_tx_digest,
765            sui_tx_event_index,
766            Err(BridgeError::RestAPIError("".into())),
767            None,
768        );
769        let err = agg
770            .request_committee_signatures(action.clone())
771            .await
772            .unwrap_err();
773        assert!(matches!(
774            err,
775            BridgeError::AuthoritySignatureAggregationTooManyError(_)
776        ));
777
778        // if mock 3 returns duplicated signature (by authority 2), `BridgeClient` will catch this
779        mock3.add_sui_event_response(
780            sui_tx_digest,
781            sui_tx_event_index,
782            Ok(sign_action_with_key(&action, &secrets[2])),
783            None,
784        );
785        let err = agg
786            .request_committee_signatures(action.clone())
787            .await
788            .unwrap_err();
789        assert!(matches!(
790            err,
791            BridgeError::AuthoritySignatureAggregationTooManyError(_)
792        ));
793    }
794
795    #[test]
796    fn test_get_sigs_state() {
797        telemetry_subscribers::init_for_testing();
798
799        let mut authorities = vec![];
800        let mut secrets = vec![];
801        for _i in 0..4 {
802            let (authority, _, secret) = get_test_authority_and_key(2500, 12345);
803            authorities.push(authority);
804            secrets.push(secret);
805        }
806
807        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
808
809        let threshold = VALIDITY_THRESHOLD;
810        let metrics = Arc::new(BridgeMetrics::new_for_testing());
811        let mut state = GetSigsState::new(
812            threshold,
813            Arc::new(committee),
814            metrics.clone(),
815            Arc::new(BTreeMap::new()),
816        );
817
818        assert!(!state.is_too_many_error());
819        let dummy = authorities[0].pubkey_bytes();
820        // bad stake: 2500
821        state.add_bad_stake(2500, &dummy);
822        assert!(!state.is_too_many_error());
823
824        // bad stake ; 5000
825        state.add_bad_stake(2500, &dummy);
826        assert!(!state.is_too_many_error());
827
828        // bad stake : 6666
829        state.add_bad_stake(1666, &dummy);
830        assert!(!state.is_too_many_error());
831
832        // bad stake : 6667 - too many errors
833        state.add_bad_stake(1, &dummy);
834        assert!(state.is_too_many_error());
835
836        // Authority 0 is blocklisted, we lose 2500 stake
837        authorities[0].is_blocklisted = true;
838        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
839        let threshold = VALIDITY_THRESHOLD;
840        let metrics = Arc::new(BridgeMetrics::new_for_testing());
841        let mut state = GetSigsState::new(
842            threshold,
843            Arc::new(committee),
844            metrics.clone(),
845            Arc::new(BTreeMap::new()),
846        );
847
848        assert!(!state.is_too_many_error());
849
850        // bad stake: 2500 + 2500
851        state.add_bad_stake(2500, &dummy);
852        assert!(!state.is_too_many_error());
853
854        // bad stake: 5000 + 2500 - too many errors
855        state.add_bad_stake(2500, &dummy);
856        assert!(state.is_too_many_error());
857
858        // Below we test `handle_verified_signed_action`
859        authorities[0].is_blocklisted = false;
860        authorities[1].voting_power = 1; // set authority's voting power to minimal
861        authorities[2].voting_power = 4999;
862        authorities[3].is_blocklisted = true; // blocklist authority 3
863        let committee = BridgeCommittee::new(authorities.clone()).unwrap();
864        let threshold = VALIDITY_THRESHOLD;
865        let mut state = GetSigsState::new(
866            threshold,
867            Arc::new(committee.clone()),
868            metrics.clone(),
869            Arc::new(BTreeMap::new()),
870        );
871
872        let sui_tx_digest = TransactionDigest::random();
873        let sui_tx_event_index = 0;
874        let nonce = 0;
875        let amount = 1000;
876        let action = get_test_sui_to_eth_bridge_action(
877            Some(sui_tx_digest),
878            Some(sui_tx_event_index),
879            Some(nonce),
880            Some(amount),
881            None,
882            None,
883            None,
884        );
885
886        let sig_0 = sign_action_with_key(&action, &secrets[0]);
887        // returns Ok(None)
888        assert!(
889            state
890                .handle_verified_signed_action(
891                    authorities[0].pubkey_bytes().clone(),
892                    authorities[0].voting_power,
893                    VerifiedSignedBridgeAction::new_from_verified(sig_0.clone())
894                )
895                .unwrap()
896                .is_none()
897        );
898        assert_eq!(state.total_ok_stake, 2500);
899
900        // Handling a sig from an already signed authority would fail
901        let new_sig_0 = sign_action_with_key(&action, &secrets[0]);
902        // returns Err(BridgeError::AuthoritySignatureDuplication)
903        let err = state
904            .handle_verified_signed_action(
905                authorities[0].pubkey_bytes().clone(),
906                authorities[0].voting_power,
907                VerifiedSignedBridgeAction::new_from_verified(new_sig_0.clone()),
908            )
909            .unwrap_err();
910        assert!(matches!(err, BridgeError::AuthoritySignatureDuplication(_)));
911        assert_eq!(state.total_ok_stake, 2500);
912
913        // Handling a sig from an authority not in committee would fail
914        let (unknown_authority, _, kp) = get_test_authority_and_key(2500, 12345);
915        let unknown_sig = sign_action_with_key(&action, &kp);
916        // returns Err(BridgeError::InvalidBridgeAuthority)
917        let err = state
918            .handle_verified_signed_action(
919                unknown_authority.pubkey_bytes().clone(),
920                authorities[0].voting_power,
921                VerifiedSignedBridgeAction::new_from_verified(unknown_sig.clone()),
922            )
923            .unwrap_err();
924        assert!(matches!(err, BridgeError::InvalidBridgeAuthority(_)));
925        assert_eq!(state.total_ok_stake, 2500);
926
927        // Handling a blocklisted authority would fail
928        let sig_3 = sign_action_with_key(&action, &secrets[3]);
929        // returns Err(BridgeError::InvalidBridgeAuthority)
930        let err = state
931            .handle_verified_signed_action(
932                authorities[3].pubkey_bytes().clone(),
933                authorities[3].voting_power,
934                VerifiedSignedBridgeAction::new_from_verified(sig_3.clone()),
935            )
936            .unwrap_err();
937        assert!(matches!(err, BridgeError::InvalidBridgeAuthority(_)));
938        assert_eq!(state.total_ok_stake, 2500);
939
940        // Collect signtuare from authority 1 (voting power = 1)
941        let sig_1 = sign_action_with_key(&action, &secrets[1]);
942        // returns Ok(None)
943        assert!(
944            state
945                .handle_verified_signed_action(
946                    authorities[1].pubkey_bytes().clone(),
947                    authorities[1].voting_power,
948                    VerifiedSignedBridgeAction::new_from_verified(sig_1.clone())
949                )
950                .unwrap()
951                .is_none()
952        );
953        assert_eq!(state.total_ok_stake, 2501);
954
955        // Collect signature from authority 2 - reach validity threshold
956        let sig_2 = sign_action_with_key(&action, &secrets[2]);
957        // returns Ok(None)
958        let certificate = state
959            .handle_verified_signed_action(
960                authorities[2].pubkey_bytes().clone(),
961                authorities[2].voting_power,
962                VerifiedSignedBridgeAction::new_from_verified(sig_2.clone()),
963            )
964            .unwrap()
965            .unwrap();
966        assert_eq!(state.total_ok_stake, 7500);
967
968        assert_eq!(certificate.data(), &action);
969        let signers = certificate
970            .auth_sig()
971            .signatures
972            .keys()
973            .cloned()
974            .collect::<BTreeSet<_>>();
975        assert_eq!(
976            signers,
977            BTreeSet::from_iter(vec![
978                authorities[0].pubkey_bytes(),
979                authorities[1].pubkey_bytes(),
980                authorities[2].pubkey_bytes()
981            ])
982        );
983
984        for (pubkey, sig) in &certificate.auth_sig().signatures {
985            let sign_info = BridgeAuthoritySignInfo {
986                authority_pub_key: BridgeAuthorityPublicKey::from_bytes(pubkey.as_ref()).unwrap(),
987                signature: sig.clone(),
988            };
989            assert!(sign_info.verify(&action, &committee).is_ok());
990        }
991    }
992}