sui_bridge/
node.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::action_executor::BridgeActionExecutor;
5use crate::client::bridge_authority_aggregator::BridgeAuthorityAggregator;
6use crate::config::{BridgeClientConfig, BridgeNodeConfig, WatchdogConfig};
7use crate::crypto::BridgeAuthorityPublicKeyBytes;
8use crate::eth_syncer::EthSyncer;
9use crate::events::init_all_struct_tags;
10use crate::metrics::BridgeMetrics;
11use crate::monitor::BridgeMonitor;
12use crate::orchestrator::BridgeOrchestrator;
13use crate::server::handler::BridgeRequestHandler;
14use crate::server::{BridgeNodePublicMetadata, run_server};
15use crate::storage::BridgeOrchestratorTables;
16use crate::sui_bridge_watchdog::eth_bridge_status::EthBridgeStatus;
17use crate::sui_bridge_watchdog::eth_vault_balance::{EthereumVaultBalance, VaultAsset};
18use crate::sui_bridge_watchdog::metrics::WatchdogMetrics;
19use crate::sui_bridge_watchdog::sui_bridge_status::SuiBridgeStatus;
20use crate::sui_bridge_watchdog::total_supplies::TotalSupplies;
21use crate::sui_bridge_watchdog::{BridgeWatchDog, Observable};
22use crate::sui_client::SuiBridgeClient;
23use crate::sui_syncer::SuiSyncer;
24use crate::types::BridgeCommittee;
25use crate::utils::{
26    EthProvider, get_committee_voting_power_by_name, get_eth_contract_addresses,
27    get_validator_names_by_pub_keys,
28};
29use alloy::primitives::Address as EthAddress;
30use arc_swap::ArcSwap;
31use mysten_metrics::spawn_logged_monitored_task;
32use std::collections::{BTreeMap, HashMap};
33use std::net::{IpAddr, Ipv4Addr, SocketAddr};
34use std::sync::Arc;
35use std::time::Duration;
36use sui_types::Identifier;
37use sui_types::bridge::{
38    BRIDGE_COMMITTEE_MODULE_NAME, BRIDGE_LIMITER_MODULE_NAME, BRIDGE_MODULE_NAME,
39    BRIDGE_TREASURY_MODULE_NAME,
40};
41use sui_types::event::EventID;
42use tokio::task::JoinHandle;
43use tracing::info;
44
45pub async fn run_bridge_node(
46    config: BridgeNodeConfig,
47    metadata: BridgeNodePublicMetadata,
48    prometheus_registry: prometheus::Registry,
49) -> anyhow::Result<JoinHandle<()>> {
50    init_all_struct_tags();
51    let metrics = Arc::new(BridgeMetrics::new(&prometheus_registry));
52    let watchdog_config = config.watchdog_config.clone();
53    let (server_config, client_config) = config.validate(metrics.clone()).await?;
54    let sui_chain_identifier = server_config
55        .sui_client
56        .get_chain_identifier()
57        .await
58        .map_err(|e| anyhow::anyhow!("Failed to get sui chain identifier: {:?}", e))?;
59    let eth_chain_identifier = server_config
60        .eth_client
61        .get_chain_id()
62        .await
63        .map_err(|e| anyhow::anyhow!("Failed to get eth chain identifier: {:?}", e))?;
64    prometheus_registry
65        .register(mysten_metrics::bridge_uptime_metric(
66            "bridge",
67            metadata.version,
68            &sui_chain_identifier,
69            &eth_chain_identifier.to_string(),
70            client_config.is_some(),
71        ))
72        .unwrap();
73
74    let committee = Arc::new(
75        server_config
76            .sui_client
77            .get_bridge_committee()
78            .await
79            .expect("Failed to get committee"),
80    );
81    let mut handles = vec![];
82
83    // Start watchdog
84    let eth_provider = server_config.eth_client.provider();
85    let eth_bridge_proxy_address = server_config.eth_bridge_proxy_address;
86    let sui_client = server_config.sui_client.clone();
87    handles.push(spawn_logged_monitored_task!(start_watchdog(
88        watchdog_config,
89        &prometheus_registry,
90        eth_provider,
91        eth_bridge_proxy_address,
92        sui_client
93    )));
94
95    // Update voting right metrics
96    // Before reconfiguration happens we only set it once when the node starts
97    let sui_system = server_config
98        .sui_client
99        .jsonrpc_client()
100        .governance_api()
101        .get_latest_sui_system_state()
102        .await?;
103
104    // Start Client
105    if let Some(client_config) = client_config {
106        let committee_keys_to_names =
107            Arc::new(get_validator_names_by_pub_keys(&committee, &sui_system).await);
108        let client_components = start_client_components(
109            client_config,
110            committee.clone(),
111            committee_keys_to_names,
112            metrics.clone(),
113        )
114        .await?;
115        handles.extend(client_components);
116    }
117
118    let committee_name_mapping = get_committee_voting_power_by_name(&committee, &sui_system).await;
119    for (name, voting_power) in committee_name_mapping.into_iter() {
120        metrics
121            .current_bridge_voting_rights
122            .with_label_values(&[name.as_str()])
123            .set(voting_power as i64);
124    }
125
126    // Start Server
127    let socket_address = SocketAddr::new(
128        IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
129        server_config.server_listen_port,
130    );
131    Ok(run_server(
132        &socket_address,
133        BridgeRequestHandler::new(
134            server_config.key,
135            server_config.sui_client,
136            server_config.eth_client,
137            server_config.approved_governance_actions,
138        ),
139        metrics,
140        Arc::new(metadata),
141    ))
142}
143
144async fn start_watchdog(
145    watchdog_config: Option<WatchdogConfig>,
146    registry: &prometheus::Registry,
147    eth_provider: EthProvider,
148    eth_bridge_proxy_address: EthAddress,
149    sui_client: Arc<SuiBridgeClient>,
150) {
151    let watchdog_metrics = WatchdogMetrics::new(registry);
152    let (
153        _committee_address,
154        _limiter_address,
155        vault_address,
156        _config_address,
157        weth_address,
158        usdt_address,
159        wbtc_address,
160        lbtc_address,
161    ) = get_eth_contract_addresses(eth_bridge_proxy_address, eth_provider.clone())
162        .await
163        .unwrap_or_else(|e| panic!("get_eth_contract_addresses should not fail: {}", e));
164
165    let eth_vault_balance = EthereumVaultBalance::new(
166        eth_provider.clone(),
167        vault_address,
168        weth_address,
169        VaultAsset::WETH,
170        watchdog_metrics.eth_vault_balance.clone(),
171    )
172    .await
173    .unwrap_or_else(|e| panic!("Failed to create eth vault balance: {}", e));
174
175    let usdt_vault_balance = EthereumVaultBalance::new(
176        eth_provider.clone(),
177        vault_address,
178        usdt_address,
179        VaultAsset::USDT,
180        watchdog_metrics.usdt_vault_balance.clone(),
181    )
182    .await
183    .unwrap_or_else(|e| panic!("Failed to create usdt vault balance: {}", e));
184
185    let wbtc_vault_balance = EthereumVaultBalance::new(
186        eth_provider.clone(),
187        vault_address,
188        wbtc_address,
189        VaultAsset::WBTC,
190        watchdog_metrics.wbtc_vault_balance.clone(),
191    )
192    .await
193    .unwrap_or_else(|e| panic!("Failed to create wbtc vault balance: {}", e));
194
195    let lbtc_vault_balance = if !lbtc_address.is_zero() {
196        Some(
197            EthereumVaultBalance::new(
198                eth_provider.clone(),
199                vault_address,
200                lbtc_address,
201                VaultAsset::LBTC,
202                watchdog_metrics.lbtc_vault_balance.clone(),
203            )
204            .await
205            .unwrap_or_else(|e| panic!("Failed to create lbtc vault balance: {}", e)),
206        )
207    } else {
208        None
209    };
210
211    let eth_bridge_status = EthBridgeStatus::new(
212        eth_provider,
213        eth_bridge_proxy_address,
214        watchdog_metrics.eth_bridge_paused.clone(),
215    );
216
217    let sui_bridge_status = SuiBridgeStatus::new(
218        sui_client.clone(),
219        watchdog_metrics.sui_bridge_paused.clone(),
220    );
221
222    let mut observables: Vec<Box<dyn Observable + Send + Sync>> = vec![
223        Box::new(eth_vault_balance),
224        Box::new(usdt_vault_balance),
225        Box::new(wbtc_vault_balance),
226        Box::new(eth_bridge_status),
227        Box::new(sui_bridge_status),
228    ];
229
230    // Add lbtc_vault_balance if it's available
231    if let Some(balance) = lbtc_vault_balance {
232        observables.push(Box::new(balance));
233    }
234
235    if let Some(watchdog_config) = watchdog_config
236        && !watchdog_config.total_supplies.is_empty()
237    {
238        let total_supplies = TotalSupplies::new(
239            Arc::new(sui_client.jsonrpc_client().clone()),
240            watchdog_config.total_supplies,
241            watchdog_metrics.total_supplies.clone(),
242        );
243        observables.push(Box::new(total_supplies));
244    }
245
246    BridgeWatchDog::new(observables).run().await
247}
248
249// TODO: is there a way to clean up the overrides after it's stored in DB?
250async fn start_client_components(
251    client_config: BridgeClientConfig,
252    committee: Arc<BridgeCommittee>,
253    committee_keys_to_names: Arc<BTreeMap<BridgeAuthorityPublicKeyBytes, String>>,
254    metrics: Arc<BridgeMetrics>,
255) -> anyhow::Result<Vec<JoinHandle<()>>> {
256    let store: std::sync::Arc<BridgeOrchestratorTables> =
257        BridgeOrchestratorTables::new(&client_config.db_path.join("client"));
258    let sui_modules_to_watch = get_sui_modules_to_watch(
259        &store,
260        client_config.sui_bridge_module_last_processed_event_id_override,
261    );
262    let eth_contracts_to_watch = get_eth_contracts_to_watch(
263        &store,
264        &client_config.eth_contracts,
265        client_config.eth_contracts_start_block_fallback,
266        client_config.eth_contracts_start_block_override,
267    );
268
269    let sui_client = client_config.sui_client.clone();
270
271    let mut all_handles = vec![];
272    let (task_handles, eth_events_rx, _) =
273        EthSyncer::new(client_config.eth_client.clone(), eth_contracts_to_watch)
274            .run(metrics.clone())
275            .await
276            .expect("Failed to start eth syncer");
277    all_handles.extend(task_handles);
278
279    let (task_handles, sui_events_rx) = SuiSyncer::new(
280        client_config.sui_client,
281        sui_modules_to_watch,
282        metrics.clone(),
283    )
284    .run(Duration::from_secs(2))
285    .await
286    .expect("Failed to start sui syncer");
287    all_handles.extend(task_handles);
288
289    let bridge_auth_agg = Arc::new(ArcSwap::from(Arc::new(BridgeAuthorityAggregator::new(
290        committee,
291        metrics.clone(),
292        committee_keys_to_names,
293    ))));
294    // TODO: should we use one query instead of two?
295    let sui_token_type_tags = sui_client.get_token_id_map().await.unwrap();
296    let is_bridge_paused = sui_client.is_bridge_paused().await.unwrap();
297
298    let (bridge_pause_tx, bridge_pause_rx) = tokio::sync::watch::channel(is_bridge_paused);
299
300    let (sui_monitor_tx, sui_monitor_rx) = mysten_metrics::metered_channel::channel(
301        10000,
302        &mysten_metrics::get_metrics()
303            .unwrap()
304            .channel_inflight
305            .with_label_values(&["sui_monitor_queue"]),
306    );
307    let (eth_monitor_tx, eth_monitor_rx) = mysten_metrics::metered_channel::channel(
308        10000,
309        &mysten_metrics::get_metrics()
310            .unwrap()
311            .channel_inflight
312            .with_label_values(&["eth_monitor_queue"]),
313    );
314
315    let sui_token_type_tags = Arc::new(ArcSwap::from(Arc::new(sui_token_type_tags)));
316    let bridge_action_executor = BridgeActionExecutor::new(
317        sui_client.clone(),
318        bridge_auth_agg.clone(),
319        store.clone(),
320        client_config.key,
321        client_config.sui_address,
322        client_config.gas_object_ref.0,
323        sui_token_type_tags.clone(),
324        bridge_pause_rx,
325        metrics.clone(),
326    )
327    .await;
328
329    let monitor = BridgeMonitor::new(
330        sui_client.clone(),
331        sui_monitor_rx,
332        eth_monitor_rx,
333        bridge_auth_agg.clone(),
334        bridge_pause_tx,
335        sui_token_type_tags,
336        metrics.clone(),
337    );
338    all_handles.push(spawn_logged_monitored_task!(monitor.run()));
339
340    // Create a dummy channel for the grpc sui_events_rx that the orchestrator expects
341    // This channel will never receive any events since we have not yet switched to the grpc syncer
342    let (_sui_grpc_events_tx, sui_grpc_events_rx) = mysten_metrics::metered_channel::channel(
343        1,
344        &mysten_metrics::get_metrics()
345            .unwrap()
346            .channel_inflight
347            .with_label_values(&["sui_events_queue_dummy"]),
348    );
349
350    let orchestrator = BridgeOrchestrator::new(
351        sui_client,
352        sui_events_rx,
353        sui_grpc_events_rx,
354        eth_events_rx,
355        store.clone(),
356        sui_monitor_tx,
357        eth_monitor_tx,
358        metrics,
359    );
360
361    all_handles.extend(orchestrator.run(bridge_action_executor).await);
362    Ok(all_handles)
363}
364
365// NOTE: This function will be used later when we switch to the gRPC based event syncer.
366#[allow(unused)]
367async fn get_next_sequence_number<C: crate::sui_client::SuiClientInner>(
368    store: &BridgeOrchestratorTables,
369    sui_client: &crate::sui_client::SuiClient<C>,
370    last_processed_bridge_event_id: Option<EventID>,
371    next_sequence_number_override: Option<u64>,
372) -> u64 {
373    if let Some(next_sequence_number_override) = next_sequence_number_override {
374        info!("Overriding next sequence number to {next_sequence_number_override}",);
375        return next_sequence_number_override;
376    }
377
378    if let Ok(Some(sequence_number)) = store.get_sui_sequence_number_cursor() {
379        info!("Using sequence number {sequence_number} from storage",);
380        return sequence_number;
381    }
382
383    if let Some(event_id) = last_processed_bridge_event_id {
384        match sui_client.get_sequence_number_from_event_id(event_id).await {
385            Ok(Some(sequence_number)) => {
386                let next = sequence_number + 1;
387                info!(
388                    ?event_id,
389                    last_processed_seq = sequence_number,
390                    next_seq_to_read = next,
391                    "Migrated from legacy event cursor to sequence number cursor"
392                );
393                return next;
394            }
395            Ok(None) => {
396                info!(
397                    ?event_id,
398                    "Could not extract sequence number from legacy event cursor, starting from 0"
399                );
400            }
401            Err(e) => {
402                info!(
403                    ?event_id,
404                    ?e,
405                    "Failed to get sequence number from legacy event cursor, starting from 0"
406                );
407            }
408        }
409    }
410
411    info!("No cursor found for gRPC syncer, starting from sequence number 0");
412    0
413}
414
415fn get_sui_modules_to_watch(
416    store: &std::sync::Arc<BridgeOrchestratorTables>,
417    sui_bridge_module_last_processed_event_id_override: Option<EventID>,
418) -> HashMap<Identifier, Option<EventID>> {
419    let sui_bridge_modules = vec![
420        BRIDGE_MODULE_NAME.to_owned(),
421        BRIDGE_COMMITTEE_MODULE_NAME.to_owned(),
422        BRIDGE_TREASURY_MODULE_NAME.to_owned(),
423        BRIDGE_LIMITER_MODULE_NAME.to_owned(),
424    ];
425    if let Some(cursor) = sui_bridge_module_last_processed_event_id_override {
426        info!("Overriding cursor for sui bridge modules to {:?}", cursor);
427        return HashMap::from_iter(
428            sui_bridge_modules
429                .iter()
430                .map(|module| (module.clone(), Some(cursor))),
431        );
432    }
433
434    let sui_bridge_module_stored_cursor = store
435        .get_sui_event_cursors(&sui_bridge_modules)
436        .expect("Failed to get eth sui event cursors from storage");
437    let mut sui_modules_to_watch = HashMap::new();
438    for (module_identifier, cursor) in sui_bridge_modules
439        .iter()
440        .zip(sui_bridge_module_stored_cursor)
441    {
442        if cursor.is_none() {
443            info!(
444                "No cursor found for sui bridge module {} in storage or config override, query start from the beginning.",
445                module_identifier
446            );
447        }
448        sui_modules_to_watch.insert(module_identifier.clone(), cursor);
449    }
450    sui_modules_to_watch
451}
452
453fn get_eth_contracts_to_watch(
454    store: &std::sync::Arc<BridgeOrchestratorTables>,
455    eth_contracts: &[EthAddress],
456    eth_contracts_start_block_fallback: u64,
457    eth_contracts_start_block_override: Option<u64>,
458) -> HashMap<EthAddress, u64> {
459    let stored_eth_cursors = store
460        .get_eth_event_cursors(eth_contracts)
461        .expect("Failed to get eth event cursors from storage");
462    let mut eth_contracts_to_watch = HashMap::new();
463    for (contract, stored_cursor) in eth_contracts.iter().zip(stored_eth_cursors) {
464        // start block precedence:
465        // eth_contracts_start_block_override > stored cursor > eth_contracts_start_block_fallback
466        match (eth_contracts_start_block_override, stored_cursor) {
467            (Some(override_), _) => {
468                eth_contracts_to_watch.insert(*contract, override_);
469                info!(
470                    "Overriding cursor for eth bridge contract {} to {}. Stored cursor: {:?}",
471                    contract, override_, stored_cursor
472                );
473            }
474            (None, Some(stored_cursor)) => {
475                // +1: The stored value is the last block that was processed, so we start from the next block.
476                eth_contracts_to_watch.insert(*contract, stored_cursor + 1);
477            }
478            (None, None) => {
479                // If no cursor is found, start from the fallback block.
480                eth_contracts_to_watch.insert(*contract, eth_contracts_start_block_fallback);
481            }
482        }
483    }
484    eth_contracts_to_watch
485}
486
#[cfg(test)]
mod tests {
    use alloy::primitives::Address as EthAddress;
    use alloy::primitives::U160;
    use prometheus::Registry;

