1use crate::{
14 error::BridgeResult,
15 events::{EmittedSuiToEthTokenBridgeV1, EmittedSuiToEthTokenBridgeV2, SuiBridgeEvent},
16 metrics::BridgeMetrics,
17 retry_with_max_elapsed_time,
18 sui_client::{SuiClient, SuiClientInner},
19 types::BridgeAction,
20};
21use mysten_metrics::spawn_logged_monitored_task;
22use std::{collections::HashMap, sync::Arc};
23use sui_types::{Identifier, event::EventID};
24use tokio::{
25 sync::Notify,
26 task::JoinHandle,
27 time::{self, Duration},
28};
29
30const SUI_EVENTS_CHANNEL_SIZE: usize = 1000;
31
32pub type SuiTargetModules = HashMap<Identifier, Option<EventID>>;
34
35pub type GrpcSyncedEvents = (u64, Vec<SuiBridgeEvent>);
36
37pub struct SuiSyncer<C> {
38 sui_client: Arc<SuiClient<C>>,
39 #[allow(unused)]
42 cursors: SuiTargetModules,
43 metrics: Arc<BridgeMetrics>,
44}
45
46impl<C> SuiSyncer<C>
47where
48 C: SuiClientInner + 'static,
49{
50 pub fn new(
51 sui_client: Arc<SuiClient<C>>,
52 cursors: SuiTargetModules,
53 metrics: Arc<BridgeMetrics>,
54 ) -> Self {
55 Self {
56 sui_client,
57 cursors,
58 metrics,
59 }
60 }
61
62 pub async fn run_grpc(
63 self,
64 source_chain_id: u8,
65 next_sequence_number: u64,
66 query_interval: Duration,
67 batch_size: u64,
68 ) -> BridgeResult<(
69 Vec<JoinHandle<()>>,
70 mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
71 )> {
72 let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
73 SUI_EVENTS_CHANNEL_SIZE,
74 &mysten_metrics::get_metrics()
75 .unwrap()
76 .channel_inflight
77 .with_label_values(&["sui_grpc_events_queue"]),
78 );
79
80 let task_handle = spawn_logged_monitored_task!(Self::run_grpc_listening_task(
81 source_chain_id,
82 next_sequence_number,
83 events_tx,
84 self.sui_client.clone(),
85 query_interval,
86 batch_size,
87 self.metrics.clone(),
88 ));
89
90 Ok((vec![task_handle], events_rx))
91 }
92
93 async fn run_grpc_listening_task(
94 source_chain_id: u8,
95 mut next_sequence_cursor: u64,
96 events_sender: mysten_metrics::metered_channel::Sender<GrpcSyncedEvents>,
97 sui_client: Arc<SuiClient<C>>,
98 query_interval: Duration,
99 batch_size: u64,
100 metrics: Arc<BridgeMetrics>,
101 ) {
102 tracing::info!(
103 source_chain_id,
104 next_sequence_cursor,
105 "Starting sui grpc records listening task"
106 );
107 let mut interval = time::interval(query_interval);
108 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
109
110 let notify = Arc::new(Notify::new());
112 let notify_clone = notify.clone();
113 let sui_client_clone = sui_client.clone();
114 let chain_label = source_chain_id.to_string();
115 let last_synced_sui_checkpoints_metric = metrics
116 .last_synced_sui_checkpoints
117 .with_label_values(&[&chain_label]);
118 spawn_logged_monitored_task!(async move {
119 loop {
120 notify_clone.notified().await;
121 let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
122 sui_client_clone.get_latest_checkpoint_sequence_number(),
123 Duration::from_secs(120)
124 ) else {
125 tracing::error!(
126 "Failed to query latest checkpoint sequence number from sui client after retry"
127 );
128 continue;
129 };
130 last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
131 }
132 });
133
134 loop {
135 interval.tick().await;
136 let Ok(Ok(on_chain_next_sequence_index)) = retry_with_max_elapsed_time!(
137 sui_client.get_token_transfer_next_seq_number(source_chain_id),
138 Duration::from_secs(120)
139 ) else {
140 tracing::error!(
141 source_chain_id,
142 "Failed to get next seq num from sui client after retry"
143 );
144 continue;
145 };
146
147 let start_index = next_sequence_cursor;
149 if start_index >= on_chain_next_sequence_index {
150 notify.notify_one();
151 continue;
152 }
153
154 let end_index = std::cmp::min(
155 start_index + batch_size - 1,
156 on_chain_next_sequence_index - 1,
157 );
158
159 let Ok(Ok(records)) = retry_with_max_elapsed_time!(
160 sui_client.get_bridge_records_in_range(source_chain_id, start_index, end_index),
161 Duration::from_secs(120)
162 ) else {
163 tracing::error!(
164 source_chain_id,
165 start_index,
166 end_index,
167 "Failed to get records from sui client after retry"
168 );
169 continue;
170 };
171
172 let len = records.len();
173 if len != 0 {
174 let mut events = Vec::with_capacity(len);
175 let mut batch_last_sequence_index = start_index;
176
177 for (seq_index, record) in records {
178 let event = match Self::bridge_record_to_event(&record, source_chain_id) {
179 Ok(event) => event,
180 Err(e) => {
181 tracing::error!(
182 source_chain_id,
183 seq_index,
184 "Failed to convert record to event: {:?}",
185 e
186 );
187 continue;
188 }
189 };
190
191 events.push(event);
192 batch_last_sequence_index = seq_index;
193 }
194
195 if !events.is_empty() {
196 events_sender
197 .send((batch_last_sequence_index + 1, events))
198 .await
199 .expect("Bridge events channel receiver is closed");
200
201 next_sequence_cursor = batch_last_sequence_index + 1;
202 tracing::info!(
203 source_chain_id,
204 last_processed_seq = batch_last_sequence_index,
205 next_sequence_cursor,
206 "Processed {len} bridge records"
207 );
208 }
209 }
210
211 if end_index >= on_chain_next_sequence_index - 1 {
212 notify.notify_one();
215 }
216 }
217 }
218
219 fn bridge_record_to_event(
220 record: &sui_types::bridge::MoveTypeBridgeRecord,
221 source_chain_id: u8,
222 ) -> Result<SuiBridgeEvent, crate::error::BridgeError> {
223 let action = BridgeAction::try_from_bridge_record(record)?;
224
225 match action {
226 BridgeAction::SuiToEthTokenTransfer(transfer) => Ok(
227 SuiBridgeEvent::SuiToEthTokenBridgeV1(EmittedSuiToEthTokenBridgeV1 {
228 nonce: transfer.nonce,
229 sui_chain_id: transfer.sui_chain_id,
230 eth_chain_id: transfer.eth_chain_id,
231 sui_address: transfer.sui_address,
232 eth_address: transfer.eth_address,
233 token_id: transfer.token_id,
234 amount_sui_adjusted: transfer.amount_adjusted,
235 }),
236 ),
237 BridgeAction::SuiToEthTokenTransferV2(transfer) => Ok(
238 SuiBridgeEvent::SuiToEthTokenBridgeV2(EmittedSuiToEthTokenBridgeV2 {
239 nonce: transfer.nonce,
240 sui_chain_id: transfer.sui_chain_id,
241 eth_chain_id: transfer.eth_chain_id,
242 sui_address: transfer.sui_address,
243 eth_address: transfer.eth_address,
244 token_id: transfer.token_id,
245 amount_sui_adjusted: transfer.amount_adjusted,
246 timestamp_ms: transfer.timestamp_ms,
247 }),
248 ),
249 _ => Err(crate::error::BridgeError::Generic(format!(
250 "Unexpected action type for source_chain_id {}: {:?}",
251 source_chain_id, action
252 ))),
253 }
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260
261 use crate::{sui_client::SuiClient, sui_mock_client::SuiMockClient};
262 use prometheus::Registry;
263 use sui_types::bridge::{BridgeChainId, MoveTypeBridgeMessage, MoveTypeBridgeRecord};
264 use tokio::time::timeout;
265
266 async fn assert_no_more_events<T: std::fmt::Debug>(
267 interval: Duration,
268 events_rx: &mut mysten_metrics::metered_channel::Receiver<T>,
269 ) {
270 match timeout(interval * 2, events_rx.recv()).await {
271 Err(_e) => (),
272 other => panic!("Should have timed out, but got: {:?}", other),
273 };
274 }
275
276 fn create_test_bridge_record(
278 seq_num: u64,
279 source_chain: BridgeChainId,
280 target_chain: BridgeChainId,
281 amount: u64,
282 ) -> MoveTypeBridgeRecord {
283 #[derive(serde::Serialize)]
285 struct TestPayload {
286 sui_address: Vec<u8>,
287 target_chain: u8,
288 eth_address: Vec<u8>,
289 token_type: u8,
290 amount: [u8; 8],
291 }
292
293 let payload = TestPayload {
294 sui_address: vec![0u8; 32], target_chain: target_chain as u8,
296 eth_address: vec![0u8; 20], token_type: 1, amount: amount.to_be_bytes(),
299 };
300
301 let payload_bytes = bcs::to_bytes(&payload).unwrap();
302
303 MoveTypeBridgeRecord {
304 message: MoveTypeBridgeMessage {
305 message_type: 0, message_version: 1,
307 seq_num,
308 source_chain: source_chain as u8,
309 payload: payload_bytes,
310 },
311 verified_signatures: None,
312 claimed: false,
313 }
314 }
315
316 #[tokio::test]
317 async fn test_sui_syncer_grpc_basic() -> anyhow::Result<()> {
318 telemetry_subscribers::init_for_testing();
319 let registry = Registry::new();
320 mysten_metrics::init_metrics(®istry);
321 let metrics = Arc::new(BridgeMetrics::new(®istry));
322 let mock = SuiMockClient::default();
323 let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
324
325 let source_chain_id = BridgeChainId::SuiCustom as u8;
326 let target_modules = HashMap::new(); let interval = Duration::from_millis(200);
329 let batch_size = 10;
330 let next_sequence_number = 0;
331
332 mock.set_next_seq_num(source_chain_id, 0);
334
335 let (_handles, mut events_rx) =
336 SuiSyncer::new(client.clone(), target_modules.clone(), metrics.clone())
337 .run_grpc(source_chain_id, next_sequence_number, interval, batch_size)
338 .await
339 .unwrap();
340
341 assert_no_more_events(interval, &mut events_rx).await;
343
344 mock.set_latest_checkpoint_sequence_number(1000);
345
346 let record_0 =
348 create_test_bridge_record(0, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 1000);
349 let record_1 =
350 create_test_bridge_record(1, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 2000);
351
352 mock.add_bridge_record(source_chain_id, 0, record_0);
353 mock.add_bridge_record(source_chain_id, 1, record_1);
354 mock.set_next_seq_num(source_chain_id, 2); let (next_cursor, received_events) = events_rx.recv().await.unwrap();
357 assert_eq!(received_events.len(), 2);
358 assert_eq!(next_cursor, 2); match &received_events[0] {
361 SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
362 assert_eq!(event.nonce, 0);
363 assert_eq!(event.sui_chain_id, BridgeChainId::SuiCustom);
364 assert_eq!(event.eth_chain_id, BridgeChainId::EthCustom);
365 assert_eq!(event.amount_sui_adjusted, 1000);
366 }
367 _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
368 }
369 match &received_events[1] {
370 SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
371 assert_eq!(event.nonce, 1);
372 assert_eq!(event.amount_sui_adjusted, 2000);
373 }
374 _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
375 }
376
377 assert_no_more_events(interval, &mut events_rx).await;
379 assert_eq!(
380 metrics
381 .last_synced_sui_checkpoints
382 .get_metric_with_label_values(&[&source_chain_id.to_string()])
383 .unwrap()
384 .get(),
385 1000
386 );
387
388 Ok(())
389 }
390}