1use crate::error::{BridgeError, BridgeResult};
5use crate::types::{BridgeAction, BridgeActionDigest};
6use serde::{Deserialize, Deserializer, Serialize, Serializer};
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10use sui_types::Identifier;
11use sui_types::event::EventID;
12use typed_store::DBMapUtils;
13use typed_store::Map;
14use typed_store::rocks::{DBMap, MetricConf};
15
16#[derive(DBMapUtils)]
17pub struct BridgeOrchestratorTables {
18 pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
20 pub(crate) sui_syncer_cursors: DBMap<Identifier, EventID>,
22 pub(crate) eth_syncer_cursors: DBMap<AlloyAddressSerializedAsEthers, u64>,
24 pub(crate) sui_syncer_sequence_number_cursor: DBMap<(), u64>,
26}
27
28impl BridgeOrchestratorTables {
29 pub fn new(path: &Path) -> Arc<Self> {
30 Arc::new(Self::open_tables_read_write(
31 path.to_path_buf(),
32 MetricConf::new("bridge"),
33 None,
34 None,
35 ))
36 }
37
38 pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
39 let mut batch = self.pending_actions.batch();
40 batch
41 .insert_batch(
42 &self.pending_actions,
43 actions.iter().map(|a| (a.digest(), a)),
44 )
45 .map_err(|e| {
46 BridgeError::StorageError(format!("Couldn't insert into pending_actions: {:?}", e))
47 })?;
48 batch
49 .write()
50 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
51 }
52
53 pub(crate) fn remove_pending_actions(
54 &self,
55 actions: &[BridgeActionDigest],
56 ) -> BridgeResult<()> {
57 let mut batch = self.pending_actions.batch();
58 batch
59 .delete_batch(&self.pending_actions, actions)
60 .map_err(|e| {
61 BridgeError::StorageError(format!("Couldn't delete from pending_actions: {:?}", e))
62 })?;
63 batch
64 .write()
65 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
66 }
67
68 pub(crate) fn update_sui_sequence_number_cursor(&self, cursor: u64) -> BridgeResult<()> {
69 let mut batch = self.sui_syncer_sequence_number_cursor.batch();
70
71 batch
72 .insert_batch(&self.sui_syncer_sequence_number_cursor, [((), cursor)])
73 .map_err(|e| {
74 BridgeError::StorageError(format!(
75 "Couldn't insert into sui_syncer_sequence_number_cursor: {:?}",
76 e
77 ))
78 })?;
79 batch
80 .write()
81 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
82 }
83
84 pub(crate) fn update_eth_event_cursor(
85 &self,
86 contract_address: alloy::primitives::Address,
87 cursor: u64,
88 ) -> BridgeResult<()> {
89 let mut batch = self.eth_syncer_cursors.batch();
90
91 batch
92 .insert_batch(
93 &self.eth_syncer_cursors,
94 [(AlloyAddressSerializedAsEthers(contract_address), cursor)],
95 )
96 .map_err(|e| {
97 BridgeError::StorageError(format!(
98 "Coudln't insert into eth_syncer_cursors: {:?}",
99 e
100 ))
101 })?;
102 batch
103 .write()
104 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
105 }
106
107 pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
108 self.pending_actions
109 .safe_iter()
110 .collect::<Result<HashMap<_, _>, _>>()
111 .expect("failed to get all pending actions")
112 }
113
114 pub fn get_sui_event_cursors(
115 &self,
116 identifiers: &[Identifier],
117 ) -> BridgeResult<Vec<Option<EventID>>> {
118 self.sui_syncer_cursors.multi_get(identifiers).map_err(|e| {
119 BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
120 })
121 }
122
123 pub fn get_sui_sequence_number_cursor(&self) -> BridgeResult<Option<u64>> {
124 self.sui_syncer_sequence_number_cursor
125 .get(&())
126 .map_err(|e| {
127 BridgeError::StorageError(format!(
128 "Couldn't get sui_syncer_sequence_number_cursor: {:?}",
129 e
130 ))
131 })
132 }
133
134 pub fn get_eth_event_cursors(
135 &self,
136 contract_addresses: &[alloy::primitives::Address],
137 ) -> BridgeResult<Vec<Option<u64>>> {
138 let wrapped_addresses: Vec<AlloyAddressSerializedAsEthers> = contract_addresses
139 .iter()
140 .map(|addr| AlloyAddressSerializedAsEthers(*addr))
141 .collect();
142 self.eth_syncer_cursors
143 .multi_get(&wrapped_addresses)
144 .map_err(|e| {
145 BridgeError::StorageError(format!("Couldn't get eth_syncer_cursors: {:?}", e))
146 })
147 }
148}
149
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
153pub struct AlloyAddressSerializedAsEthers(pub alloy::primitives::Address);
154
155impl Serialize for AlloyAddressSerializedAsEthers {
156 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
157 where
158 S: Serializer,
159 {
160 let hex_string = format!("0x{:x}", self.0);
161 hex_string.serialize(serializer)
162 }
163}
164
165impl<'de> Deserialize<'de> for AlloyAddressSerializedAsEthers {
166 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
167 where
168 D: Deserializer<'de>,
169 {
170 let s = String::deserialize(deserializer)?;
171 let address = s.parse().map_err(serde::de::Error::custom)?;
172 Ok(AlloyAddressSerializedAsEthers(address))
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use std::str::FromStr;
179
180 use crate::test_utils::get_test_sui_to_eth_bridge_action;
181
182 use super::*;
183
184 #[tokio::test]
186 async fn test_bridge_storage_basic() {
187 let temp_dir = tempfile::tempdir().unwrap();
188 let store = BridgeOrchestratorTables::new(temp_dir.path());
189
190 let action1 = get_test_sui_to_eth_bridge_action(
191 None,
192 Some(0),
193 Some(99),
194 Some(10000),
195 None,
196 None,
197 None,
198 );
199
200 let action2 = get_test_sui_to_eth_bridge_action(
201 None,
202 Some(2),
203 Some(100),
204 Some(10000),
205 None,
206 None,
207 None,
208 );
209
210 let actions = store.get_all_pending_actions();
212 assert!(actions.is_empty());
213
214 store.remove_pending_actions(&[action1.digest()]).unwrap();
216
217 store
218 .insert_pending_actions(&[action1.clone(), action2.clone()])
219 .unwrap();
220
221 let actions = store.get_all_pending_actions();
222 assert_eq!(
223 actions,
224 HashMap::from_iter(vec![
225 (action1.digest(), action1.clone()),
226 (action2.digest(), action2.clone())
227 ])
228 );
229
230 store
232 .insert_pending_actions(std::slice::from_ref(&action1))
233 .unwrap();
234 let actions = store.get_all_pending_actions();
235 assert_eq!(
236 actions,
237 HashMap::from_iter(vec![
238 (action1.digest(), action1.clone()),
239 (action2.digest(), action2.clone())
240 ])
241 );
242
243 store.remove_pending_actions(&[action2.digest()]).unwrap();
245 let actions = store.get_all_pending_actions();
246 assert_eq!(
247 actions,
248 HashMap::from_iter(vec![(action1.digest(), action1.clone())])
249 );
250
251 store.remove_pending_actions(&[action1.digest()]).unwrap();
253 let actions = store.get_all_pending_actions();
254 assert!(actions.is_empty());
255
256 let eth_contract_address = alloy::primitives::Address::random();
258 let eth_block_num = 199999u64;
259 assert!(
260 store
261 .get_eth_event_cursors(&[eth_contract_address])
262 .unwrap()[0]
263 .is_none()
264 );
265 store
266 .update_eth_event_cursor(eth_contract_address, eth_block_num)
267 .unwrap();
268 assert_eq!(
269 store
270 .get_eth_event_cursors(&[eth_contract_address])
271 .unwrap()[0]
272 .unwrap(),
273 eth_block_num
274 );
275
276 let sui_sequence_number_cursor = 100u64;
278 assert!(store.get_sui_sequence_number_cursor().unwrap().is_none());
279 store
280 .update_sui_sequence_number_cursor(sui_sequence_number_cursor)
281 .unwrap();
282 assert_eq!(
283 store.get_sui_sequence_number_cursor().unwrap().unwrap(),
284 sui_sequence_number_cursor
285 );
286 }
287
288 #[tokio::test]
289 async fn test_address_serialization() {
290 let alloy_address =
291 alloy::primitives::Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1")
292 .unwrap();
293 let expected_ethers_serialized = vec![
294 42, 0, 0, 0, 0, 0, 0, 0, 48, 120, 57, 48, 102, 56, 98, 102, 54, 97, 52, 55, 57, 102,
295 51, 50, 48, 101, 97, 100, 48, 55, 52, 52, 49, 49, 97, 52, 98, 48, 101, 55, 57, 52, 52,
296 101, 97, 56, 99, 57, 99, 49,
297 ];
298 let wrapped_address = AlloyAddressSerializedAsEthers(alloy_address);
299 let alloy_serialized = bincode::serialize(&wrapped_address).unwrap();
300 assert_eq!(alloy_serialized, expected_ethers_serialized);
301 }
302
303 #[tokio::test]
304 async fn test_address_deserialization() {
305 let ethers_serialized = vec![
306 42, 0, 0, 0, 0, 0, 0, 0, 48, 120, 57, 48, 102, 56, 98, 102, 54, 97, 52, 55, 57, 102,
307 51, 50, 48, 101, 97, 100, 48, 55, 52, 52, 49, 49, 97, 52, 98, 48, 101, 55, 57, 52, 52,
308 101, 97, 56, 99, 57, 99, 49,
309 ];
310 let wrapped_address: AlloyAddressSerializedAsEthers =
311 bincode::deserialize(ðers_serialized).unwrap();
312 let expected_address =
313 alloy::primitives::Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1")
314 .unwrap();
315 assert_eq!(wrapped_address.0, expected_address);
316 }
317}