    use super::*;
    use crate::config::BridgeNodeConfig;
    use crate::config::EthConfig;
    use crate::config::SuiConfig;
    use crate::config::default_ed25519_key_pair;
    use crate::e2e_tests::test_utils::BridgeTestCluster;
    use crate::e2e_tests::test_utils::BridgeTestClusterBuilder;
    use crate::utils::wait_for_server_to_be_up;
    use fastcrypto::secp256k1::Secp256k1KeyPair;
    use sui_config::local_ip_utils::get_available_port;
    use sui_types::base_types::SuiAddress;
    use sui_types::bridge::BridgeChainId;
    use sui_types::crypto::EncodeDecodeBase64;
    use sui_types::crypto::KeypairTraits;
    use sui_types::crypto::SuiKeyPair;
    use sui_types::crypto::get_key_pair;
    use sui_types::digests::TransactionDigest;
    use sui_types::event::EventID;
    use tempfile::tempdir;

    /// Builds the test cluster shared by the node start-up tests.
    async fn setup() -> BridgeTestCluster {
        BridgeTestClusterBuilder::new()
            .with_eth_env(true)
            .with_bridge_cluster(false)
            .with_num_validators(2)
            .build()
            .await
    }

    /// Spawns a bridge node in memory from `config` and asserts that its server comes up on
    /// the configured listen port.
    async fn assert_node_server_comes_up(config: BridgeNodeConfig) {
        let server_url = format!("http://127.0.0.1:{}", config.server_listen_port);
        let _handle = run_bridge_node(
            config,
            BridgeNodePublicMetadata::empty_for_testing(),
            Registry::new(),
        )
        .await
        .unwrap();

        // Now we expect to see the server to be up and running.
        wait_for_server_to_be_up(server_url, 5).await.unwrap();
    }

