1use crate::{
14 error::BridgeResult,
15 events::{EmittedSuiToEthTokenBridgeV1, SuiBridgeEvent},
16 metrics::BridgeMetrics,
17 retry_with_max_elapsed_time,
18 sui_client::{SuiClient, SuiClientInner},
19 types::BridgeAction,
20};
21use mysten_metrics::spawn_logged_monitored_task;
22use std::{collections::HashMap, sync::Arc};
23use sui_json_rpc_types::SuiEvent;
24use sui_types::BRIDGE_PACKAGE_ID;
25use sui_types::{Identifier, event::EventID};
26use tokio::{
27 sync::Notify,
28 task::JoinHandle,
29 time::{self, Duration},
30};
31
/// Capacity of the metered channels that carry synced Sui events from the
/// listening tasks to the consumer.
const SUI_EVENTS_CHANNEL_SIZE: usize = 1_000;
33
/// Map from a module identifier to the optional event cursor that the
/// per-module listening task starts querying from.
pub type SuiTargetModules = HashMap<Identifier, Option<EventID>>;
36
/// Payload sent by the gRPC listening task: the next sequence number to
/// resume from, paired with the bridge events decoded from one batch.
pub type GrpcSyncedEvents = (u64, Vec<SuiBridgeEvent>);
38
/// Polls a Sui client for bridge activity and forwards what it finds over a
/// metered channel.
pub struct SuiSyncer<C> {
    /// Shared client used for every query issued by the spawned tasks.
    sui_client: Arc<SuiClient<C>>,
    /// Modules to watch together with their starting event cursors
    /// (consumed by `run`).
    cursors: SuiTargetModules,
    /// Bridge metrics; the syncer updates `last_synced_sui_checkpoints`.
    metrics: Arc<BridgeMetrics>,
}
46
47impl<C> SuiSyncer<C>
48where
49 C: SuiClientInner + 'static,
50{
51 pub fn new(
52 sui_client: Arc<SuiClient<C>>,
53 cursors: SuiTargetModules,
54 metrics: Arc<BridgeMetrics>,
55 ) -> Self {
56 Self {
57 sui_client,
58 cursors,
59 metrics,
60 }
61 }
62
63 pub async fn run(
64 self,
65 query_interval: Duration,
66 ) -> BridgeResult<(
67 Vec<JoinHandle<()>>,
68 mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
69 )> {
70 let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
71 SUI_EVENTS_CHANNEL_SIZE,
72 &mysten_metrics::get_metrics()
73 .unwrap()
74 .channel_inflight
75 .with_label_values(&["sui_events_queue"]),
76 );
77
78 let mut task_handles = vec![];
79 for (module, cursor) in self.cursors {
80 let metrics = self.metrics.clone();
81 let events_rx_clone: mysten_metrics::metered_channel::Sender<(
82 Identifier,
83 Vec<SuiEvent>,
84 )> = events_tx.clone();
85 let sui_client_clone = self.sui_client.clone();
86 task_handles.push(spawn_logged_monitored_task!(
87 Self::run_event_listening_task(
88 module,
89 cursor,
90 events_rx_clone,
91 sui_client_clone,
92 query_interval,
93 metrics,
94 )
95 ));
96 }
97 Ok((task_handles, events_rx))
98 }
99
100 async fn run_event_listening_task(
101 module: Identifier,
104 mut cursor: Option<EventID>,
105 events_sender: mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
106 sui_client: Arc<SuiClient<C>>,
107 query_interval: Duration,
108 metrics: Arc<BridgeMetrics>,
109 ) {
110 tracing::info!(?module, ?cursor, "Starting sui events listening task");
111 let mut interval = time::interval(query_interval);
112 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
113
114 let notify = Arc::new(Notify::new());
116 let notify_clone = notify.clone();
117 let sui_client_clone = sui_client.clone();
118 let last_synced_sui_checkpoints_metric = metrics
119 .last_synced_sui_checkpoints
120 .with_label_values(&[&module.to_string()]);
121 spawn_logged_monitored_task!(async move {
122 loop {
123 notify_clone.notified().await;
124 let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
125 sui_client_clone.get_latest_checkpoint_sequence_number(),
126 Duration::from_secs(120)
127 ) else {
128 tracing::error!(
129 "Failed to query latest checkpoint sequence number from sui client after retry"
130 );
131 continue;
132 };
133 last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
134 }
135 });
136
137 loop {
138 interval.tick().await;
139 let Ok(Ok(events)) = retry_with_max_elapsed_time!(
140 sui_client.query_events_by_module(BRIDGE_PACKAGE_ID, module.clone(), cursor),
141 Duration::from_secs(120)
142 ) else {
143 tracing::error!("Failed to query events from sui client after retry");
144 continue;
145 };
146
147 let len = events.data.len();
148 if len != 0 {
149 if !events.has_next_page {
150 notify.notify_one();
153 }
154 events_sender
155 .send((module.clone(), events.data))
156 .await
157 .expect("All Sui event channel receivers are closed");
158 if let Some(next) = events.next_cursor {
159 cursor = Some(next);
160 }
161 tracing::info!(?module, ?cursor, "Observed {len} new Sui events");
162 }
163 }
164 }
165
166 pub async fn run_grpc(
167 self,
168 source_chain_id: u8,
169 next_sequence_number: u64,
170 query_interval: Duration,
171 batch_size: u64,
172 ) -> BridgeResult<(
173 Vec<JoinHandle<()>>,
174 mysten_metrics::metered_channel::Receiver<GrpcSyncedEvents>,
175 )> {
176 let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
177 SUI_EVENTS_CHANNEL_SIZE,
178 &mysten_metrics::get_metrics()
179 .unwrap()
180 .channel_inflight
181 .with_label_values(&["sui_grpc_events_queue"]),
182 );
183
184 let task_handle = spawn_logged_monitored_task!(Self::run_grpc_listening_task(
185 source_chain_id,
186 next_sequence_number,
187 events_tx,
188 self.sui_client.clone(),
189 query_interval,
190 batch_size,
191 self.metrics.clone(),
192 ));
193
194 Ok((vec![task_handle], events_rx))
195 }
196
197 async fn run_grpc_listening_task(
198 source_chain_id: u8,
199 mut next_sequence_cursor: u64,
200 events_sender: mysten_metrics::metered_channel::Sender<GrpcSyncedEvents>,
201 sui_client: Arc<SuiClient<C>>,
202 query_interval: Duration,
203 batch_size: u64,
204 metrics: Arc<BridgeMetrics>,
205 ) {
206 tracing::info!(
207 source_chain_id,
208 next_sequence_cursor,
209 "Starting sui grpc records listening task"
210 );
211 let mut interval = time::interval(query_interval);
212 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
213
214 let notify = Arc::new(Notify::new());
216 let notify_clone = notify.clone();
217 let sui_client_clone = sui_client.clone();
218 let chain_label = source_chain_id.to_string();
219 let last_synced_sui_checkpoints_metric = metrics
220 .last_synced_sui_checkpoints
221 .with_label_values(&[&chain_label]);
222 spawn_logged_monitored_task!(async move {
223 loop {
224 notify_clone.notified().await;
225 let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
226 sui_client_clone.get_latest_checkpoint_sequence_number(),
227 Duration::from_secs(120)
228 ) else {
229 tracing::error!(
230 "Failed to query latest checkpoint sequence number from sui client after retry"
231 );
232 continue;
233 };
234 last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
235 }
236 });
237
238 loop {
239 interval.tick().await;
240 let Ok(Ok(on_chain_next_sequence_index)) = retry_with_max_elapsed_time!(
241 sui_client.get_token_transfer_next_seq_number(source_chain_id),
242 Duration::from_secs(120)
243 ) else {
244 tracing::error!(
245 source_chain_id,
246 "Failed to get next seq num from sui client after retry"
247 );
248 continue;
249 };
250
251 let start_index = next_sequence_cursor;
253 if start_index >= on_chain_next_sequence_index {
254 notify.notify_one();
255 continue;
256 }
257
258 let end_index = std::cmp::min(
259 start_index + batch_size - 1,
260 on_chain_next_sequence_index - 1,
261 );
262
263 let Ok(Ok(records)) = retry_with_max_elapsed_time!(
264 sui_client.get_bridge_records_in_range(source_chain_id, start_index, end_index),
265 Duration::from_secs(120)
266 ) else {
267 tracing::error!(
268 source_chain_id,
269 start_index,
270 end_index,
271 "Failed to get records from sui client after retry"
272 );
273 continue;
274 };
275
276 let len = records.len();
277 if len != 0 {
278 let mut events = Vec::with_capacity(len);
279 let mut batch_last_sequence_index = start_index;
280
281 for (seq_index, record) in records {
282 let event = match Self::bridge_record_to_event(&record, source_chain_id) {
283 Ok(event) => event,
284 Err(e) => {
285 tracing::error!(
286 source_chain_id,
287 seq_index,
288 "Failed to convert record to event: {:?}",
289 e
290 );
291 continue;
292 }
293 };
294
295 events.push(event);
296 batch_last_sequence_index = seq_index;
297 }
298
299 if !events.is_empty() {
300 events_sender
301 .send((batch_last_sequence_index + 1, events))
302 .await
303 .expect("Bridge events channel receiver is closed");
304
305 next_sequence_cursor = batch_last_sequence_index + 1;
306 tracing::info!(
307 source_chain_id,
308 last_processed_seq = batch_last_sequence_index,
309 next_sequence_cursor,
310 "Processed {len} bridge records"
311 );
312 }
313 }
314
315 if end_index >= on_chain_next_sequence_index - 1 {
316 notify.notify_one();
319 }
320 }
321 }
322
323 fn bridge_record_to_event(
324 record: &sui_types::bridge::MoveTypeBridgeRecord,
325 source_chain_id: u8,
326 ) -> Result<SuiBridgeEvent, crate::error::BridgeError> {
327 let action = BridgeAction::try_from_bridge_record(record)?;
328
329 match action {
330 BridgeAction::SuiToEthTokenTransfer(transfer) => Ok(
331 SuiBridgeEvent::SuiToEthTokenBridgeV1(EmittedSuiToEthTokenBridgeV1 {
332 nonce: transfer.nonce,
333 sui_chain_id: transfer.sui_chain_id,
334 eth_chain_id: transfer.eth_chain_id,
335 sui_address: transfer.sui_address,
336 eth_address: transfer.eth_address,
337 token_id: transfer.token_id,
338 amount_sui_adjusted: transfer.amount_adjusted,
339 }),
340 ),
341 _ => Err(crate::error::BridgeError::Generic(format!(
342 "Unexpected action type for source_chain_id {}: {:?}",
343 source_chain_id, action
344 ))),
345 }
346 }
347}
348
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{sui_client::SuiClient, sui_mock_client::SuiMockClient};
    use prometheus::Registry;
    use sui_json_rpc_types::EventPage;
    use sui_types::bridge::{BridgeChainId, MoveTypeBridgeMessage, MoveTypeBridgeRecord};
    use sui_types::{Identifier, digests::TransactionDigest, event::EventID};
    use tokio::time::timeout;

    /// Exercises the per-module event polling path (`SuiSyncer::run`) against
    /// the mock client for two modules sharing the same starting cursor.
    #[tokio::test]
    async fn test_sui_syncer_basic() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let mock = SuiMockClient::default();
        let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
        let module_foo = Identifier::new("Foo").unwrap();
        let module_bar = Identifier::new("Bar").unwrap();
        let empty_events = EventPage::empty();
        let cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 0,
        };
        add_event_response(&mock, module_foo.clone(), cursor, empty_events.clone());
        add_event_response(&mock, module_bar.clone(), cursor, empty_events.clone());

        let target_modules = HashMap::from_iter(vec![
            (module_foo.clone(), Some(cursor)),
            (module_bar.clone(), Some(cursor)),
        ]);
        let interval = Duration::from_millis(200);
        let (_handles, mut events_rx) = SuiSyncer::new(client, target_modules, metrics.clone())
            .run(interval)
            .await
            .unwrap();

        // Only empty pages are registered so far, so nothing may be forwarded.
        assert_no_more_events(interval, &mut events_rx).await;

        mock.set_latest_checkpoint_sequence_number(999);
        // Module Foo: a final page (no next page) carrying two events.
        let mut event_1: SuiEvent = SuiEvent::random_for_testing();
        let package_id = BRIDGE_PACKAGE_ID;
        event_1.type_.address = package_id.into();
        event_1.type_.module = module_foo.clone();
        let module_foo_events_1: sui_json_rpc_types::Page<SuiEvent, EventID> = EventPage {
            data: vec![event_1.clone(), event_1.clone()],
            next_cursor: Some(event_1.id),
            has_next_page: false,
        };
        // Register the follow-up (empty) page before the page that advances the cursor.
        add_event_response(&mock, module_foo.clone(), event_1.id, empty_events.clone());
        add_event_response(
            &mock,
            module_foo.clone(),
            cursor,
            module_foo_events_1.clone(),
        );

        let (identifier, received_events) = events_rx.recv().await.unwrap();
        assert_eq!(identifier, module_foo);
        assert_eq!(received_events.len(), 2);
        assert_eq!(received_events[0].id, event_1.id);
        assert_eq!(received_events[1].id, event_1.id);
        assert_no_more_events(interval, &mut events_rx).await;
        // The page had no successor, so the gauge for Foo was refreshed.
        assert_eq!(
            metrics
                .last_synced_sui_checkpoints
                .get_metric_with_label_values(&["Foo"])
                .unwrap()
                .get(),
            999
        );

        // Module Bar: a page that reports a next page.
        let mut event_2: SuiEvent = SuiEvent::random_for_testing();
        event_2.type_.address = package_id.into();
        event_2.type_.module = module_bar.clone();
        let module_bar_events_1 = EventPage {
            data: vec![event_2.clone()],
            next_cursor: Some(event_2.id),
            has_next_page: true,
        };
        add_event_response(&mock, module_bar.clone(), event_2.id, empty_events.clone());

        add_event_response(&mock, module_bar.clone(), cursor, module_bar_events_1);

        let (identifier, received_events) = events_rx.recv().await.unwrap();
        assert_eq!(identifier, module_bar);
        assert_eq!(received_events.len(), 1);
        assert_eq!(received_events[0].id, event_2.id);
        assert_no_more_events(interval, &mut events_rx).await;
        // `has_next_page` was true, so the gauge for Bar was never refreshed.
        assert_eq!(
            metrics
                .last_synced_sui_checkpoints
                .get_metric_with_label_values(&["Bar"])
                .unwrap()
                .get(),
            0,
        );

        Ok(())
    }

    /// Asserts that nothing arrives on `events_rx` within two polling
    /// intervals; panics if the receiver yields anything (including `None`).
    async fn assert_no_more_events<T: std::fmt::Debug>(
        interval: Duration,
        events_rx: &mut mysten_metrics::metered_channel::Receiver<T>,
    ) {
        match timeout(interval * 2, events_rx.recv()).await {
            Err(_e) => (),
            other => panic!("Should have timed out, but got: {:?}", other),
        };
    }

    /// Registers `events` as the mock response for querying `module` of the
    /// bridge package at `cursor`.
    fn add_event_response(
        mock: &SuiMockClient,
        module: Identifier,
        cursor: EventID,
        events: EventPage,
    ) {
        mock.add_event_response(BRIDGE_PACKAGE_ID, module.clone(), cursor, events.clone());
    }

    /// Builds an unclaimed, unsigned bridge record whose BCS payload carries
    /// zeroed addresses, the target chain, token type 1 and the big-endian
    /// `amount`.
    fn create_test_bridge_record(
        seq_num: u64,
        source_chain: BridgeChainId,
        target_chain: BridgeChainId,
        amount: u64,
    ) -> MoveTypeBridgeRecord {
        // Local mirror of the payload layout, serialized with BCS below.
        #[derive(serde::Serialize)]
        struct TestPayload {
            sui_address: Vec<u8>,
            target_chain: u8,
            eth_address: Vec<u8>,
            token_type: u8,
            amount: [u8; 8],
        }

        let payload = TestPayload {
            sui_address: vec![0u8; 32],
            target_chain: target_chain as u8,
            eth_address: vec![0u8; 20],
            token_type: 1,
            amount: amount.to_be_bytes(),
        };

        let payload_bytes = bcs::to_bytes(&payload).unwrap();

        MoveTypeBridgeRecord {
            message: MoveTypeBridgeMessage {
                message_type: 0,
                message_version: 1,
                seq_num,
                source_chain: source_chain as u8,
                payload: payload_bytes,
            },
            verified_signatures: None,
            claimed: false,
        }
    }

    /// Exercises the sequence-number based path (`SuiSyncer::run_grpc`):
    /// nothing is emitted while the chain has no records, then two records
    /// arrive as one batch with the resume cursor set past them.
    #[tokio::test]
    async fn test_sui_syncer_grpc_basic() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let mock = SuiMockClient::default();
        let client = Arc::new(SuiClient::new_for_testing(mock.clone()));

        let source_chain_id = BridgeChainId::SuiCustom as u8;
        // The gRPC path does not read the module cursors.
        let target_modules = HashMap::new();
        let interval = Duration::from_millis(200);
        let batch_size = 10;
        let next_sequence_number = 0;

        mock.set_next_seq_num(source_chain_id, 0);

        let (_handles, mut events_rx) =
            SuiSyncer::new(client.clone(), target_modules.clone(), metrics.clone())
                .run_grpc(source_chain_id, next_sequence_number, interval, batch_size)
                .await
                .unwrap();

        assert_no_more_events(interval, &mut events_rx).await;

        mock.set_latest_checkpoint_sequence_number(1000);

        let record_0 =
            create_test_bridge_record(0, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 1000);
        let record_1 =
            create_test_bridge_record(1, BridgeChainId::SuiCustom, BridgeChainId::EthCustom, 2000);

        mock.add_bridge_record(source_chain_id, 0, record_0);
        mock.add_bridge_record(source_chain_id, 1, record_1);
        mock.set_next_seq_num(source_chain_id, 2);

        let (next_cursor, received_events) = events_rx.recv().await.unwrap();
        assert_eq!(received_events.len(), 2);
        // Records 0 and 1 were processed, so syncing resumes at 2.
        assert_eq!(next_cursor, 2);

        match &received_events[0] {
            SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
                assert_eq!(event.nonce, 0);
                assert_eq!(event.sui_chain_id, BridgeChainId::SuiCustom);
                assert_eq!(event.eth_chain_id, BridgeChainId::EthCustom);
                assert_eq!(event.amount_sui_adjusted, 1000);
            }
            _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
        }
        match &received_events[1] {
            SuiBridgeEvent::SuiToEthTokenBridgeV1(event) => {
                assert_eq!(event.nonce, 1);
                assert_eq!(event.amount_sui_adjusted, 2000);
            }
            _ => panic!("Expected SuiToEthTokenBridgeV1 event"),
        }

        assert_no_more_events(interval, &mut events_rx).await;
        // The batch reached the newest on-chain record, so the gauge was refreshed.
        assert_eq!(
            metrics
                .last_synced_sui_checkpoints
                .get_metric_with_label_values(&[&source_chain_id.to_string()])
                .unwrap()
                .get(),
            1000
        );

        Ok(())
    }
}