sui_bridge/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::error::{BridgeError, BridgeResult};
5use crate::types::{BridgeAction, BridgeActionDigest};
6use serde::{Deserialize, Deserializer, Serialize, Serializer};
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::Arc;
10use sui_types::Identifier;
11use sui_types::event::EventID;
12use typed_store::DBMapUtils;
13use typed_store::Map;
14use typed_store::rocks::{DBMap, MetricConf};
15
/// The `typed_store` tables used by the bridge orchestrator.
///
/// Table-opening helpers such as `open_tables_read_write` (used by
/// `BridgeOrchestratorTables::new`) are generated by the `DBMapUtils` derive.
#[derive(DBMapUtils)]
pub struct BridgeOrchestratorTables {
    /// Pending `BridgeAction`s that the orchestrator has received but not yet
    /// executed, keyed by the action's digest.
    pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
    /// Maps a module identifier to the last processed `EventID`.
    pub(crate) sui_syncer_cursors: DBMap<Identifier, EventID>,
    /// Maps a contract address to the last processed block.
    /// The key is wrapped in `AlloyAddressSerializedAsEthers`, which serializes
    /// the address as a hex string.
    pub(crate) eth_syncer_cursors: DBMap<AlloyAddressSerializedAsEthers, u64>,
    /// Sequence number of the next record to be processed from the bridge
    /// records table. The table holds a single entry stored under the unit key `()`.
    pub(crate) sui_syncer_sequence_number_cursor: DBMap<(), u64>,
}
27
28impl BridgeOrchestratorTables {
29    pub fn new(path: &Path) -> Arc<Self> {
30        Arc::new(Self::open_tables_read_write(
31            path.to_path_buf(),
32            MetricConf::new("bridge"),
33            None,
34            None,
35        ))
36    }
37
38    pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
39        let mut batch = self.pending_actions.batch();
40        batch
41            .insert_batch(
42                &self.pending_actions,
43                actions.iter().map(|a| (a.digest(), a)),
44            )
45            .map_err(|e| {
46                BridgeError::StorageError(format!("Couldn't insert into pending_actions: {:?}", e))
47            })?;
48        batch
49            .write()
50            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
51    }
52
53    pub(crate) fn remove_pending_actions(
54        &self,
55        actions: &[BridgeActionDigest],
56    ) -> BridgeResult<()> {
57        let mut batch = self.pending_actions.batch();
58        batch
59            .delete_batch(&self.pending_actions, actions)
60            .map_err(|e| {
61                BridgeError::StorageError(format!("Couldn't delete from pending_actions: {:?}", e))
62            })?;
63        batch
64            .write()
65            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
66    }
67
68    pub(crate) fn update_sui_sequence_number_cursor(&self, cursor: u64) -> BridgeResult<()> {
69        let mut batch = self.sui_syncer_sequence_number_cursor.batch();
70
71        batch
72            .insert_batch(&self.sui_syncer_sequence_number_cursor, [((), cursor)])
73            .map_err(|e| {
74                BridgeError::StorageError(format!(
75                    "Couldn't insert into sui_syncer_sequence_number_cursor: {:?}",
76                    e
77                ))
78            })?;
79        batch
80            .write()
81            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
82    }
83
84    pub(crate) fn update_eth_event_cursor(
85        &self,
86        contract_address: alloy::primitives::Address,
87        cursor: u64,
88    ) -> BridgeResult<()> {
89        let mut batch = self.eth_syncer_cursors.batch();
90
91        batch
92            .insert_batch(
93                &self.eth_syncer_cursors,
94                [(AlloyAddressSerializedAsEthers(contract_address), cursor)],
95            )
96            .map_err(|e| {
97                BridgeError::StorageError(format!(
98                    "Coudln't insert into eth_syncer_cursors: {:?}",
99                    e
100                ))
101            })?;
102        batch
103            .write()
104            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
105    }
106
107    pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
108        self.pending_actions
109            .safe_iter()
110            .collect::<Result<HashMap<_, _>, _>>()
111            .expect("failed to get all pending actions")
112    }
113
114    pub fn get_sui_event_cursors(
115        &self,
116        identifiers: &[Identifier],
117    ) -> BridgeResult<Vec<Option<EventID>>> {
118        self.sui_syncer_cursors.multi_get(identifiers).map_err(|e| {
119            BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
120        })
121    }
122
123    pub fn get_sui_sequence_number_cursor(&self) -> BridgeResult<Option<u64>> {
124        self.sui_syncer_sequence_number_cursor
125            .get(&())
126            .map_err(|e| {
127                BridgeError::StorageError(format!(
128                    "Couldn't get sui_syncer_sequence_number_cursor: {:?}",
129                    e
130                ))
131            })
132    }
133
134    pub fn get_eth_event_cursors(
135        &self,
136        contract_addresses: &[alloy::primitives::Address],
137    ) -> BridgeResult<Vec<Option<u64>>> {
138        let wrapped_addresses: Vec<AlloyAddressSerializedAsEthers> = contract_addresses
139            .iter()
140            .map(|addr| AlloyAddressSerializedAsEthers(*addr))
141            .collect();
142        self.eth_syncer_cursors
143            .multi_get(&wrapped_addresses)
144            .map_err(|e| {
145                BridgeError::StorageError(format!("Couldn't get eth_syncer_cursors: {:?}", e))
146            })
147    }
148}
149
/// Wrapper around `alloy::primitives::Address` that serializes in the same format
/// as `ethers::types::Address` (as a hex string) for backward compatibility.
///
/// The `Serialize`/`Deserialize` impls for this type are hand-written rather
/// than derived: the address is written as a string consisting of `0x`
/// followed by its lowercase-hex (`{:x}`) rendering, and read back by parsing
/// such a string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AlloyAddressSerializedAsEthers(pub alloy::primitives::Address);
154
155impl Serialize for AlloyAddressSerializedAsEthers {
156    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
157    where
158        S: Serializer,
159    {
160        let hex_string = format!("0x{:x}", self.0);
161        hex_string.serialize(serializer)
162    }
163}
164
165impl<'de> Deserialize<'de> for AlloyAddressSerializedAsEthers {
166    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
167    where
168        D: Deserializer<'de>,
169    {
170        let s = String::deserialize(deserializer)?;
171        let address = s.parse().map_err(serde::de::Error::custom)?;
172        Ok(AlloyAddressSerializedAsEthers(address))
173    }
174}
175
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use crate::test_utils::get_test_sui_to_eth_bridge_action;

    use super::*;

    /// Address shared by the serialization compatibility tests.
    const TEST_ADDRESS: &str = "0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1";

    /// Expected bincode bytes for `TEST_ADDRESS` in the ethers-compatible
    /// format: an 8-byte little-endian length (42) followed by the ASCII bytes
    /// of the `0x`-prefixed hex string.
    const TEST_ADDRESS_ETHERS_SERIALIZED: &[u8] = &[
        42, 0, 0, 0, 0, 0, 0, 0, 48, 120, 57, 48, 102, 56, 98, 102, 54, 97, 52, 55, 57, 102,
        51, 50, 48, 101, 97, 100, 48, 55, 52, 52, 49, 49, 97, 52, 98, 48, 101, 55, 57, 52, 52,
        101, 97, 56, 99, 57, 99, 49,
    ];

    /// Exercises insert/remove of pending actions and both cursor tables
    /// against a store backed by a temporary directory.
    // async: existing runtime is required with typed-store
    #[tokio::test]
    async fn test_bridge_storage_basic() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );

        let action2 = get_test_sui_to_eth_bridge_action(
            None,
            Some(2),
            Some(100),
            Some(10000),
            None,
            None,
            None,
        );

        // in the beginning it's empty
        let actions = store.get_all_pending_actions();
        assert!(actions.is_empty());

        // remove non existing entry is ok
        store.remove_pending_actions(&[action1.digest()]).unwrap();

        store
            .insert_pending_actions(&[action1.clone(), action2.clone()])
            .unwrap();

        let actions = store.get_all_pending_actions();
        assert_eq!(
            actions,
            HashMap::from([
                (action1.digest(), action1.clone()),
                (action2.digest(), action2.clone())
            ])
        );

        // insert an existing action is ok
        store
            .insert_pending_actions(std::slice::from_ref(&action1))
            .unwrap();
        let actions = store.get_all_pending_actions();
        assert_eq!(
            actions,
            HashMap::from([
                (action1.digest(), action1.clone()),
                (action2.digest(), action2.clone())
            ])
        );

        // remove action 2
        store.remove_pending_actions(&[action2.digest()]).unwrap();
        let actions = store.get_all_pending_actions();
        assert_eq!(
            actions,
            HashMap::from([(action1.digest(), action1.clone())])
        );

        // remove action 1
        store.remove_pending_actions(&[action1.digest()]).unwrap();
        let actions = store.get_all_pending_actions();
        assert!(actions.is_empty());

        // update eth event cursor
        let eth_contract_address = alloy::primitives::Address::random();
        let eth_block_num = 199999u64;
        assert!(
            store
                .get_eth_event_cursors(&[eth_contract_address])
                .unwrap()[0]
                .is_none()
        );
        store
            .update_eth_event_cursor(eth_contract_address, eth_block_num)
            .unwrap();
        assert_eq!(
            store
                .get_eth_event_cursors(&[eth_contract_address])
                .unwrap()[0]
                .unwrap(),
            eth_block_num
        );

        // update sui seq cursor
        let sui_sequence_number_cursor = 100u64;
        assert!(store.get_sui_sequence_number_cursor().unwrap().is_none());
        store
            .update_sui_sequence_number_cursor(sui_sequence_number_cursor)
            .unwrap();
        assert_eq!(
            store.get_sui_sequence_number_cursor().unwrap().unwrap(),
            sui_sequence_number_cursor
        );
    }

    /// The wrapper must serialize to the expected ethers-compatible bytes.
    // Plain `#[test]`: no typed-store access here, so no runtime is needed.
    #[test]
    fn test_address_serialization() {
        let alloy_address = alloy::primitives::Address::from_str(TEST_ADDRESS).unwrap();
        let wrapped_address = AlloyAddressSerializedAsEthers(alloy_address);
        let alloy_serialized = bincode::serialize(&wrapped_address).unwrap();
        assert_eq!(alloy_serialized, TEST_ADDRESS_ETHERS_SERIALIZED);
    }

    /// The ethers-compatible bytes must deserialize back into the same address.
    // Plain `#[test]`: no typed-store access here, so no runtime is needed.
    #[test]
    fn test_address_deserialization() {
        let wrapped_address: AlloyAddressSerializedAsEthers =
            bincode::deserialize(TEST_ADDRESS_ETHERS_SERIALIZED).unwrap();
        let expected_address = alloy::primitives::Address::from_str(TEST_ADDRESS).unwrap();
        assert_eq!(wrapped_address.0, expected_address);
    }
}