    #[tokio::test]
    async fn test_get_eth_contracts_to_watch() {
        telemetry_subscribers::init_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();
        let first = EthAddress::from(U160::from(1));
        let second = EthAddress::from(U160::from(2));
        let eth_contracts = vec![first, second];
        let store = BridgeOrchestratorTables::new(temp_dir.path());
        let expected = |first_block: u64, second_block: u64| -> HashMap<EthAddress, u64> {
            HashMap::from([(first, first_block), (second, second_block)])
        };

        // No override, no watermark found in DB, use fallback
        assert_eq!(
            get_eth_contracts_to_watch(&store, &eth_contracts, 10, None),
            expected(10, 10)
        );

        // no watermark found in DB, use override
        assert_eq!(
            get_eth_contracts_to_watch(&store, &eth_contracts, 10, Some(420)),
            expected(420, 420)
        );

        store.update_eth_event_cursor(first, 100).unwrap();
        store.update_eth_event_cursor(second, 102).unwrap();

        // No override, found watermarks in DB, use +1
        assert_eq!(
            get_eth_contracts_to_watch(&store, &eth_contracts, 10, None),
            expected(101, 103)
        );

        // use override
        assert_eq!(
            get_eth_contracts_to_watch(&store, &eth_contracts, 10, Some(200)),
            expected(200, 200)
        );
    }

