1use crate::{
8 error::BridgeResult,
9 metrics::BridgeMetrics,
10 retry_with_max_elapsed_time,
11 sui_client::{SuiClient, SuiClientInner},
12};
13use mysten_metrics::spawn_logged_monitored_task;
14use std::{collections::HashMap, sync::Arc};
15use sui_json_rpc_types::SuiEvent;
16use sui_types::BRIDGE_PACKAGE_ID;
17use sui_types::{Identifier, event::EventID};
18use tokio::{
19 sync::Notify,
20 task::JoinHandle,
21 time::{self, Duration},
22};
23
/// Capacity of the metered channel that `SuiSyncer::run` creates to carry
/// `(module, events)` batches from the listening tasks to the receiver.
const SUI_EVENTS_CHANNEL_SIZE: usize = 1_000;
25
26pub type SuiTargetModules = HashMap<Identifier, Option<EventID>>;
28
29pub struct SuiSyncer<C> {
30 sui_client: Arc<SuiClient<C>>,
31 cursors: SuiTargetModules,
34 metrics: Arc<BridgeMetrics>,
35}
36
37impl<C> SuiSyncer<C>
38where
39 C: SuiClientInner + 'static,
40{
41 pub fn new(
42 sui_client: Arc<SuiClient<C>>,
43 cursors: SuiTargetModules,
44 metrics: Arc<BridgeMetrics>,
45 ) -> Self {
46 Self {
47 sui_client,
48 cursors,
49 metrics,
50 }
51 }
52
53 pub async fn run(
54 self,
55 query_interval: Duration,
56 ) -> BridgeResult<(
57 Vec<JoinHandle<()>>,
58 mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
59 )> {
60 let (events_tx, events_rx) = mysten_metrics::metered_channel::channel(
61 SUI_EVENTS_CHANNEL_SIZE,
62 &mysten_metrics::get_metrics()
63 .unwrap()
64 .channel_inflight
65 .with_label_values(&["sui_events_queue"]),
66 );
67
68 let mut task_handles = vec![];
69 for (module, cursor) in self.cursors {
70 let metrics = self.metrics.clone();
71 let events_rx_clone: mysten_metrics::metered_channel::Sender<(
72 Identifier,
73 Vec<SuiEvent>,
74 )> = events_tx.clone();
75 let sui_client_clone = self.sui_client.clone();
76 task_handles.push(spawn_logged_monitored_task!(
77 Self::run_event_listening_task(
78 module,
79 cursor,
80 events_rx_clone,
81 sui_client_clone,
82 query_interval,
83 metrics,
84 )
85 ));
86 }
87 Ok((task_handles, events_rx))
88 }
89
90 async fn run_event_listening_task(
91 module: Identifier,
94 mut cursor: Option<EventID>,
95 events_sender: mysten_metrics::metered_channel::Sender<(Identifier, Vec<SuiEvent>)>,
96 sui_client: Arc<SuiClient<C>>,
97 query_interval: Duration,
98 metrics: Arc<BridgeMetrics>,
99 ) {
100 tracing::info!(?module, ?cursor, "Starting sui events listening task");
101 let mut interval = time::interval(query_interval);
102 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
103
104 let notify = Arc::new(Notify::new());
106 let notify_clone = notify.clone();
107 let sui_client_clone = sui_client.clone();
108 let last_synced_sui_checkpoints_metric = metrics
109 .last_synced_sui_checkpoints
110 .with_label_values(&[&module.to_string()]);
111 spawn_logged_monitored_task!(async move {
112 loop {
113 notify_clone.notified().await;
114 let Ok(Ok(latest_checkpoint_sequence_number)) = retry_with_max_elapsed_time!(
115 sui_client_clone.get_latest_checkpoint_sequence_number(),
116 Duration::from_secs(120)
117 ) else {
118 tracing::error!(
119 "Failed to query latest checkpoint sequence number from sui client after retry"
120 );
121 continue;
122 };
123 last_synced_sui_checkpoints_metric.set(latest_checkpoint_sequence_number as i64);
124 }
125 });
126
127 loop {
128 interval.tick().await;
129 let Ok(Ok(events)) = retry_with_max_elapsed_time!(
130 sui_client.query_events_by_module(BRIDGE_PACKAGE_ID, module.clone(), cursor),
131 Duration::from_secs(120)
132 ) else {
133 tracing::error!("Failed to query events from sui client after retry");
134 continue;
135 };
136
137 let len = events.data.len();
138 if len != 0 {
139 if !events.has_next_page {
140 notify.notify_one();
143 }
144 events_sender
145 .send((module.clone(), events.data))
146 .await
147 .expect("All Sui event channel receivers are closed");
148 if let Some(next) = events.next_cursor {
149 cursor = Some(next);
150 }
151 tracing::info!(?module, ?cursor, "Observed {len} new Sui events");
152 }
153 }
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{sui_client::SuiClient, sui_mock_client::SuiMockClient};
    use prometheus::Registry;
    use sui_json_rpc_types::EventPage;
    use sui_types::{Identifier, digests::TransactionDigest, event::EventID};
    use tokio::time::timeout;

    /// Drives a `SuiSyncer` over two modules backed by a mock client and
    /// checks which event batches arrive on the channel and when the
    /// `last_synced_sui_checkpoints` gauge is updated.
    #[tokio::test]
    async fn test_sui_syncer_basic() -> anyhow::Result<()> {
        telemetry_subscribers::init_for_testing();
        let registry = Registry::new();
        mysten_metrics::init_metrics(&registry);
        let metrics = Arc::new(BridgeMetrics::new(&registry));
        let mock = SuiMockClient::default();
        let client = Arc::new(SuiClient::new_for_testing(mock.clone()));
        let module_foo = Identifier::new("Foo").unwrap();
        let module_bar = Identifier::new("Bar").unwrap();
        let empty_events = EventPage::empty();
        let cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 0,
        };
        add_event_response(&mock, module_foo.clone(), cursor, empty_events.clone());
        add_event_response(&mock, module_bar.clone(), cursor, empty_events.clone());

        let target_modules = HashMap::from_iter(vec![
            (module_foo.clone(), Some(cursor)),
            (module_bar.clone(), Some(cursor)),
        ]);
        let interval = Duration::from_millis(200);
        let (_handles, mut events_rx) = SuiSyncer::new(client, target_modules, metrics.clone())
            .run(interval)
            .await
            .unwrap();

        // Both modules only have empty pages registered so far.
        assert_no_more_events(interval, &mut events_rx).await;

        mock.set_latest_checkpoint_sequence_number(999);
        // Register a final (has_next_page == false) page of two events for Foo.
        let mut event_1: SuiEvent = SuiEvent::random_for_testing();
        let package_id = BRIDGE_PACKAGE_ID;
        event_1.type_.address = package_id.into();
        event_1.type_.module = module_foo.clone();
        let module_foo_events_1: sui_json_rpc_types::Page<SuiEvent, EventID> = EventPage {
            data: vec![event_1.clone(), event_1.clone()],
            next_cursor: Some(event_1.id),
            has_next_page: false,
        };
        add_event_response(&mock, module_foo.clone(), event_1.id, empty_events.clone());
        add_event_response(
            &mock,
            module_foo.clone(),
            cursor,
            module_foo_events_1.clone(),
        );

        let (identifier, received_events) = events_rx.recv().await.unwrap();
        assert_eq!(identifier, module_foo);
        assert_eq!(received_events.len(), 2);
        assert_eq!(received_events[0].id, event_1.id);
        assert_eq!(received_events[1].id, event_1.id);
        assert_no_more_events(interval, &mut events_rx).await;
        // A final page notifies the checkpoint task, so the gauge holds the
        // sequence number configured on the mock.
        assert_eq!(
            metrics
                .last_synced_sui_checkpoints
                .get_metric_with_label_values(&["Foo"])
                .unwrap()
                .get(),
            999
        );

        // Register a page for Bar that claims more pages follow.
        let mut event_2: SuiEvent = SuiEvent::random_for_testing();
        event_2.type_.address = package_id.into();
        event_2.type_.module = module_bar.clone();
        let module_bar_events_1 = EventPage {
            data: vec![event_2.clone()],
            next_cursor: Some(event_2.id),
            // With has_next_page set the syncer does not notify the checkpoint task.
            has_next_page: true,
        };
        add_event_response(&mock, module_bar.clone(), event_2.id, empty_events.clone());

        add_event_response(&mock, module_bar.clone(), cursor, module_bar_events_1);

        let (identifier, received_events) = events_rx.recv().await.unwrap();
        assert_eq!(identifier, module_bar);
        assert_eq!(received_events.len(), 1);
        assert_eq!(received_events[0].id, event_2.id);
        assert_no_more_events(interval, &mut events_rx).await;
        // The gauge for Bar is expected to be untouched.
        assert_eq!(
            metrics
                .last_synced_sui_checkpoints
                .get_metric_with_label_values(&["Bar"])
                .unwrap()
                .get(),
            0,
        );

        Ok(())
    }

    /// Asserts that nothing arrives on `events_rx` within two polling intervals.
    async fn assert_no_more_events(
        interval: Duration,
        events_rx: &mut mysten_metrics::metered_channel::Receiver<(Identifier, Vec<SuiEvent>)>,
    ) {
        match timeout(interval * 2, events_rx.recv()).await {
            Err(_e) => (),
            other => panic!("Should have timed out, but got: {other:?}"),
        };
    }

    /// Registers `events` on the mock as the response for a bridge-package
    /// query of `module` at `cursor`.
    fn add_event_response(
        mock: &SuiMockClient,
        module: Identifier,
        cursor: EventID,
        events: EventPage,
    ) {
        // Both arguments are already owned, so they are moved rather than cloned.
        mock.add_event_response(BRIDGE_PACKAGE_ID, module, cursor, events);
    }
}