1use crate::error::{BridgeError, BridgeResult};
5use crate::types::{BridgeAction, BridgeActionDigest};
6use serde::{Deserialize, Deserializer, Serialize, Serializer};
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10use sui_types::Identifier;
11use sui_types::event::EventID;
12use typed_store::DBMapUtils;
13use typed_store::Map;
14use typed_store::rocks::{DBMap, MetricConf};
15
/// Persistent tables used by the bridge orchestrator.
///
/// The `DBMapUtils` derive generates the table-opening helpers (such as
/// `open_tables_read_write`, used by `BridgeOrchestratorTables::new`).
#[derive(DBMapUtils)]
pub struct BridgeOrchestratorTables {
    /// Bridge actions that are still pending, keyed by the action's digest.
    pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
    /// Sui event cursor per module identifier.
    pub(crate) sui_syncer_cursors: DBMap<Identifier, EventID>,
    /// Eth event cursor per contract address.
    /// NOTE(review): the `u64` is presumably a block number — confirm against the syncer.
    pub(crate) eth_syncer_cursors: DBMap<AlloyAddressSerializedAsEthers, u64>,
    /// Sui sequence-number cursor; a single-entry table keyed by `()`.
    pub(crate) sui_syncer_sequence_number_cursor: DBMap<(), u64>,
}
27
28impl BridgeOrchestratorTables {
29 pub fn new(path: &Path) -> Arc<Self> {
30 Arc::new(Self::open_tables_read_write(
31 path.to_path_buf(),
32 MetricConf::new("bridge"),
33 None,
34 None,
35 ))
36 }
37
38 pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
39 let mut batch = self.pending_actions.batch();
40 batch
41 .insert_batch(
42 &self.pending_actions,
43 actions.iter().map(|a| (a.digest(), a)),
44 )
45 .map_err(|e| {
46 BridgeError::StorageError(format!("Couldn't insert into pending_actions: {:?}", e))
47 })?;
48 batch
49 .write()
50 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
51 }
52
53 pub(crate) fn remove_pending_actions(
54 &self,
55 actions: &[BridgeActionDigest],
56 ) -> BridgeResult<()> {
57 let mut batch = self.pending_actions.batch();
58 batch
59 .delete_batch(&self.pending_actions, actions)
60 .map_err(|e| {
61 BridgeError::StorageError(format!("Couldn't delete from pending_actions: {:?}", e))
62 })?;
63 batch
64 .write()
65 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
66 }
67
68 pub(crate) fn update_sui_event_cursor(
69 &self,
70 module: Identifier,
71 cursor: EventID,
72 ) -> BridgeResult<()> {
73 let mut batch = self.sui_syncer_cursors.batch();
74
75 batch
76 .insert_batch(&self.sui_syncer_cursors, [(module, cursor)])
77 .map_err(|e| {
78 BridgeError::StorageError(format!(
79 "Coudln't insert into sui_syncer_cursors: {:?}",
80 e
81 ))
82 })?;
83 batch
84 .write()
85 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
86 }
87
88 pub(crate) fn update_sui_sequence_number_cursor(&self, cursor: u64) -> BridgeResult<()> {
89 let mut batch = self.sui_syncer_sequence_number_cursor.batch();
90
91 batch
92 .insert_batch(&self.sui_syncer_sequence_number_cursor, [((), cursor)])
93 .map_err(|e| {
94 BridgeError::StorageError(format!(
95 "Couldn't insert into sui_syncer_sequence_number_cursor: {:?}",
96 e
97 ))
98 })?;
99 batch
100 .write()
101 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
102 }
103
104 pub(crate) fn update_eth_event_cursor(
105 &self,
106 contract_address: alloy::primitives::Address,
107 cursor: u64,
108 ) -> BridgeResult<()> {
109 let mut batch = self.eth_syncer_cursors.batch();
110
111 batch
112 .insert_batch(
113 &self.eth_syncer_cursors,
114 [(AlloyAddressSerializedAsEthers(contract_address), cursor)],
115 )
116 .map_err(|e| {
117 BridgeError::StorageError(format!(
118 "Coudln't insert into eth_syncer_cursors: {:?}",
119 e
120 ))
121 })?;
122 batch
123 .write()
124 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
125 }
126
127 pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
128 self.pending_actions
129 .safe_iter()
130 .collect::<Result<HashMap<_, _>, _>>()
131 .expect("failed to get all pending actions")
132 }
133
134 pub fn get_sui_event_cursors(
135 &self,
136 identifiers: &[Identifier],
137 ) -> BridgeResult<Vec<Option<EventID>>> {
138 self.sui_syncer_cursors.multi_get(identifiers).map_err(|e| {
139 BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
140 })
141 }
142
143 pub fn get_sui_sequence_number_cursor(&self) -> BridgeResult<Option<u64>> {
144 self.sui_syncer_sequence_number_cursor
145 .get(&())
146 .map_err(|e| {
147 BridgeError::StorageError(format!(
148 "Couldn't get sui_syncer_sequence_number_cursor: {:?}",
149 e
150 ))
151 })
152 }
153
154 pub fn get_eth_event_cursors(
155 &self,
156 contract_addresses: &[alloy::primitives::Address],
157 ) -> BridgeResult<Vec<Option<u64>>> {
158 let wrapped_addresses: Vec<AlloyAddressSerializedAsEthers> = contract_addresses
159 .iter()
160 .map(|addr| AlloyAddressSerializedAsEthers(*addr))
161 .collect();
162 self.eth_syncer_cursors
163 .multi_get(&wrapped_addresses)
164 .map_err(|e| {
165 BridgeError::StorageError(format!("Couldn't get eth_syncer_cursors: {:?}", e))
166 })
167 }
168}
169
/// Newtype around [`alloy::primitives::Address`] with hand-written serde impls
/// that encode the address as a `0x`-prefixed lowercase hex string.
///
/// NOTE(review): going by the name, this is presumably meant to keep the stored
/// key bytes identical to what the ethers address type produced — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AlloyAddressSerializedAsEthers(pub alloy::primitives::Address);
174
175impl Serialize for AlloyAddressSerializedAsEthers {
176 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
177 where
178 S: Serializer,
179 {
180 let hex_string = format!("0x{:x}", self.0);
181 hex_string.serialize(serializer)
182 }
183}
184
185impl<'de> Deserialize<'de> for AlloyAddressSerializedAsEthers {
186 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
187 where
188 D: Deserializer<'de>,
189 {
190 let s = String::deserialize(deserializer)?;
191 let address = s.parse().map_err(serde::de::Error::custom)?;
192 Ok(AlloyAddressSerializedAsEthers(address))
193 }
194}
195
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use sui_types::digests::TransactionDigest;

    use crate::test_utils::get_test_sui_to_eth_bridge_action;

    use super::*;

    /// Exercises every table: pending actions (insert / re-insert / remove),
    /// the Eth and Sui event cursors, and the Sui sequence-number cursor.
    #[tokio::test]
    async fn test_bridge_storage_basic() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );

        let action2 = get_test_sui_to_eth_bridge_action(
            None,
            Some(2),
            Some(100),
            Some(10000),
            None,
            None,
            None,
        );

        // A fresh store holds no pending actions.
        let actions = store.get_all_pending_actions();
        assert!(actions.is_empty());

        // Removing a digest that was never inserted must not fail.
        store.remove_pending_actions(&[action1.digest()]).unwrap();

        store
            .insert_pending_actions(&[action1.clone(), action2.clone()])
            .unwrap();

        let actions = store.get_all_pending_actions();
        assert_eq!(
            actions,
            HashMap::from_iter(vec![
                (action1.digest(), action1.clone()),
                (action2.digest(), action2.clone())
            ])
        );

        // Inserting an action that is already present leaves the table unchanged.
        store
            .insert_pending_actions(std::slice::from_ref(&action1))
            .unwrap();
        let actions = store.get_all_pending_actions();
        assert_eq!(
            actions,
            HashMap::from_iter(vec![
                (action1.digest(), action1.clone()),
                (action2.digest(), action2.clone())
            ])
        );

        // Remove action2; only action1 remains.
        store.remove_pending_actions(&[action2.digest()]).unwrap();
        let actions = store.get_all_pending_actions();
        assert_eq!(
            actions,
            HashMap::from_iter(vec![(action1.digest(), action1.clone())])
        );

        // Remove action1; the table is empty again.
        store.remove_pending_actions(&[action1.digest()]).unwrap();
        let actions = store.get_all_pending_actions();
        assert!(actions.is_empty());

        // Eth event cursor: absent until written, then read back unchanged.
        let eth_contract_address = alloy::primitives::Address::random();
        let eth_block_num = 199999u64;
        assert!(
            store
                .get_eth_event_cursors(&[eth_contract_address])
                .unwrap()[0]
                .is_none()
        );
        store
            .update_eth_event_cursor(eth_contract_address, eth_block_num)
            .unwrap();
        assert_eq!(
            store
                .get_eth_event_cursors(&[eth_contract_address])
                .unwrap()[0]
                .unwrap(),
            eth_block_num
        );

        // Sui event cursor: absent until written, then read back unchanged.
        let sui_module = Identifier::from_str("test").unwrap();
        let sui_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 1,
        };
        assert!(
            store
                .get_sui_event_cursors(std::slice::from_ref(&sui_module))
                .unwrap()[0]
                .is_none()
        );
        store
            .update_sui_event_cursor(sui_module.clone(), sui_cursor)
            .unwrap();
        assert_eq!(
            store
                .get_sui_event_cursors(std::slice::from_ref(&sui_module))
                .unwrap()[0]
                .unwrap(),
            sui_cursor
        );

        // Sui sequence-number cursor: absent until written, then read back unchanged.
        let sui_sequence_number_cursor = 100u64;
        assert!(store.get_sui_sequence_number_cursor().unwrap().is_none());
        store
            .update_sui_sequence_number_cursor(sui_sequence_number_cursor)
            .unwrap();
        assert_eq!(
            store.get_sui_sequence_number_cursor().unwrap().unwrap(),
            sui_sequence_number_cursor
        );
    }

    /// Pins the exact bincode bytes produced for a wrapped address.
    #[tokio::test]
    async fn test_address_serialization() {
        let alloy_address =
            alloy::primitives::Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1")
                .unwrap();
        // Little-endian u64 length prefix (42) followed by the ASCII bytes of
        // "0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1".
        let expected_ethers_serialized = vec![
            42, 0, 0, 0, 0, 0, 0, 0, 48, 120, 57, 48, 102, 56, 98, 102, 54, 97, 52, 55, 57, 102,
            51, 50, 48, 101, 97, 100, 48, 55, 52, 52, 49, 49, 97, 52, 98, 48, 101, 55, 57, 52, 52,
            101, 97, 56, 99, 57, 99, 49,
        ];
        let wrapped_address = AlloyAddressSerializedAsEthers(alloy_address);
        let alloy_serialized = bincode::serialize(&wrapped_address).unwrap();
        assert_eq!(alloy_serialized, expected_ethers_serialized);
    }

    /// Decodes the pinned bincode bytes back into the expected address.
    #[tokio::test]
    async fn test_address_deserialization() {
        // Same bytes as in `test_address_serialization`.
        let ethers_serialized = vec![
            42, 0, 0, 0, 0, 0, 0, 0, 48, 120, 57, 48, 102, 56, 98, 102, 54, 97, 52, 55, 57, 102,
            51, 50, 48, 101, 97, 100, 48, 55, 52, 52, 49, 49, 97, 52, 98, 48, 101, 55, 57, 52, 52,
            101, 97, 56, 99, 57, 99, 49,
        ];
        let wrapped_address: AlloyAddressSerializedAsEthers =
            bincode::deserialize(&ethers_serialized).unwrap();
        let expected_address =
            alloy::primitives::Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1")
                .unwrap();
        assert_eq!(wrapped_address.0, expected_address);
    }
}