    #[tokio::test]
    async fn test_get_sui_modules_to_watch() {
        telemetry_subscribers::init_for_testing();
        let temp_dir = tempfile::tempdir().unwrap();

        let store = BridgeOrchestratorTables::new(temp_dir.path());
        let bridge_module = BRIDGE_MODULE_NAME.to_owned();
        let committee_module = BRIDGE_COMMITTEE_MODULE_NAME.to_owned();
        let treasury_module = BRIDGE_TREASURY_MODULE_NAME.to_owned();
        let limiter_module = BRIDGE_LIMITER_MODULE_NAME.to_owned();
        // Expected result with one cursor for the `bridge` module and another one shared by
        // the remaining three modules.
        let expected = |bridge_cursor: Option<EventID>,
                        other_cursor: Option<EventID>|
         -> HashMap<Identifier, Option<EventID>> {
            HashMap::from([
                (bridge_module.clone(), bridge_cursor),
                (committee_module.clone(), other_cursor),
                (treasury_module.clone(), other_cursor),
                (limiter_module.clone(), other_cursor),
            ])
        };

        // No override, no stored watermark, use None
        assert_eq!(get_sui_modules_to_watch(&store, None), expected(None, None));

        // no stored watermark, use override
        let override_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 42,
        };
        assert_eq!(
            get_sui_modules_to_watch(&store, Some(override_cursor)),
            expected(Some(override_cursor), Some(override_cursor))
        );

