sui_bridge/
sui_syncer.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! The SuiSyncer module is responsible for synchronizing Events emitted
5//! on Sui blockchain from concerned modules of bridge package 0x9.
6//!
7//! There are two modes of operation:
8//! - Event-based (legacy): Uses JSON-RPC to query events by module
9//! - gRPC-based (new): Iterates over bridge records using LinkedTable iteration
10//!
11//! As of now, only the event-based mode is being used.
12
13use crate::{
14    error::BridgeResult,
15    events::{EmittedSuiToEthTokenBridgeV1, EmittedSuiToEthTokenBridgeV2, SuiBridgeEvent},
16    metrics::BridgeMetrics,
17    retry_with_max_elapsed_time,
18    sui_client::{SuiClient, SuiClientInner},
19    types::BridgeAction,
20};
21use mysten_metrics::spawn_logged_monitored_task;
22use std::{collections::HashMap, sync::Arc};
23use sui_types::{Identifier, event::EventID};
24use tokio::{
25    sync::Notify,
26    task::JoinHandle,
27    time::{self, Duration},
28};
29
30const SUI_EVENTS_CHANNEL_SIZE: usize = 1000;
31
32/// Map from contract address to their start cursor (exclusive)
33pub type SuiTargetModules = HashMap<Identifier, Option<EventID>>;
34
35pub type GrpcSyncedEvents = (u64, Vec<SuiBridgeEvent>);
36
37pub struct SuiSyncer<C> {
38    sui_client: Arc<SuiClient<C>>,
39    // The last transaction that the syncer has fully processed.
40    // Syncer will resume post this transaction (i.e. exclusive), when it starts.
41    #[allow(unused)]
42    cursors: SuiTargetModules,
43    metrics: Arc<BridgeMetrics>,
44}
45
46impl<C> SuiSyncer<C>
47where
48    C: SuiClientInner + 'static,
49{
50    pub fn new(
51        sui_client: Arc<SuiClient<C>>,
52        cursors: SuiTargetModules,
53        metrics: Arc<BridgeMetrics>,
54    ) -> Self {
55        Self {
56            sui_client,
57            cursors,
58            metrics,
59        }
60    }
61
62    pub async fn run_grpc(
63        self,
64        source_chain_id: u8,
65        next_sequence_number: u64,
66        query_interval: Duration,
67        batch_size: u64,
68    ) -> BridgeResult<(
69        Vec<JoinHandle<()>>,
70        mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
71    )> {
72        let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
73            SUI_EVENTS_CHANNEL_SIZE,
74            &mysten_metrics::get_metrics()
75                .unwrap()
76                .channel_inflight
77                .with_label_values(&["sui_grpc_events_queue"]),
78        );
79
80        let task_handle = spawn_logged_monitored_task!(Self::run_grpc_listening_task(
81            source_chain_id,
82            next_sequence_number,
83            events_tx,
84            self.sui_client.clone(),
85            query_interval,
86            batch_size,
87            self.metrics.clone(),
88        ));
89
90        Ok((vec![task_handle], events_rx))
91    }
92
93    async fn run_grpc_listening_task(
94        source_chain_id: u8,
95        mut next_sequence_cursor: u64,
96        events_sender: mysten_metrics::metered_channel::Sender<GrpcSyncedEvents>,
97        sui_client: Arc<SuiClient<C>>,
98        query_interval: Duration,
99        batch_size: u64,
100        metrics: Arc<BridgeMetrics>,
101    ) {
102        tracing::info!(
103            source_chain_id,
104            next_sequence_cursor,
105            "Starting sui grpc records listening task"
106        );
107        let mut interval = time::interval(query_interval);
108        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
109
110        // Create a task to update metrics
111        let notify = Arc::new(Notify::new());
112        let notify_clone = notify.clone();
113        let sui_client_clone = sui_client.clone();
114        let chain_label = source_chain_id.to_string();
115        let last_synced_sui_checkpoints_metric = metrics
116            .last_synced_sui_checkpoints
117            .with_label_values(&[&chain_label]);
118        spawn_logged_monitored_task!(async move {
119            loop {
120                notify_clone.notified().await;
121                let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
122                    sui_client_clone.get_latest_checkpoint_sequence_number(),
123                    Duration::from_secs(120)
124                ) else {
125                    tracing::error!(
126                        "Failed to query latest checkpoint sequence number from sui client after retry"
127                    );
128                    continue;
129                };
130                last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
131            }
132        });
133
134        loop {
135            interval.tick().await;
136            let Ok(Ok(on_chain_next_sequence_index)) = retry_with_max_elapsed_time!(
137                sui_client.get_token_transfer_next_seq_number(source_chain_id),
138                Duration::from_secs(120)
139            ) else {
140                tracing::error!(
141                    source_chain_id,
142                    "Failed to get next seq num from sui client after retry"
143                );
144                continue;
145            };
146
147            // start querying from the next_sequence_cursor till on_chain_next_sequence_index in batches
148            let start_index = next_sequence_cursor;
149            if start_index >= on_chain_next_sequence_index {
150                notify.notify_one();
151                continue;
152            }
153
154            let end_index = std::cmp::min(
155                start_index + batch_size - 1,
156                on_chain_next_sequence_index - 1,
157            );
158
159            let Ok(Ok(records)) = retry_with_max_elapsed_time!(
160                sui_client.get_bridge_records_in_range(source_chain_id, start_index, end_index),
161                Duration::from_secs(120)
162            ) else {
163                tracing::error!(
164                    source_chain_id,
165                    start_index,
166                    end_index,
167                    "Failed to get records from sui client after retry"
168                );
169                continue;
170            };
171
172            let len = records.len();
173            if len != 0 {
174                let mut events = Vec::with_capacity(len);
175                let mut batch_last_sequence_index = start_index;
176
177                for (seq_index, record) in records {
178                    let event = match Self::bridge_record_to_event(&record, source_chain_id) {
179                        Ok(event) => event,
180                        Err(e) => {
181                            tracing::error!(
182                                source_chain_id,
183                                seq_index,
184                                "Failed to convert record to event: {:?}",
185                                e
186                            );
187                            continue;
188                        }
189                    };
190
191                    events.push(event);
192                    batch_last_sequence_index = seq_index;
193                }
194
195                if !events.is_empty() {
196                    events_sender
197                        .send((batch_last_sequence_index + 1, events))
198                        .await
199                        .expect("Bridge events channel receiver is closed");
200
201                    next_sequence_cursor = batch_last_sequence_index + 1;
202                    tracing::info!(
203                        source_chain_id,
204                        last_processed_seq = batch_last_sequence_index,
205                        next_sequence_cursor,
206                        "Processed {len} bridge records"
207                    );
208                }
209            }
210
211            if end_index >= on_chain_next_sequence_index - 1 {
212                // we have processed all records up to the latest checkpoint
213                // so we can update the latest checkpoint metric
214                notify.notify_one();
215            }
216        }
217    }
218
219    fn bridge_record_to_event(
220        record: &sui_types::bridge::MoveTypeBridgeRecord,
221        source_chain_id: u8,
222    ) -> Result<SuiBridgeEvent, crate::error::BridgeError> {
223        let action = BridgeAction::try_from_bridge_record(record)?;
224
225        match action {
226            BridgeAction::SuiToEthTokenTransfer(transfer) => Ok(
227                SuiBridgeEvent::SuiToEthTokenBridgeV1(EmittedSuiToEthTokenBridgeV1 {
228                    nonce: transfer.nonce,
229                    sui_chain_id: transfer.sui_chain_id,
230                    eth_chain_id: transfer.eth_chain_id,
231                    sui_address: transfer.sui_address,
232                    eth_address: transfer.eth_address,
233                    token_id: transfer.token_id,
234                    amount_sui_adjusted: transfer.amount_adjusted,
235                }),
236            ),
237            BridgeAction::SuiToEthTokenTransferV2(transfer) => Ok(
238                SuiBridgeEvent::SuiToEthTokenBridgeV2(EmittedSuiToEthTokenBridgeV2 {
239                    nonce: transfer.nonce,
240                    sui_chain_id: transfer.sui_chain_id,
241                    eth_chain_id: transfer.eth_chain_id,
242                    sui_address: transfer.sui_address,
243                    eth_address: transfer.eth_address,
244                    token_id: transfer.token_id,
245                    amount_sui_adjusted: transfer.amount_adjusted,
246                    timestamp_ms: transfer.timestamp_ms,
247                }),
248            ),
249            _ => Err(crate::error::BridgeError::Generic(format!(
250                "Unexpected action type for source_chain_id {}: {:?}",
251                source_chain_id, action
252            ))),
253        }
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260
261    use crate::{sui_client::SuiClient, sui_mock_client::SuiMockClient};
262    use prometheus::Registry;
263    use sui_types::bridge::{BridgeChainId, MoveTypeBridgeMessage, MoveTypeBridgeRecord};
264    use tokio::time::timeout;
265
266    async fn assert_no_more_events<T: std::fmt::Debug>(
267        interval: Duration,
268        events_rx: &mut mysten_metrics::metered_channel::Receiver<T>,
269    ) {
270        match timeout(interval * 2, events_rx.recv()).await {
271            Err(_e) => (),
272            other => panic!("Should have timed out, but got: {:?}", other),
273        };
274    }
275
276    /// Creates a test bridge record with valid BCS-encoded payload
277    fn create_test_bridge_record(
278        seq_num: u64,
279        source_chain: BridgeChainId,
280        target_chain: BridgeChainId,
281        amount: u64,
282    ) -> MoveTypeBridgeRecord {
283        // Create the payload struct matching SuiToEthOnChainBcsPayload
284        #[derive(serde::Serialize)]
285        struct TestPayload {
286            sui_address: Vec<u8>,
287            target_chain: u8,
288            eth_address: Vec<u8>,
289            token_type: u8,
290            amount: [u8; 8],
291        }
292
293        let payload = TestPayload {
294            sui_address: vec![0u8; 32], // 32-byte SuiAddress
295            target_chain: target_chain as u8,
296            eth_address: vec![0u8; 20], // 20-byte EthAddress
297            token_type: 1,              // SUI token
298            amount: amount.to_be_bytes(),
299        };
300
301        let payload_bytes = bcs::to_bytes(&payload).unwrap();
302
303        MoveTypeBridgeRecord {
304            message: MoveTypeBridgeMessage {
305                message_type: 0, // TokenTransfer
306                message_version: 1,
307                seq_num,
308                source_chain: source_chain as u8,
309                payload: payload_bytes,
310            },
311            verified_signatures: None,
312            claimed: false,
313        }
314    }
315
316    #[tokio::test]
317    async fn test_sui_syncer_grpc_basic() -> anyhow::Result<()> {
318        telemetry_subscribers::init_for_testing();
319        let registry = Registry::new();
320        mysten_metrics::init_metrics(&registry);
321        let metrics = Arc::new(BridgeMetrics::new(&registry));
322        let mock = SuiMockClient::default();
323        let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
324
325        let source_chain_id = BridgeChainId::SuiCustom as u8;
326        let target_modules = HashMap::new(); // Not used for gRPC mode
327
328        let interval = Duration::from_millis(200);
329        let batch_size = 10;
330        let next_sequence_number = 0;
331
332        // Initially, no records on chain
333        mock.set_next_seq_num(source_chain_id, 0);
334
335        let (_handles, mut events_rx) =
336            SuiSyncer::new(client.clone(), target_modules.clone(), metrics.clone())
337                .run_grpc(source_chain_id, next_sequence_number, interval, batch_size)
338                .await
339                .unwrap();
340
341        // Initially there are no records
342        assert_no_more_events(interval, &mut events_rx).await;
343
344        mock.set_latest_checkpoint_sequence_number(1000);
345
346        // Add some bridge records
347        let record_0 =
348            create_test_bridge_record(0, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 1000);
349        let record_1 =
350            create_test_bridge_record(1, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 2000);
351
352        mock.add_bridge_record(source_chain_id, 0, record_0);
353        mock.add_bridge_record(source_chain_id, 1, record_1);
354        mock.set_next_seq_num(source_chain_id, 2); // 2 records available (0 and 1)
355
356        let (next_cursor, received_events) = events_rx.recv().await.unwrap();
357        assert_eq!(received_events.len(), 2);
358        assert_eq!(next_cursor, 2); // Next sequence number to process
359
360        match &received_events[0] {
361            SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
362                assert_eq!(event.nonce, 0);
363                assert_eq!(event.sui_chain_id, BridgeChainId::SuiCustom);
364                assert_eq!(event.eth_chain_id, BridgeChainId::EthCustom);
365                assert_eq!(event.amount_sui_adjusted, 1000);
366            }
367            _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
368        }
369        match &received_events[1] {
370            SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
371                assert_eq!(event.nonce, 1);
372                assert_eq!(event.amount_sui_adjusted, 2000);
373            }
374            _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
375        }
376
377        // No more events should be received
378        assert_no_more_events(interval, &mut events_rx).await;
379        assert_eq!(
380            metrics
381                .last_synced_sui_checkpoints
382                .get_metric_with_label_values(&[&source_chain_id.to_string()])
383                .unwrap()
384                .get(),
385            1000
386        );
387
388        Ok(())
389    }
390}