1#![allow(clippy::type_complexity)]
5
6use crate::crypto::{BridgeAuthorityKeyPair, BridgeAuthoritySignInfo};
7use crate::error::{BridgeError, BridgeResult};
8use crate::eth_client::EthClient;
9use crate::metrics::BridgeMetrics;
10use crate::sui_client::{SuiClient, SuiClientInner};
11use crate::types::{BridgeAction, SignedBridgeAction};
12use async_trait::async_trait;
13use axum::Json;
14use ethers::providers::JsonRpcClient;
15use ethers::types::TxHash;
16use lru::LruCache;
17use std::num::NonZeroUsize;
18use std::str::FromStr;
19use std::sync::Arc;
20use sui_types::digests::TransactionDigest;
21use tap::TapFallible;
22use tokio::sync::{Mutex, oneshot};
23use tracing::info;
24
25use super::governance_verifier::GovernanceVerifier;
26
27#[async_trait]
28pub trait BridgeRequestHandlerTrait {
29 async fn handle_eth_tx_hash(
33 &self,
34 tx_hash_hex: String,
35 event_idx: u16,
36 ) -> Result<Json<SignedBridgeAction>, BridgeError>;
37 async fn handle_sui_tx_digest(
41 &self,
42 tx_digest_base58: String,
43 event_idx: u16,
44 ) -> Result<Json<SignedBridgeAction>, BridgeError>;
45
46 async fn handle_governance_action(
48 &self,
49 action: BridgeAction,
50 ) -> Result<Json<SignedBridgeAction>, BridgeError>;
51}
52
53#[async_trait::async_trait]
54pub trait ActionVerifier<K>: Send + Sync {
55 fn name(&self) -> &'static str;
57 async fn verify(&self, key: K) -> BridgeResult<BridgeAction>;
58}
59
60struct SuiActionVerifier<C> {
61 sui_client: Arc<SuiClient<C>>,
62}
63
64struct EthActionVerifier<P> {
65 eth_client: Arc<EthClient<P>>,
66}
67
68#[async_trait::async_trait]
69impl<C> ActionVerifier<(TransactionDigest, u16)> for SuiActionVerifier<C>
70where
71 C: SuiClientInner + Send + Sync + 'static,
72{
73 fn name(&self) -> &'static str {
74 "SuiActionVerifier"
75 }
76
77 async fn verify(&self, key: (TransactionDigest, u16)) -> BridgeResult<BridgeAction> {
78 let (tx_digest, event_idx) = key;
79 self.sui_client
80 .get_bridge_action_by_tx_digest_and_event_idx_maybe(&tx_digest, event_idx)
81 .await
82 .tap_ok(|action| info!("Sui action found: {:?}", action))
83 }
84}
85
86#[async_trait::async_trait]
87impl<C> ActionVerifier<(TxHash, u16)> for EthActionVerifier<C>
88where
89 C: JsonRpcClient + Send + Sync + 'static,
90{
91 fn name(&self) -> &'static str {
92 "EthActionVerifier"
93 }
94
95 async fn verify(&self, key: (TxHash, u16)) -> BridgeResult<BridgeAction> {
96 let (tx_hash, event_idx) = key;
97 self.eth_client
98 .get_finalized_bridge_action_maybe(tx_hash, event_idx)
99 .await
100 .tap_ok(|action| info!("Eth action found: {:?}", action))
101 }
102}
103
104struct SignerWithCache<K> {
105 signer: Arc<BridgeAuthorityKeyPair>,
106 verifier: Arc<dyn ActionVerifier<K>>,
107 mutex: Arc<Mutex<()>>,
108 cache: LruCache<K, Arc<Mutex<Option<BridgeResult<SignedBridgeAction>>>>>,
109 metrics: Arc<BridgeMetrics>,
110}
111
112impl<K> SignerWithCache<K>
113where
114 K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
115{
116 fn new(
117 signer: Arc<BridgeAuthorityKeyPair>,
118 verifier: impl ActionVerifier<K> + 'static,
119 metrics: Arc<BridgeMetrics>,
120 ) -> Self {
121 Self {
122 signer,
123 verifier: Arc::new(verifier),
124 mutex: Arc::new(Mutex::new(())),
125 cache: LruCache::new(NonZeroUsize::new(1000).unwrap()),
126 metrics,
127 }
128 }
129
130 fn spawn(
131 mut self,
132 mut rx: mysten_metrics::metered_channel::Receiver<(
133 K,
134 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
135 )>,
136 ) -> tokio::task::JoinHandle<()> {
137 tokio::spawn(async move {
138 loop {
139 let (key, tx) = rx
140 .recv()
141 .await
142 .unwrap_or_else(|| panic!("Server signer's channel is closed"));
143 let result = self.sign(key).await;
144 let _ = tx.send(result);
147 }
148 })
149 }
150
151 async fn get_cache_entry(
152 &mut self,
153 key: K,
154 ) -> Arc<Mutex<Option<BridgeResult<SignedBridgeAction>>>> {
155 let _ = self.mutex.lock().await;
157 self.cache
158 .get_or_insert(key, || Arc::new(Mutex::new(None)))
159 .clone()
160 }
161
162 async fn sign(&mut self, key: K) -> BridgeResult<SignedBridgeAction> {
163 let signer = self.signer.clone();
164 let verifier = self.verifier.clone();
165 let verifier_name = verifier.name();
166 let entry = self.get_cache_entry(key.clone()).await;
167 let mut guard = entry.lock().await;
168 if let Some(result) = &*guard {
169 self.metrics
170 .signer_with_cache_hit
171 .with_label_values(&[verifier_name])
172 .inc();
173 return result.clone();
174 }
175 self.metrics
176 .signer_with_cache_miss
177 .with_label_values(&[verifier_name])
178 .inc();
179 match verifier.verify(key.clone()).await {
180 Ok(bridge_action) => {
181 let sig = BridgeAuthoritySignInfo::new(&bridge_action, &signer);
182 let result = SignedBridgeAction::new_from_data_and_sig(bridge_action, sig);
183 *guard = Some(Ok(result.clone()));
185 Ok(result)
186 }
187 Err(e) => {
188 match e {
189 BridgeError::GovernanceActionIsNotApproved
191 | BridgeError::ActionIsNotGovernanceAction(..)
192 | BridgeError::BridgeEventInUnrecognizedSuiPackage
193 | BridgeError::BridgeEventInUnrecognizedEthContract
194 | BridgeError::BridgeEventNotActionable
195 | BridgeError::NoBridgeEventsInTxPosition => {
196 *guard = Some(Err(e.clone()));
197 }
198 _ => (),
199 }
200 Err(e)
201 }
202 }
203 }
204
205 #[cfg(test)]
206 async fn get_testing_only(
207 &mut self,
208 key: K,
209 ) -> Option<&Arc<Mutex<Option<BridgeResult<SignedBridgeAction>>>>> {
210 let _ = self.mutex.lock().await;
211 self.cache.get(&key)
212 }
213}
214
215pub struct BridgeRequestHandler {
216 sui_signer_tx: mysten_metrics::metered_channel::Sender<(
217 (TransactionDigest, u16),
218 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
219 )>,
220 eth_signer_tx: mysten_metrics::metered_channel::Sender<(
221 (TxHash, u16),
222 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
223 )>,
224 governance_signer_tx: mysten_metrics::metered_channel::Sender<(
225 BridgeAction,
226 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
227 )>,
228}
229
230impl BridgeRequestHandler {
231 pub fn new<
232 SC: SuiClientInner + Send + Sync + 'static,
233 EP: JsonRpcClient + Send + Sync + 'static,
234 >(
235 signer: BridgeAuthorityKeyPair,
236 sui_client: Arc<SuiClient<SC>>,
237 eth_client: Arc<EthClient<EP>>,
238 approved_governance_actions: Vec<BridgeAction>,
239 metrics: Arc<BridgeMetrics>,
240 ) -> Self {
241 let (sui_signer_tx, sui_rx) = mysten_metrics::metered_channel::channel(
242 1000,
243 &mysten_metrics::get_metrics()
244 .unwrap()
245 .channel_inflight
246 .with_label_values(&["server_sui_action_signing_queue"]),
247 );
248 let (eth_signer_tx, eth_rx) = mysten_metrics::metered_channel::channel(
249 1000,
250 &mysten_metrics::get_metrics()
251 .unwrap()
252 .channel_inflight
253 .with_label_values(&["server_eth_action_signing_queue"]),
254 );
255 let (governance_signer_tx, governance_rx) = mysten_metrics::metered_channel::channel(
256 1000,
257 &mysten_metrics::get_metrics()
258 .unwrap()
259 .channel_inflight
260 .with_label_values(&["server_governance_action_signing_queue"]),
261 );
262 let signer = Arc::new(signer);
263
264 SignerWithCache::new(
265 signer.clone(),
266 SuiActionVerifier { sui_client },
267 metrics.clone(),
268 )
269 .spawn(sui_rx);
270 SignerWithCache::new(
271 signer.clone(),
272 EthActionVerifier { eth_client },
273 metrics.clone(),
274 )
275 .spawn(eth_rx);
276 SignerWithCache::new(
277 signer.clone(),
278 GovernanceVerifier::new(approved_governance_actions).unwrap(),
279 metrics.clone(),
280 )
281 .spawn(governance_rx);
282
283 Self {
284 sui_signer_tx,
285 eth_signer_tx,
286 governance_signer_tx,
287 }
288 }
289}
290
291#[async_trait]
292impl BridgeRequestHandlerTrait for BridgeRequestHandler {
293 async fn handle_eth_tx_hash(
294 &self,
295 tx_hash_hex: String,
296 event_idx: u16,
297 ) -> Result<Json<SignedBridgeAction>, BridgeError> {
298 let tx_hash = TxHash::from_str(&tx_hash_hex).map_err(|_| BridgeError::InvalidTxHash)?;
299
300 let (tx, rx) = oneshot::channel();
301 self.eth_signer_tx
302 .send(((tx_hash, event_idx), tx))
303 .await
304 .unwrap_or_else(|_| panic!("Server eth signing channel is closed"));
305 let signed_action = rx
306 .await
307 .unwrap_or_else(|_| panic!("Server signing task's oneshot channel is dropped"))?;
308 Ok(Json(signed_action))
309 }
310
311 async fn handle_sui_tx_digest(
312 &self,
313 tx_digest_base58: String,
314 event_idx: u16,
315 ) -> Result<Json<SignedBridgeAction>, BridgeError> {
316 let tx_digest = TransactionDigest::from_str(&tx_digest_base58)
317 .map_err(|_e| BridgeError::InvalidTxHash)?;
318 let (tx, rx) = oneshot::channel();
319 self.sui_signer_tx
320 .send(((tx_digest, event_idx), tx))
321 .await
322 .unwrap_or_else(|_| panic!("Server sui signing channel is closed"));
323 let signed_action = rx
324 .await
325 .unwrap_or_else(|_| panic!("Server signing task's oneshot channel is dropped"))?;
326 Ok(Json(signed_action))
327 }
328
329 async fn handle_governance_action(
330 &self,
331 action: BridgeAction,
332 ) -> Result<Json<SignedBridgeAction>, BridgeError> {
333 if !action.is_governace_action() {
334 return Err(BridgeError::ActionIsNotGovernanceAction(action));
335 }
336 let (tx, rx) = oneshot::channel();
337 self.governance_signer_tx
338 .send((action, tx))
339 .await
340 .unwrap_or_else(|_| panic!("Server governance action signing channel is closed"));
341 let signed_action = rx.await.unwrap_or_else(|_| {
342 panic!("Server governance action task's oneshot channel is dropped")
343 })?;
344 Ok(Json(signed_action))
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use std::collections::HashSet;
351
352 use super::*;
353 use crate::{
354 eth_mock_provider::EthMockProvider,
355 events::{MoveTokenDepositedEvent, SuiToEthTokenBridgeV1, init_all_struct_tags},
356 sui_mock_client::SuiMockClient,
357 test_utils::{
358 get_test_log_and_action, get_test_sui_to_eth_bridge_action, mock_last_finalized_block,
359 },
360 types::{EmergencyAction, EmergencyActionType, LimitUpdateAction},
361 };
362 use ethers::types::{Address as EthAddress, TransactionReceipt};
363 use sui_json_rpc_types::{BcsEvent, SuiEvent};
364 use sui_types::bridge::{BridgeChainId, TOKEN_ID_USDC};
365 use sui_types::{base_types::SuiAddress, crypto::get_key_pair};
366
367 #[tokio::test]
368 async fn test_sui_signer_with_cache() {
369 let (_, kp): (_, BridgeAuthorityKeyPair) = get_key_pair();
370 let signer = Arc::new(kp);
371 let sui_client_mock = SuiMockClient::default();
372 let sui_verifier = SuiActionVerifier {
373 sui_client: Arc::new(SuiClient::new_for_testing(sui_client_mock.clone())),
374 };
375 let metrics = Arc::new(BridgeMetrics::new_for_testing());
376 let mut sui_signer_with_cache = SignerWithCache::new(signer.clone(), sui_verifier, metrics);
377
378 let sui_tx_digest = TransactionDigest::random();
380 let sui_event_idx = 42;
381 assert!(
382 sui_signer_with_cache
383 .get_testing_only((sui_tx_digest, sui_event_idx))
384 .await
385 .is_none()
386 );
387 let entry = sui_signer_with_cache
388 .get_cache_entry((sui_tx_digest, sui_event_idx))
389 .await;
390 let entry_ = sui_signer_with_cache
391 .get_testing_only((sui_tx_digest, sui_event_idx))
392 .await;
393 assert!(entry_.unwrap().lock().await.is_none());
394
395 let action = get_test_sui_to_eth_bridge_action(
396 Some(sui_tx_digest),
397 Some(sui_event_idx),
398 None,
399 None,
400 None,
401 None,
402 None,
403 );
404 let sig = BridgeAuthoritySignInfo::new(&action, &signer);
405 let signed_action = SignedBridgeAction::new_from_data_and_sig(action.clone(), sig);
406 entry.lock().await.replace(Ok(signed_action));
407 let entry_ = sui_signer_with_cache
408 .get_testing_only((sui_tx_digest, sui_event_idx))
409 .await;
410 assert!(entry_.unwrap().lock().await.is_some());
411
412 let sui_tx_digest = TransactionDigest::random();
414 let sui_event_idx = 0;
415
416 sui_client_mock.add_events_by_tx_digest_error(sui_tx_digest);
418 sui_signer_with_cache
419 .sign((sui_tx_digest, sui_event_idx))
420 .await
421 .unwrap_err();
422 let entry_ = sui_signer_with_cache
423 .get_testing_only((sui_tx_digest, sui_event_idx))
424 .await;
425 assert!(entry_.unwrap().lock().await.is_none());
426
427 sui_client_mock.add_events_by_tx_digest(sui_tx_digest, vec![]);
429 assert!(matches!(
430 sui_signer_with_cache
431 .sign((sui_tx_digest, sui_event_idx))
432 .await,
433 Err(BridgeError::NoBridgeEventsInTxPosition)
434 ));
435 let entry_ = sui_signer_with_cache
436 .get_testing_only((sui_tx_digest, sui_event_idx))
437 .await;
438 assert_eq!(
439 entry_.unwrap().lock().await.clone().unwrap().unwrap_err(),
440 BridgeError::NoBridgeEventsInTxPosition,
441 );
442
443 let emitted_event_1 = MoveTokenDepositedEvent {
448 seq_num: 1,
449 source_chain: BridgeChainId::SuiCustom as u8,
450 sender_address: SuiAddress::random_for_testing_only().to_vec(),
451 target_chain: BridgeChainId::EthCustom as u8,
452 target_address: EthAddress::random().as_bytes().to_vec(),
453 token_type: TOKEN_ID_USDC,
454 amount_sui_adjusted: 12345,
455 };
456
457 init_all_struct_tags();
458
459 let mut sui_event_1 = SuiEvent::random_for_testing();
460 sui_event_1.type_ = SuiToEthTokenBridgeV1.get().unwrap().clone();
461 sui_event_1.bcs = BcsEvent::new(bcs::to_bytes(&emitted_event_1).unwrap());
462 let sui_tx_digest = sui_event_1.id.tx_digest;
463
464 let mut sui_event_2 = SuiEvent::random_for_testing();
465 sui_event_2.type_ = SuiToEthTokenBridgeV1.get().unwrap().clone();
466 sui_event_2.bcs = BcsEvent::new(bcs::to_bytes(&emitted_event_1).unwrap());
467 let sui_event_idx_2 = 1;
468 sui_client_mock.add_events_by_tx_digest(sui_tx_digest, vec![sui_event_2.clone()]);
469
470 sui_client_mock.add_events_by_tx_digest(
471 sui_tx_digest,
472 vec![sui_event_1.clone(), sui_event_2.clone()],
473 );
474 let signed_1 = sui_signer_with_cache
475 .sign((sui_tx_digest, sui_event_idx))
476 .await
477 .unwrap();
478 let signed_2 = sui_signer_with_cache
479 .sign((sui_tx_digest, sui_event_idx_2))
480 .await
481 .unwrap();
482
483 sui_client_mock.add_events_by_tx_digest(sui_tx_digest, vec![]);
486 assert_eq!(
487 sui_signer_with_cache
488 .sign((sui_tx_digest, sui_event_idx))
489 .await
490 .unwrap(),
491 signed_1
492 );
493 assert_eq!(
494 sui_signer_with_cache
495 .sign((sui_tx_digest, sui_event_idx_2))
496 .await
497 .unwrap(),
498 signed_2
499 );
500 }
501
502 #[tokio::test]
503 async fn test_eth_signer_with_cache() {
504 let (_, kp): (_, BridgeAuthorityKeyPair) = get_key_pair();
505 let signer = Arc::new(kp);
506 let eth_mock_provider = EthMockProvider::default();
507 let contract_address = EthAddress::random();
508 let eth_client = EthClient::new_mocked(
509 eth_mock_provider.clone(),
510 HashSet::from_iter(vec![contract_address]),
511 );
512 let eth_verifier = EthActionVerifier {
513 eth_client: Arc::new(eth_client),
514 };
515 let metrics = Arc::new(BridgeMetrics::new_for_testing());
516 let mut eth_signer_with_cache =
517 SignerWithCache::new(signer.clone(), eth_verifier, metrics.clone());
518
519 let eth_tx_hash = TxHash::random();
521 let eth_event_idx = 42;
522 assert!(
523 eth_signer_with_cache
524 .get_testing_only((eth_tx_hash, eth_event_idx))
525 .await
526 .is_none()
527 );
528 let entry = eth_signer_with_cache
529 .get_cache_entry((eth_tx_hash, eth_event_idx))
530 .await;
531 let entry_ = eth_signer_with_cache
532 .get_testing_only((eth_tx_hash, eth_event_idx))
533 .await;
534 assert!(entry_.unwrap().lock().await.is_none());
536
537 let (_, action) = get_test_log_and_action(contract_address, eth_tx_hash, eth_event_idx);
538 let sig = BridgeAuthoritySignInfo::new(&action, &signer);
539 let signed_action = SignedBridgeAction::new_from_data_and_sig(action.clone(), sig);
540 entry.lock().await.replace(Ok(signed_action.clone()));
541 let entry_ = eth_signer_with_cache
542 .get_testing_only((eth_tx_hash, eth_event_idx))
543 .await;
544 assert_eq!(
545 entry_.unwrap().lock().await.clone().unwrap().unwrap(),
546 signed_action
547 );
548
549 let eth_tx_hash = TxHash::random();
551 let eth_event_idx = 0;
552 let (log, _action) = get_test_log_and_action(contract_address, eth_tx_hash, eth_event_idx);
553 eth_mock_provider
554 .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
555 "eth_getTransactionReceipt",
556 [log.transaction_hash.unwrap()],
557 TransactionReceipt {
558 block_number: log.block_number,
559 logs: vec![log.clone()],
560 ..Default::default()
561 },
562 )
563 .unwrap();
564 mock_last_finalized_block(ð_mock_provider, log.block_number.unwrap().as_u64());
565
566 eth_signer_with_cache
567 .sign((eth_tx_hash, eth_event_idx))
568 .await
569 .unwrap();
570 let entry_ = eth_signer_with_cache
571 .get_testing_only((eth_tx_hash, eth_event_idx))
572 .await;
573 entry_.unwrap().lock().await.clone().unwrap().unwrap();
574 }
575
576 #[tokio::test]
577 async fn test_signer_with_governace_verifier() {
578 let action_1 = BridgeAction::EmergencyAction(EmergencyAction {
579 chain_id: BridgeChainId::EthCustom,
580 nonce: 1,
581 action_type: EmergencyActionType::Pause,
582 });
583 let action_2 = BridgeAction::LimitUpdateAction(LimitUpdateAction {
584 chain_id: BridgeChainId::EthCustom,
585 sending_chain_id: BridgeChainId::SuiCustom,
586 nonce: 1,
587 new_usd_limit: 10000,
588 });
589
590 let verifier = GovernanceVerifier::new(vec![action_1.clone(), action_2.clone()]).unwrap();
591 assert_eq!(
592 verifier.verify(action_1.clone()).await.unwrap(),
593 action_1.clone()
594 );
595 assert_eq!(
596 verifier.verify(action_2.clone()).await.unwrap(),
597 action_2.clone()
598 );
599
600 let (_, kp): (_, BridgeAuthorityKeyPair) = get_key_pair();
601 let signer = Arc::new(kp);
602 let metrics = Arc::new(BridgeMetrics::new_for_testing());
603 let mut signer_with_cache = SignerWithCache::new(signer.clone(), verifier, metrics.clone());
604
605 signer_with_cache.sign(action_1.clone()).await.unwrap();
607 let entry_ = signer_with_cache.get_testing_only(action_1.clone()).await;
609 assert_eq!(
610 entry_
611 .unwrap()
612 .lock()
613 .await
614 .clone()
615 .unwrap()
616 .unwrap()
617 .data(),
618 &action_1
619 );
620
621 let action_3 = BridgeAction::EmergencyAction(EmergencyAction {
623 chain_id: BridgeChainId::EthCustom,
624 nonce: 1,
625 action_type: EmergencyActionType::Unpause,
626 });
627 assert!(matches!(
629 signer_with_cache.sign(action_3.clone()).await.unwrap_err(),
630 BridgeError::GovernanceActionIsNotApproved
631 ));
632 let entry_ = signer_with_cache.get_testing_only(action_3.clone()).await;
634 assert!(matches!(
635 entry_.unwrap().lock().await.clone().unwrap().unwrap_err(),
636 BridgeError::GovernanceActionIsNotApproved
637 ));
638
639 let action_4 = get_test_sui_to_eth_bridge_action(None, None, None, None, None, None, None);
641 assert!(matches!(
642 signer_with_cache.sign(action_4.clone()).await.unwrap_err(),
643 BridgeError::ActionIsNotGovernanceAction(..)
644 ));
645 let entry_ = signer_with_cache.get_testing_only(action_4.clone()).await;
647 assert!(matches!(
648 entry_.unwrap().lock().await.clone().unwrap().unwrap_err(),
649 BridgeError::ActionIsNotGovernanceAction { .. }
650 ));
651 }
652 }