        // No override, found stored watermark for `bridge` module, use stored watermark for
        // `bridge` and None for the other modules
        let bridge_stored_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 100,
        };
        store
            .update_sui_event_cursor(bridge_module.clone(), bridge_stored_cursor)
            .unwrap();
        assert_eq!(
            get_sui_modules_to_watch(&store, None),
            expected(Some(bridge_stored_cursor), None)
        );

        // found stored watermark, use override
        let committee_stored_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 100,
        };
        store
            .update_sui_event_cursor(committee_module.clone(), committee_stored_cursor)
            .unwrap();
        assert_eq!(
            get_sui_modules_to_watch(&store, Some(override_cursor)),
            expected(Some(override_cursor), Some(override_cursor))
        );
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_starting_bridge_node() {
        telemetry_subscribers::init_for_testing();
        let cluster = setup().await;
        let authority_key = cluster.bridge_authority_key(0);

        // prepare node config (server only)
        let work_dir = tempdir().unwrap().keep();
        let authority_key_file = work_dir.join("test_starting_bridge_node_bridge_authority_key");
        let server_listen_port = get_available_port("127.0.0.1");
        std::fs::write(&authority_key_file, authority_key.encode_base64()).unwrap();

        let config = BridgeNodeConfig {
            server_listen_port,
            metrics_port: get_available_port("127.0.0.1"),
            bridge_authority_key_path: authority_key_file,
            sui: SuiConfig {
                sui_rpc_url: cluster.sui_rpc_url(),
                sui_bridge_chain_id: BridgeChainId::SuiCustom as u8,
                bridge_client_key_path: None,
                bridge_client_gas_object: None,
                sui_bridge_module_last_processed_event_id_override: None,
                sui_bridge_next_sequence_number_override: None,
            },
            eth: EthConfig {
                eth_rpc_url: cluster.eth_rpc_url(),
                eth_bridge_proxy_address: cluster.sui_bridge_address(),
                eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
                eth_contracts_start_block_fallback: None,
                eth_contracts_start_block_override: None,
            },
            approved_governance_actions: vec![],
            run_client: false,
            db_path: None,
            metrics_key_pair: default_ed25519_key_pair(),
            metrics: None,
            watchdog_config: None,
        };

        assert_node_server_comes_up(config).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_starting_bridge_node_with_client() {
        telemetry_subscribers::init_for_testing();
        let cluster = setup().await;
        let authority_key = cluster.bridge_authority_key(0);

        // prepare node config (server + client)
        let work_dir = tempdir().unwrap().keep();
        let db_path = work_dir.join("test_starting_bridge_node_with_client_db");
        let authority_key_file =
            work_dir.join("test_starting_bridge_node_with_client_bridge_authority_key");
        let server_listen_port = get_available_port("127.0.0.1");
        std::fs::write(&authority_key_file, authority_key.encode_base64()).unwrap();

        // The client signs with the authority key here, so fund that key's address:
        // send some gas to this address
        let client_sui_address = SuiAddress::from(authority_key.public());
        let sender_address = cluster.sui_user_address();
        cluster
            .test_cluster
            .inner
            .transfer_sui_must_exceed(sender_address, client_sui_address, 1000000000)
            .await;

        let config = BridgeNodeConfig {
            server_listen_port,
            metrics_port: get_available_port("127.0.0.1"),
            bridge_authority_key_path: authority_key_file,
            sui: SuiConfig {
                sui_rpc_url: cluster.sui_rpc_url(),
                sui_bridge_chain_id: BridgeChainId::SuiCustom as u8,
                bridge_client_key_path: None,
                bridge_client_gas_object: None,
                sui_bridge_module_last_processed_event_id_override: Some(EventID {
                    tx_digest: TransactionDigest::random(),
                    event_seq: 0,
                }),
                sui_bridge_next_sequence_number_override: None,
            },
            eth: EthConfig {
                eth_rpc_url: cluster.eth_rpc_url(),
                eth_bridge_proxy_address: cluster.sui_bridge_address(),
                eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
                eth_contracts_start_block_fallback: Some(0),
                eth_contracts_start_block_override: None,
            },
            approved_governance_actions: vec![],
            run_client: true,
            db_path: Some(db_path),
            metrics_key_pair: default_ed25519_key_pair(),
            metrics: None,
            watchdog_config: None,
        };

        // client components are spawned earlier than server, so as long as the server is up,
        // we know the client components are already running.
        assert_node_server_comes_up(config).await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_starting_bridge_node_with_client_and_separate_client_key() {
        telemetry_subscribers::init_for_testing();
        let cluster = setup().await;
        let authority_key = cluster.bridge_authority_key(0);

        // prepare node config (server + client)
        let work_dir = tempdir().unwrap().keep();
        let db_path =
            work_dir.join("test_starting_bridge_node_with_client_and_separate_client_key_db");
        let authority_key_file = work_dir.join(
            "test_starting_bridge_node_with_client_and_separate_client_key_bridge_authority_key",
        );
        let server_listen_port = get_available_port("127.0.0.1");

        // prepare bridge authority key
        std::fs::write(&authority_key_file, authority_key.encode_base64()).unwrap();

        // prepare bridge client key
        let (_, raw_client_key): (_, Secp256k1KeyPair) = get_key_pair();
        let client_key = SuiKeyPair::from(raw_client_key);
        let client_key_file = work_dir.join(
            "test_starting_bridge_node_with_client_and_separate_client_key_bridge_client_key",
        );
        std::fs::write(&client_key_file, client_key.encode_base64()).unwrap();
        let client_sui_address = SuiAddress::from(&client_key.public());
        let sender_address = cluster.sui_user_address();
        // send some gas to this address
        let gas_obj = cluster
            .test_cluster
            .inner
            .transfer_sui_must_exceed(sender_address, client_sui_address, 1000000000)
            .await;

        let config = BridgeNodeConfig {
            server_listen_port,
            metrics_port: get_available_port("127.0.0.1"),
            bridge_authority_key_path: authority_key_file,
            sui: SuiConfig {
                sui_rpc_url: cluster.sui_rpc_url(),
                sui_bridge_chain_id: BridgeChainId::SuiCustom as u8,
                bridge_client_key_path: Some(client_key_file),
                bridge_client_gas_object: Some(gas_obj),
                sui_bridge_module_last_processed_event_id_override: Some(EventID {
                    tx_digest: TransactionDigest::random(),
                    event_seq: 0,
                }),
                sui_bridge_next_sequence_number_override: None,
            },
            eth: EthConfig {
                eth_rpc_url: cluster.eth_rpc_url(),
                eth_bridge_proxy_address: cluster.sui_bridge_address(),
                eth_bridge_chain_id: BridgeChainId::EthCustom as u8,
                eth_contracts_start_block_fallback: Some(0),
                eth_contracts_start_block_override: Some(0),
            },
            approved_governance_actions: vec![],
            run_client: true,
            db_path: Some(db_path),
            metrics_key_pair: default_ed25519_key_pair(),
            metrics: None,
            watchdog_config: None,
        };

        // client components are spawned earlier than server, so as long as the server is up,
        // we know the client components are already running.
        assert_node_server_comes_up(config).await;
    }
}