1use crate::{
8    error::BridgeResult,
9    metrics::BridgeMetrics,
10    retry_with_max_elapsed_time,
11    sui_client::{SuiClient, SuiClientInner},
12};
13use mysten_metrics::spawn_logged_monitored_task;
14use std::{collections::HashMap, sync::Arc};
15use sui_json_rpc_types::SuiEvent;
16use sui_types::BRIDGE_PACKAGE_ID;
17use sui_types::{Identifier, event::EventID};
18use tokio::{
19    sync::Notify,
20    task::JoinHandle,
21    time::{self, Duration},
22};
23
24const SUI_EVENTS_CHANNEL_SIZE: usize = 1000;
25
26pub type SuiTargetModules = HashMap<Identifier, Option<EventID>>;
28
29pub struct SuiSyncer<C> {
30    sui_client: Arc<SuiClient<C>>,
31    cursors: SuiTargetModules,
34    metrics: Arc<BridgeMetrics>,
35}
36
37impl<C> SuiSyncer<C>
38where
39    C: SuiClientInner + 'static,
40{
41    pub fn new(
42        sui_client: Arc<SuiClient<C>>,
43        cursors: SuiTargetModules,
44        metrics: Arc<BridgeMetrics>,
45    ) -> Self {
46        Self {
47            sui_client,
48            cursors,
49            metrics,
50        }
51    }
52
53    pub async fn run(
54        self,
55        query_interval: Duration,
56    ) -> BridgeResult<(
57        Vec<JoinHandle<()>>,
58        mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
59    )> {
60        let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
61            SUI_EVENTS_CHANNEL_SIZE,
62            &mysten_metrics::get_metrics()
63                .unwrap()
64                .channel_inflight
65                .with_label_values(&["sui_events_queue"]),
66        );
67
68        let mut task_handles = vec![];
69        for (module, cursor) in self.cursors {
70            let metrics = self.metrics.clone();
71            let events_rx_clone: mysten_metrics::metered_channel::Sender<(
72                Identifier,
73                Vec<SuiEvent>,
74            )> = events_tx.clone();
75            let sui_client_clone = self.sui_client.clone();
76            task_handles.push(spawn_logged_monitored_task!(
77                Self::run_event_listening_task(
78                    module,
79                    cursor,
80                    events_rx_clone,
81                    sui_client_clone,
82                    query_interval,
83                    metrics,
84                )
85            ));
86        }
87        Ok((task_handles, events_rx))
88    }
89
90    async fn run_event_listening_task(
91        module: Identifier,
94        mut cursor: Option<EventID>,
95        events_sender: mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
96        sui_client: Arc<SuiClient<C>>,
97        query_interval: Duration,
98        metrics: Arc<BridgeMetrics>,
99    ) {
100        tracing::info!(?module, ?cursor, "Starting sui events listening task");
101        let mut interval = time::interval(query_interval);
102        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
103
104        let notify = Arc::new(Notify::new());
106        let notify_clone = notify.clone();
107        let sui_client_clone = sui_client.clone();
108        let last_synced_sui_checkpoints_metric = metrics
109            .last_synced_sui_checkpoints
110            .with_label_values(&[&module.to_string()]);
111        spawn_logged_monitored_task!(async move {
112            loop {
113                notify_clone.notified().await;
114                let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
115                    sui_client_clone.get_latest_checkpoint_sequence_number(),
116                    Duration::from_secs(120)
117                ) else {
118                    tracing::error!(
119                        "Failed to query latest checkpoint sequence number from sui client after retry"
120                    );
121                    continue;
122                };
123                last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
124            }
125        });
126
127        loop {
128            interval.tick().await;
129            let Ok(Ok(events)) = retry_with_max_elapsed_time!(
130                sui_client.query_events_by_module(BRIDGE_PACKAGE_ID, module.clone(), cursor),
131                Duration::from_secs(120)
132            ) else {
133                tracing::error!("Failed to query events from sui client after retry");
134                continue;
135            };
136
137            let len = events.data.len();
138            if len != 0 {
139                if !events.has_next_page {
140                    notify.notify_one();
143                }
144                events_sender
145                    .send((module.clone(), events.data))
146                    .await
147                    .expect("All Sui event channel receivers are closed");
148                if let Some(next) = events.next_cursor {
149                    cursor = Some(next);
150                }
151                tracing::info!(?module, ?cursor, "Observed {len} new Sui events");
152            }
153        }
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160
161    use crate::{sui_client::SuiClient, sui_mock_client::SuiMockClient};
162    use prometheus::Registry;
163    use sui_json_rpc_types::EventPage;
164    use sui_types::{Identifier, digests::TransactionDigest, event::EventID};
165    use tokio::time::timeout;
166
167    #[tokio::test]
168    async fn test_sui_syncer_basic() -> anyhow::Result<()> {
169        telemetry_subscribers::init_for_testing();
170        let registry = Registry::new();
171        mysten_metrics::init_metrics(®istry);
172        let metrics = Arc::new(BridgeMetrics::new(®istry));
173        let mock = SuiMockClient::default();
174        let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
175        let module_foo = Identifier::new("Foo").unwrap();
176        let module_bar = Identifier::new("Bar").unwrap();
177        let empty_events = EventPage::empty();
178        let cursor = EventID {
179            tx_digest: TransactionDigest::random(),
180            event_seq: 0,
181        };
182        add_event_response(&mock, module_foo.clone(), cursor, empty_events.clone());
183        add_event_response(&mock, module_bar.clone(), cursor, empty_events.clone());
184
185        let target_modules = HashMap::from_iter(vec![
186            (module_foo.clone(), Some(cursor)),
187            (module_bar.clone(), Some(cursor)),
188        ]);
189        let interval = Duration::from_millis(200);
190        let (_handles, mut events_rx) = SuiSyncer::new(client, target_modules, metrics.clone())
191            .run(interval)
192            .await
193            .unwrap();
194
195        assert_no_more_events(interval, &mut events_rx).await;
197
198        mock.set_latest_checkpoint_sequence_number(999);
199        let mut event_1: SuiEvent = SuiEvent::random_for_testing();
201        let package_id = BRIDGE_PACKAGE_ID;
202        event_1.type_.address = package_id.into();
203        event_1.type_.module = module_foo.clone();
204        let module_foo_events_1: sui_json_rpc_types::Page<SuiEvent, EventID> = EventPage {
205            data: vec![event_1.clone(), event_1.clone()],
206            next_cursor: Some(event_1.id),
207            has_next_page: false,
208        };
209        add_event_response(&mock, module_foo.clone(), event_1.id, empty_events.clone());
210        add_event_response(
211            &mock,
212            module_foo.clone(),
213            cursor,
214            module_foo_events_1.clone(),
215        );
216
217        let (identifier, received_events) = events_rx.recv().await.unwrap();
218        assert_eq!(identifier, module_foo);
219        assert_eq!(received_events.len(), 2);
220        assert_eq!(received_events[0].id, event_1.id);
221        assert_eq!(received_events[1].id, event_1.id);
222        assert_no_more_events(interval, &mut events_rx).await;
224        assert_eq!(
225            metrics
226                .last_synced_sui_checkpoints
227                .get_metric_with_label_values(&["Foo"])
228                .unwrap()
229                .get(),
230            999
231        );
232
233        let mut event_2: SuiEvent = SuiEvent::random_for_testing();
235        event_2.type_.address = package_id.into();
236        event_2.type_.module = module_bar.clone();
237        let module_bar_events_1 = EventPage {
238            data: vec![event_2.clone()],
239            next_cursor: Some(event_2.id),
240            has_next_page: true, };
242        add_event_response(&mock, module_bar.clone(), event_2.id, empty_events.clone());
243
244        add_event_response(&mock, module_bar.clone(), cursor, module_bar_events_1);
245
246        let (identifier, received_events) = events_rx.recv().await.unwrap();
247        assert_eq!(identifier, module_bar);
248        assert_eq!(received_events.len(), 1);
249        assert_eq!(received_events[0].id, event_2.id);
250        assert_no_more_events(interval, &mut events_rx).await;
252        assert_eq!(
253            metrics
254                .last_synced_sui_checkpoints
255                .get_metric_with_label_values(&["Bar"])
256                .unwrap()
257                .get(),
258            0, );
260
261        Ok(())
262    }
263
264    async fn assert_no_more_events(
265        interval: Duration,
266        events_rx: &mut mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
267    ) {
268        match timeout(interval * 2, events_rx.recv()).await {
269            Err(_e) => (),
270            other => panic!("Should have timed out, but got: {:?}", other),
271        };
272    }
273
274    fn add_event_response(
275        mock: &SuiMockClient,
276        module: Identifier,
277        cursor: EventID,
278        events: EventPage,
279    ) {
280        mock.add_event_response(BRIDGE_PACKAGE_ID, module.clone(), cursor, events.clone());
281    }
282}