sui_bridge/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::error::{BridgeError, BridgeResult};
5use crate::types::{BridgeAction, BridgeActionDigest};
6use serde::{Deserialize, Deserializer, Serialize, Serializer};
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10use sui_types::Identifier;
11use sui_types::event::EventID;
12use typed_store::DBMapUtils;
13use typed_store::Map;
14use typed_store::rocks::{DBMap, MetricConf};
15
16#[derive(DBMapUtils)]
17pub struct BridgeOrchestratorTables {
18    /// pending BridgeActions that orchestrator received but not yet executed
19    pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
20    /// module identifier to the last processed EventID
21    pub(crate) sui_syncer_cursors: DBMap<Identifier, EventID>,
22    /// contract address to the last processed block
23    pub(crate) eth_syncer_cursors: DBMap<AlloyAddressSerializedAsEthers, u64>,
24    /// sequence number for the next record to be processed from the bridge records table
25    pub(crate) sui_syncer_sequence_number_cursor: DBMap<(), u64>,
26}
27
28impl BridgeOrchestratorTables {
29    pub fn new(path: &Path) -> Arc<Self> {
30        Arc::new(Self::open_tables_read_write(
31            path.to_path_buf(),
32            MetricConf::new("bridge"),
33            None,
34            None,
35        ))
36    }
37
38    pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
39        let mut batch = self.pending_actions.batch();
40        batch
41            .insert_batch(
42                &self.pending_actions,
43                actions.iter().map(|a| (a.digest(), a)),
44            )
45            .map_err(|e| {
46                BridgeError::StorageError(format!("Couldn't insert into pending_actions: {:?}", e))
47            })?;
48        batch
49            .write()
50            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
51    }
52
53    pub(crate) fn remove_pending_actions(
54        &self,
55        actions: &[BridgeActionDigest],
56    ) -> BridgeResult<()> {
57        let mut batch = self.pending_actions.batch();
58        batch
59            .delete_batch(&self.pending_actions, actions)
60            .map_err(|e| {
61                BridgeError::StorageError(format!("Couldn't delete from pending_actions: {:?}", e))
62            })?;
63        batch
64            .write()
65            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
66    }
67
68    pub(crate) fn update_sui_event_cursor(
69        &self,
70        module: Identifier,
71        cursor: EventID,
72    ) -> BridgeResult<()> {
73        let mut batch = self.sui_syncer_cursors.batch();
74
75        batch
76            .insert_batch(&self.sui_syncer_cursors, [(module, cursor)])
77            .map_err(|e| {
78                BridgeError::StorageError(format!(
79                    "Coudln't insert into sui_syncer_cursors: {:?}",
80                    e
81                ))
82            })?;
83        batch
84            .write()
85            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
86    }
87
88    pub(crate) fn update_sui_sequence_number_cursor(&self, cursor: u64) -> BridgeResult<()> {
89        let mut batch = self.sui_syncer_sequence_number_cursor.batch();
90
91        batch
92            .insert_batch(&self.sui_syncer_sequence_number_cursor, [((), cursor)])
93            .map_err(|e| {
94                BridgeError::StorageError(format!(
95                    "Couldn't insert into sui_syncer_sequence_number_cursor: {:?}",
96                    e
97                ))
98            })?;
99        batch
100            .write()
101            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
102    }
103
104    pub(crate) fn update_eth_event_cursor(
105        &self,
106        contract_address: alloy::primitives::Address,
107        cursor: u64,
108    ) -> BridgeResult<()> {
109        let mut batch = self.eth_syncer_cursors.batch();
110
111        batch
112            .insert_batch(
113                &self.eth_syncer_cursors,
114                [(AlloyAddressSerializedAsEthers(contract_address), cursor)],
115            )
116            .map_err(|e| {
117                BridgeError::StorageError(format!(
118                    "Coudln't insert into eth_syncer_cursors: {:?}",
119                    e
120                ))
121            })?;
122        batch
123            .write()
124            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
125    }
126
127    pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
128        self.pending_actions
129            .safe_iter()
130            .collect::<Result<HashMap<_, _>, _>>()
131            .expect("failed to get all pending actions")
132    }
133
134    pub fn get_sui_event_cursors(
135        &self,
136        identifiers: &[Identifier],
137    ) -> BridgeResult<Vec<Option<EventID>>> {
138        self.sui_syncer_cursors.multi_get(identifiers).map_err(|e| {
139            BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
140        })
141    }
142
143    pub fn get_sui_sequence_number_cursor(&self) -> BridgeResult<Option<u64>> {
144        self.sui_syncer_sequence_number_cursor
145            .get(&())
146            .map_err(|e| {
147                BridgeError::StorageError(format!(
148                    "Couldn't get sui_syncer_sequence_number_cursor: {:?}",
149                    e
150                ))
151            })
152    }
153
154    pub fn get_eth_event_cursors(
155        &self,
156        contract_addresses: &[alloy::primitives::Address],
157    ) -> BridgeResult<Vec<Option<u64>>> {
158        let wrapped_addresses: Vec<AlloyAddressSerializedAsEthers> = contract_addresses
159            .iter()
160            .map(|addr| AlloyAddressSerializedAsEthers(*addr))
161            .collect();
162        self.eth_syncer_cursors
163            .multi_get(&wrapped_addresses)
164            .map_err(|e| {
165                BridgeError::StorageError(format!("Couldn't get eth_syncer_cursors: {:?}", e))
166            })
167    }
168}
169
170/// Wrapper around alloy::primitives::Address that serializes in the same format
171/// as ethers::types::Address (as a hex string) for backward compatibility.
172#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
173pub struct AlloyAddressSerializedAsEthers(pub alloy::primitives::Address);
174
175impl Serialize for AlloyAddressSerializedAsEthers {
176    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
177    where
178        S: Serializer,
179    {
180        let hex_string = format!("0x{:x}", self.0);
181        hex_string.serialize(serializer)
182    }
183}
184
185impl<'de> Deserialize<'de> for AlloyAddressSerializedAsEthers {
186    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
187    where
188        D: Deserializer<'de>,
189    {
190        let s = String::deserialize(deserializer)?;
191        let address = s.parse().map_err(serde::de::Error::custom)?;
192        Ok(AlloyAddressSerializedAsEthers(address))
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use std::str::FromStr;
199
200    use sui_types::digests::TransactionDigest;
201
202    use crate::test_utils::get_test_sui_to_eth_bridge_action;
203
204    use super::*;
205
206    // async: existing runtime is required with typed-store
207    #[tokio::test]
208    async fn test_bridge_storage_basic() {
209        let temp_dir = tempfile::tempdir().unwrap();
210        let store = BridgeOrchestratorTables::new(temp_dir.path());
211
212        let action1 = get_test_sui_to_eth_bridge_action(
213            None,
214            Some(0),
215            Some(99),
216            Some(10000),
217            None,
218            None,
219            None,
220        );
221
222        let action2 = get_test_sui_to_eth_bridge_action(
223            None,
224            Some(2),
225            Some(100),
226            Some(10000),
227            None,
228            None,
229            None,
230        );
231
232        // in the beginning it's empty
233        let actions = store.get_all_pending_actions();
234        assert!(actions.is_empty());
235
236        // remove non existing entry is ok
237        store.remove_pending_actions(&[action1.digest()]).unwrap();
238
239        store
240            .insert_pending_actions(&vec![action1.clone(), action2.clone()])
241            .unwrap();
242
243        let actions = store.get_all_pending_actions();
244        assert_eq!(
245            actions,
246            HashMap::from_iter(vec![
247                (action1.digest(), action1.clone()),
248                (action2.digest(), action2.clone())
249            ])
250        );
251
252        // insert an existing action is ok
253        store
254            .insert_pending_actions(std::slice::from_ref(&action1))
255            .unwrap();
256        let actions = store.get_all_pending_actions();
257        assert_eq!(
258            actions,
259            HashMap::from_iter(vec![
260                (action1.digest(), action1.clone()),
261                (action2.digest(), action2.clone())
262            ])
263        );
264
265        // remove action 2
266        store.remove_pending_actions(&[action2.digest()]).unwrap();
267        let actions = store.get_all_pending_actions();
268        assert_eq!(
269            actions,
270            HashMap::from_iter(vec![(action1.digest(), action1.clone())])
271        );
272
273        // remove action 1
274        store.remove_pending_actions(&[action1.digest()]).unwrap();
275        let actions = store.get_all_pending_actions();
276        assert!(actions.is_empty());
277
278        // update eth event cursor
279        let eth_contract_address = alloy::primitives::Address::random();
280        let eth_block_num = 199999u64;
281        assert!(
282            store
283                .get_eth_event_cursors(&[eth_contract_address])
284                .unwrap()[0]
285                .is_none()
286        );
287        store
288            .update_eth_event_cursor(eth_contract_address, eth_block_num)
289            .unwrap();
290        assert_eq!(
291            store
292                .get_eth_event_cursors(&[eth_contract_address])
293                .unwrap()[0]
294                .unwrap(),
295            eth_block_num
296        );
297
298        // update sui event cursor
299        let sui_module = Identifier::from_str("test").unwrap();
300        let sui_cursor = EventID {
301            tx_digest: TransactionDigest::random(),
302            event_seq: 1,
303        };
304        assert!(
305            store
306                .get_sui_event_cursors(std::slice::from_ref(&sui_module))
307                .unwrap()[0]
308                .is_none()
309        );
310        store
311            .update_sui_event_cursor(sui_module.clone(), sui_cursor)
312            .unwrap();
313        assert_eq!(
314            store
315                .get_sui_event_cursors(std::slice::from_ref(&sui_module))
316                .unwrap()[0]
317                .unwrap(),
318            sui_cursor
319        );
320
321        // update sui seq cursor
322        let sui_sequence_number_cursor = 100u64;
323        assert!(store.get_sui_sequence_number_cursor().unwrap().is_none());
324        store
325            .update_sui_sequence_number_cursor(sui_sequence_number_cursor)
326            .unwrap();
327        assert_eq!(
328            store.get_sui_sequence_number_cursor().unwrap().unwrap(),
329            sui_sequence_number_cursor
330        );
331    }
332
333    #[tokio::test]
334    async fn test_address_serialization() {
335        let alloy_address =
336            alloy::primitives::Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1")
337                .unwrap();
338        let expected_ethers_serialized = vec![
339            42, 0, 0, 0, 0, 0, 0, 0, 48, 120, 57, 48, 102, 56, 98, 102, 54, 97, 52, 55, 57, 102,
340            51, 50, 48, 101, 97, 100, 48, 55, 52, 52, 49, 49, 97, 52, 98, 48, 101, 55, 57, 52, 52,
341            101, 97, 56, 99, 57, 99, 49,
342        ];
343        let wrapped_address = AlloyAddressSerializedAsEthers(alloy_address);
344        let alloy_serialized = bincode::serialize(&wrapped_address).unwrap();
345        assert_eq!(alloy_serialized, expected_ethers_serialized);
346    }
347
348    #[tokio::test]
349    async fn test_address_deserialization() {
350        let ethers_serialized = vec![
351            42, 0, 0, 0, 0, 0, 0, 0, 48, 120, 57, 48, 102, 56, 98, 102, 54, 97, 52, 55, 57, 102,
352            51, 50, 48, 101, 97, 100, 48, 55, 52, 52, 49, 49, 97, 52, 98, 48, 101, 55, 57, 52, 52,
353            101, 97, 56, 99, 57, 99, 49,
354        ];
355        let wrapped_address: AlloyAddressSerializedAsEthers =
356            bincode::deserialize(&ethers_serialized).unwrap();
357        let expected_address =
358            alloy::primitives::Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1")
359                .unwrap();
360        assert_eq!(wrapped_address.0, expected_address);
361    }
362}