sui_bridge/
sui_syncer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The SuiSyncer module is responsible for synchronizing Events emitted
5//! on Sui blockchain from concerned modules of bridge package 0x9.
6//!
7//! There are two modes of operation:
8//! - Event-based (legacy): Uses JSON-RPC to query events by module
9//! - gRPC-based (new): Iterates over bridge records using LinkedTable iteration
10//!
11//! As of now, only the event-based mode is being used.
12
13use crate::{
14    error::BridgeResult,
15    events::{EmittedSuiToEthTokenBridgeV1, SuiBridgeEvent},
16    metrics::BridgeMetrics,
17    retry_with_max_elapsed_time,
18    sui_client::{SuiClient, SuiClientInner},
19    types::BridgeAction,
20};
21use mysten_metrics::spawn_logged_monitored_task;
22use std::{collections::HashMap, sync::Arc};
23use sui_json_rpc_types::SuiEvent;
24use sui_types::BRIDGE_PACKAGE_ID;
25use sui_types::{Identifier, event::EventID};
26use tokio::{
27    sync::Notify,
28    task::JoinHandle,
29    time::{self, Duration},
30};
31
32const SUI_EVENTS_CHANNEL_SIZE: usize = 1000;
33
34/// Map from contract address to their start cursor (exclusive)
35pub type SuiTargetModules = HashMap<Identifier, Option<EventID>>;
36
37pub type GrpcSyncedEvents = (u64, Vec<SuiBridgeEvent>);
38
39pub struct SuiSyncer<C> {
40    sui_client: Arc<SuiClient<C>>,
41    // The last transaction that the syncer has fully processed.
42    // Syncer will resume post this transaction (i.e. exclusive), when it starts.
43    cursors: SuiTargetModules,
44    metrics: Arc<BridgeMetrics>,
45}
46
47impl<C> SuiSyncer<C>
48where
49    C: SuiClientInner + 'static,
50{
51    pub fn new(
52        sui_client: Arc<SuiClient<C>>,
53        cursors: SuiTargetModules,
54        metrics: Arc<BridgeMetrics>,
55    ) -> Self {
56        Self {
57            sui_client,
58            cursors,
59            metrics,
60        }
61    }
62
63    pub async fn run(
64        self,
65        query_interval: Duration,
66    ) -> BridgeResult<(
67        Vec<JoinHandle<()>>,
68        mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
69    )> {
70        let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
71            SUI_EVENTS_CHANNEL_SIZE,
72            &mysten_metrics::get_metrics()
73                .unwrap()
74                .channel_inflight
75                .with_label_values(&["sui_events_queue"]),
76        );
77
78        let mut task_handles = vec![];
79        for (module, cursor) in self.cursors {
80            let metrics = self.metrics.clone();
81            let events_rx_clone: mysten_metrics::metered_channel::Sender<(
82                Identifier,
83                Vec<SuiEvent>,
84            )> = events_tx.clone();
85            let sui_client_clone = self.sui_client.clone();
86            task_handles.push(spawn_logged_monitored_task!(
87                Self::run_event_listening_task(
88                    module,
89                    cursor,
90                    events_rx_clone,
91                    sui_client_clone,
92                    query_interval,
93                    metrics,
94                )
95            ));
96        }
97        Ok((task_handles, events_rx))
98    }
99
100    async fn run_event_listening_task(
101        // The module where interested events are defined.
102        // Module is always of bridge package 0x9.
103        module: Identifier,
104        mut cursor: Option<EventID>,
105        events_sender: mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
106        sui_client: Arc<SuiClient<C>>,
107        query_interval: Duration,
108        metrics: Arc<BridgeMetrics>,
109    ) {
110        tracing::info!(?module, ?cursor, "Starting sui events listening task");
111        let mut interval = time::interval(query_interval);
112        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
113
114        // Create a task to update metrics
115        let notify = Arc::new(Notify::new());
116        let notify_clone = notify.clone();
117        let sui_client_clone = sui_client.clone();
118        let last_synced_sui_checkpoints_metric = metrics
119            .last_synced_sui_checkpoints
120            .with_label_values(&[&module.to_string()]);
121        spawn_logged_monitored_task!(async move {
122            loop {
123                notify_clone.notified().await;
124                let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
125                    sui_client_clone.get_latest_checkpoint_sequence_number(),
126                    Duration::from_secs(120)
127                ) else {
128                    tracing::error!(
129                        "Failed to query latest checkpoint sequence number from sui client after retry"
130                    );
131                    continue;
132                };
133                last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
134            }
135        });
136
137        loop {
138            interval.tick().await;
139            let Ok(Ok(events)) = retry_with_max_elapsed_time!(
140                sui_client.query_events_by_module(BRIDGE_PACKAGE_ID, module.clone(), cursor),
141                Duration::from_secs(120)
142            ) else {
143                tracing::error!("Failed to query events from sui client after retry");
144                continue;
145            };
146
147            let len = events.data.len();
148            if len != 0 {
149                if !events.has_next_page {
150                    // If this is the last page, it means we have processed all events up to the latest checkpoint
151                    // We can then update the latest checkpoint metric.
152                    notify.notify_one();
153                }
154                events_sender
155                    .send((module.clone(), events.data))
156                    .await
157                    .expect("All Sui event channel receivers are closed");
158                if let Some(next) = events.next_cursor {
159                    cursor = Some(next);
160                }
161                tracing::info!(?module, ?cursor, "Observed {len} new Sui events");
162            }
163        }
164    }
165
166    pub async fn run_grpc(
167        self,
168        source_chain_id: u8,
169        next_sequence_number: u64,
170        query_interval: Duration,
171        batch_size: u64,
172    ) -> BridgeResult<(
173        Vec<JoinHandle<()>>,
174        mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
175    )> {
176        let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
177            SUI_EVENTS_CHANNEL_SIZE,
178            &mysten_metrics::get_metrics()
179                .unwrap()
180                .channel_inflight
181                .with_label_values(&["sui_grpc_events_queue"]),
182        );
183
184        let task_handle = spawn_logged_monitored_task!(Self::run_grpc_listening_task(
185            source_chain_id,
186            next_sequence_number,
187            events_tx,
188            self.sui_client.clone(),
189            query_interval,
190            batch_size,
191            self.metrics.clone(),
192        ));
193
194        Ok((vec![task_handle], events_rx))
195    }
196
197    async fn run_grpc_listening_task(
198        source_chain_id: u8,
199        mut next_sequence_cursor: u64,
200        events_sender: mysten_metrics::metered_channel::Sender<GrpcSyncedEvents>,
201        sui_client: Arc<SuiClient<C>>,
202        query_interval: Duration,
203        batch_size: u64,
204        metrics: Arc<BridgeMetrics>,
205    ) {
206        tracing::info!(
207            source_chain_id,
208            next_sequence_cursor,
209            "Starting sui grpc records listening task"
210        );
211        let mut interval = time::interval(query_interval);
212        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
213
214        // Create a task to update metrics
215        let notify = Arc::new(Notify::new());
216        let notify_clone = notify.clone();
217        let sui_client_clone = sui_client.clone();
218        let chain_label = source_chain_id.to_string();
219        let last_synced_sui_checkpoints_metric = metrics
220            .last_synced_sui_checkpoints
221            .with_label_values(&[&chain_label]);
222        spawn_logged_monitored_task!(async move {
223            loop {
224                notify_clone.notified().await;
225                let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
226                    sui_client_clone.get_latest_checkpoint_sequence_number(),
227                    Duration::from_secs(120)
228                ) else {
229                    tracing::error!(
230                        "Failed to query latest checkpoint sequence number from sui client after retry"
231                    );
232                    continue;
233                };
234                last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
235            }
236        });
237
238        loop {
239            interval.tick().await;
240            let Ok(Ok(on_chain_next_sequence_index)) = retry_with_max_elapsed_time!(
241                sui_client.get_token_transfer_next_seq_number(source_chain_id),
242                Duration::from_secs(120)
243            ) else {
244                tracing::error!(
245                    source_chain_id,
246                    "Failed to get next seq num from sui client after retry"
247                );
248                continue;
249            };
250
251            // start querying from the next_sequence_cursor till on_chain_next_sequence_index in batches
252            let start_index = next_sequence_cursor;
253            if start_index >= on_chain_next_sequence_index {
254                notify.notify_one();
255                continue;
256            }
257
258            let end_index = std::cmp::min(
259                start_index + batch_size - 1,
260                on_chain_next_sequence_index - 1,
261            );
262
263            let Ok(Ok(records)) = retry_with_max_elapsed_time!(
264                sui_client.get_bridge_records_in_range(source_chain_id, start_index, end_index),
265                Duration::from_secs(120)
266            ) else {
267                tracing::error!(
268                    source_chain_id,
269                    start_index,
270                    end_index,
271                    "Failed to get records from sui client after retry"
272                );
273                continue;
274            };
275
276            let len = records.len();
277            if len != 0 {
278                let mut events = Vec::with_capacity(len);
279                let mut batch_last_sequence_index = start_index;
280
281                for (seq_index, record) in records {
282                    let event = match Self::bridge_record_to_event(&record, source_chain_id) {
283                        Ok(event) => event,
284                        Err(e) => {
285                            tracing::error!(
286                                source_chain_id,
287                                seq_index,
288                                "Failed to convert record to event: {:?}",
289                                e
290                            );
291                            continue;
292                        }
293                    };
294
295                    events.push(event);
296                    batch_last_sequence_index = seq_index;
297                }
298
299                if !events.is_empty() {
300                    events_sender
301                        .send((batch_last_sequence_index + 1, events))
302                        .await
303                        .expect("Bridge events channel receiver is closed");
304
305                    next_sequence_cursor = batch_last_sequence_index + 1;
306                    tracing::info!(
307                        source_chain_id,
308                        last_processed_seq = batch_last_sequence_index,
309                        next_sequence_cursor,
310                        "Processed {len} bridge records"
311                    );
312                }
313            }
314
315            if end_index >= on_chain_next_sequence_index - 1 {
316                // we have processed all records up to the latest checkpoint
317                // so we can update the latest checkpoint metric
318                notify.notify_one();
319            }
320        }
321    }
322
323    fn bridge_record_to_event(
324        record: &sui_types::bridge::MoveTypeBridgeRecord,
325        source_chain_id: u8,
326    ) -> Result<SuiBridgeEvent, crate::error::BridgeError> {
327        let action = BridgeAction::try_from_bridge_record(record)?;
328
329        match action {
330            BridgeAction::SuiToEthTokenTransfer(transfer) => Ok(
331                SuiBridgeEvent::SuiToEthTokenBridgeV1(EmittedSuiToEthTokenBridgeV1 {
332                    nonce: transfer.nonce,
333                    sui_chain_id: transfer.sui_chain_id,
334                    eth_chain_id: transfer.eth_chain_id,
335                    sui_address: transfer.sui_address,
336                    eth_address: transfer.eth_address,
337                    token_id: transfer.token_id,
338                    amount_sui_adjusted: transfer.amount_adjusted,
339                }),
340            ),
341            _ => Err(crate::error::BridgeError::Generic(format!(
342                "Unexpected action type for source_chain_id {}: {:?}",
343                source_chain_id, action
344            ))),
345        }
346    }
347}
348
349#[cfg(test)]
350mod tests {
351    use super::*;
352
353    use crate::{sui_client::SuiClient, sui_mock_client::SuiMockClient};
354    use prometheus::Registry;
355    use sui_json_rpc_types::EventPage;
356    use sui_types::bridge::{BridgeChainId, MoveTypeBridgeMessage, MoveTypeBridgeRecord};
357    use sui_types::{Identifier, digests::TransactionDigest, event::EventID};
358    use tokio::time::timeout;
359
360    #[tokio::test]
361    async fn test_sui_syncer_basic() -> anyhow::Result<()> {
362        telemetry_subscribers::init_for_testing();
363        let registry = Registry::new();
364        mysten_metrics::init_metrics(&registry);
365        let metrics = Arc::new(BridgeMetrics::new(&registry));
366        let mock = SuiMockClient::default();
367        let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
368        let module_foo = Identifier::new("Foo").unwrap();
369        let module_bar = Identifier::new("Bar").unwrap();
370        let empty_events = EventPage::empty();
371        let cursor = EventID {
372            tx_digest: TransactionDigest::random(),
373            event_seq: 0,
374        };
375        add_event_response(&mock, module_foo.clone(), cursor, empty_events.clone());
376        add_event_response(&mock, module_bar.clone(), cursor, empty_events.clone());
377
378        let target_modules = HashMap::from_iter(vec![
379            (module_foo.clone(), Some(cursor)),
380            (module_bar.clone(), Some(cursor)),
381        ]);
382        let interval = Duration::from_millis(200);
383        let (_handles, mut events_rx) = SuiSyncer::new(client, target_modules, metrics.clone())
384            .run(interval)
385            .await
386            .unwrap();
387
388        // Initially there are no events
389        assert_no_more_events(interval, &mut events_rx).await;
390
391        mock.set_latest_checkpoint_sequence_number(999);
392        // Module Foo has new events
393        let mut event_1: SuiEvent = SuiEvent::random_for_testing();
394        let package_id = BRIDGE_PACKAGE_ID;
395        event_1.type_.address = package_id.into();
396        event_1.type_.module = module_foo.clone();
397        let module_foo_events_1: sui_json_rpc_types::Page<SuiEvent, EventID> = EventPage {
398            data: vec![event_1.clone(), event_1.clone()],
399            next_cursor: Some(event_1.id),
400            has_next_page: false,
401        };
402        add_event_response(&mock, module_foo.clone(), event_1.id, empty_events.clone());
403        add_event_response(
404            &mock,
405            module_foo.clone(),
406            cursor,
407            module_foo_events_1.clone(),
408        );
409
410        let (identifier, received_events) = events_rx.recv().await.unwrap();
411        assert_eq!(identifier, module_foo);
412        assert_eq!(received_events.len(), 2);
413        assert_eq!(received_events[0].id, event_1.id);
414        assert_eq!(received_events[1].id, event_1.id);
415        // No more
416        assert_no_more_events(interval, &mut events_rx).await;
417        assert_eq!(
418            metrics
419                .last_synced_sui_checkpoints
420                .get_metric_with_label_values(&["Foo"])
421                .unwrap()
422                .get(),
423            999
424        );
425
426        // Module Bar has new events
427        let mut event_2: SuiEvent = SuiEvent::random_for_testing();
428        event_2.type_.address = package_id.into();
429        event_2.type_.module = module_bar.clone();
430        let module_bar_events_1 = EventPage {
431            data: vec![event_2.clone()],
432            next_cursor: Some(event_2.id),
433            has_next_page: true, // Set to true so that the syncer will not update the last synced checkpoint
434        };
435        add_event_response(&mock, module_bar.clone(), event_2.id, empty_events.clone());
436
437        add_event_response(&mock, module_bar.clone(), cursor, module_bar_events_1);
438
439        let (identifier, received_events) = events_rx.recv().await.unwrap();
440        assert_eq!(identifier, module_bar);
441        assert_eq!(received_events.len(), 1);
442        assert_eq!(received_events[0].id, event_2.id);
443        // No more
444        assert_no_more_events(interval, &mut events_rx).await;
445        assert_eq!(
446            metrics
447                .last_synced_sui_checkpoints
448                .get_metric_with_label_values(&["Bar"])
449                .unwrap()
450                .get(),
451            0, // Not updated
452        );
453
454        Ok(())
455    }
456
457    async fn assert_no_more_events<T: std::fmt::Debug>(
458        interval: Duration,
459        events_rx: &mut mysten_metrics::metered_channel::Receiver<T>,
460    ) {
461        match timeout(interval * 2, events_rx.recv()).await {
462            Err(_e) => (),
463            other => panic!("Should have timed out, but got: {:?}", other),
464        };
465    }
466
467    fn add_event_response(
468        mock: &SuiMockClient,
469        module: Identifier,
470        cursor: EventID,
471        events: EventPage,
472    ) {
473        mock.add_event_response(BRIDGE_PACKAGE_ID, module.clone(), cursor, events.clone());
474    }
475
476    /// Creates a test bridge record with valid BCS-encoded payload
477    fn create_test_bridge_record(
478        seq_num: u64,
479        source_chain: BridgeChainId,
480        target_chain: BridgeChainId,
481        amount: u64,
482    ) -> MoveTypeBridgeRecord {
483        // Create the payload struct matching SuiToEthOnChainBcsPayload
484        #[derive(serde::Serialize)]
485        struct TestPayload {
486            sui_address: Vec<u8>,
487            target_chain: u8,
488            eth_address: Vec<u8>,
489            token_type: u8,
490            amount: [u8; 8],
491        }
492
493        let payload = TestPayload {
494            sui_address: vec![0u8; 32], // 32-byte SuiAddress
495            target_chain: target_chain as u8,
496            eth_address: vec![0u8; 20], // 20-byte EthAddress
497            token_type: 1,              // SUI token
498            amount: amount.to_be_bytes(),
499        };
500
501        let payload_bytes = bcs::to_bytes(&payload).unwrap();
502
503        MoveTypeBridgeRecord {
504            message: MoveTypeBridgeMessage {
505                message_type: 0, // TokenTransfer
506                message_version: 1,
507                seq_num,
508                source_chain: source_chain as u8,
509                payload: payload_bytes,
510            },
511            verified_signatures: None,
512            claimed: false,
513        }
514    }
515
516    #[tokio::test]
517    async fn test_sui_syncer_grpc_basic() -> anyhow::Result<()> {
518        telemetry_subscribers::init_for_testing();
519        let registry = Registry::new();
520        mysten_metrics::init_metrics(&registry);
521        let metrics = Arc::new(BridgeMetrics::new(&registry));
522        let mock = SuiMockClient::default();
523        let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
524
525        let source_chain_id = BridgeChainId::SuiCustom as u8;
526        let target_modules = HashMap::new(); // Not used for gRPC mode
527
528        let interval = Duration::from_millis(200);
529        let batch_size = 10;
530        let next_sequence_number = 0;
531
532        // Initially, no records on chain
533        mock.set_next_seq_num(source_chain_id, 0);
534
535        let (_handles, mut events_rx) =
536            SuiSyncer::new(client.clone(), target_modules.clone(), metrics.clone())
537                .run_grpc(source_chain_id, next_sequence_number, interval, batch_size)
538                .await
539                .unwrap();
540
541        // Initially there are no records
542        assert_no_more_events(interval, &mut events_rx).await;
543
544        mock.set_latest_checkpoint_sequence_number(1000);
545
546        // Add some bridge records
547        let record_0 =
548            create_test_bridge_record(0, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 1000);
549        let record_1 =
550            create_test_bridge_record(1, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 2000);
551
552        mock.add_bridge_record(source_chain_id, 0, record_0);
553        mock.add_bridge_record(source_chain_id, 1, record_1);
554        mock.set_next_seq_num(source_chain_id, 2); // 2 records available (0 and 1)
555
556        let (next_cursor, received_events) = events_rx.recv().await.unwrap();
557        assert_eq!(received_events.len(), 2);
558        assert_eq!(next_cursor, 2); // Next sequence number to process
559
560        match &received_events[0] {
561            SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
562                assert_eq!(event.nonce, 0);
563                assert_eq!(event.sui_chain_id, BridgeChainId::SuiCustom);
564                assert_eq!(event.eth_chain_id, BridgeChainId::EthCustom);
565                assert_eq!(event.amount_sui_adjusted, 1000);
566            }
567            _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
568        }
569        match &received_events[1] {
570            SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
571                assert_eq!(event.nonce, 1);
572                assert_eq!(event.amount_sui_adjusted, 2000);
573            }
574            _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
575        }
576
577        // No more events should be received
578        assert_no_more_events(interval, &mut events_rx).await;
579        assert_eq!(
580            metrics
581                .last_synced_sui_checkpoints
582                .get_metric_with_label_values(&[&source_chain_id.to_string()])
583                .unwrap()
584                .get(),
585            1000
586        );
587
588        Ok(())
589    }
590}