sui_bridge/
storage.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::HashMap;
5use std::path::Path;
6use std::sync::Arc;
7use sui_types::Identifier;
8
9use sui_types::event::EventID;
10use typed_store::DBMapUtils;
11use typed_store::Map;
12use typed_store::rocks::{DBMap, MetricConf};
13
14use crate::error::{BridgeError, BridgeResult};
15use crate::types::{BridgeAction, BridgeActionDigest};
16
/// Persistent tables used by the bridge orchestrator.
///
/// NOTE(review): the `DBMapUtils` derive presumably generates the
/// `open_tables_read_write` constructor that `new` calls — confirm against
/// the `typed_store` macro documentation.
#[derive(DBMapUtils)]
pub struct BridgeOrchestratorTables {
    /// Pending `BridgeAction`s that the orchestrator has received but not yet
    /// executed, keyed by the action's digest.
    pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
    /// Maps a Sui module identifier to the last processed `EventID`.
    pub(crate) sui_syncer_cursors: DBMap<Identifier, EventID>,
    /// Maps an Ethereum contract address to the last processed block.
    pub(crate) eth_syncer_cursors: DBMap<ethers::types::Address, u64>,
}
26
27impl BridgeOrchestratorTables {
28    pub fn new(path: &Path) -> Arc<Self> {
29        Arc::new(Self::open_tables_read_write(
30            path.to_path_buf(),
31            MetricConf::new("bridge"),
32            None,
33            None,
34        ))
35    }
36
37    pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
38        let mut batch = self.pending_actions.batch();
39        batch
40            .insert_batch(
41                &self.pending_actions,
42                actions.iter().map(|a| (a.digest(), a)),
43            )
44            .map_err(|e| {
45                BridgeError::StorageError(format!("Couldn't insert into pending_actions: {:?}", e))
46            })?;
47        batch
48            .write()
49            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
50    }
51
52    pub(crate) fn remove_pending_actions(
53        &self,
54        actions: &[BridgeActionDigest],
55    ) -> BridgeResult<()> {
56        let mut batch = self.pending_actions.batch();
57        batch
58            .delete_batch(&self.pending_actions, actions)
59            .map_err(|e| {
60                BridgeError::StorageError(format!("Couldn't delete from pending_actions: {:?}", e))
61            })?;
62        batch
63            .write()
64            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
65    }
66
67    pub(crate) fn update_sui_event_cursor(
68        &self,
69        module: Identifier,
70        cursor: EventID,
71    ) -> BridgeResult<()> {
72        let mut batch = self.sui_syncer_cursors.batch();
73
74        batch
75            .insert_batch(&self.sui_syncer_cursors, [(module, cursor)])
76            .map_err(|e| {
77                BridgeError::StorageError(format!(
78                    "Coudln't insert into sui_syncer_cursors: {:?}",
79                    e
80                ))
81            })?;
82        batch
83            .write()
84            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
85    }
86
87    pub(crate) fn update_eth_event_cursor(
88        &self,
89        contract_address: ethers::types::Address,
90        cursor: u64,
91    ) -> BridgeResult<()> {
92        let mut batch = self.eth_syncer_cursors.batch();
93
94        batch
95            .insert_batch(&self.eth_syncer_cursors, [(contract_address, cursor)])
96            .map_err(|e| {
97                BridgeError::StorageError(format!(
98                    "Coudln't insert into eth_syncer_cursors: {:?}",
99                    e
100                ))
101            })?;
102        batch
103            .write()
104            .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
105    }
106
107    pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
108        self.pending_actions
109            .safe_iter()
110            .collect::<Result<HashMap<_, _>, _>>()
111            .expect("failed to get all pending actions")
112    }
113
114    pub fn get_sui_event_cursors(
115        &self,
116        identifiers: &[Identifier],
117    ) -> BridgeResult<Vec<Option<EventID>>> {
118        self.sui_syncer_cursors.multi_get(identifiers).map_err(|e| {
119            BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
120        })
121    }
122
123    pub fn get_eth_event_cursors(
124        &self,
125        contract_addresses: &[ethers::types::Address],
126    ) -> BridgeResult<Vec<Option<u64>>> {
127        self.eth_syncer_cursors
128            .multi_get(contract_addresses)
129            .map_err(|e| {
130                BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
131            })
132    }
133}
134
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use sui_types::digests::TransactionDigest;

    use crate::test_utils::get_test_sui_to_eth_bridge_action;

    use super::*;

    /// Exercises the pending-action table and both cursor tables against a
    /// store opened in a fresh temporary directory.
    // async: existing runtime is required with typed-store
    #[tokio::test]
    async fn test_bridge_storage_basic() {
        let temp_dir = tempfile::tempdir().unwrap();
        let store = BridgeOrchestratorTables::new(temp_dir.path());

        let action1 = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );

        let action2 = get_test_sui_to_eth_bridge_action(
            None,
            Some(2),
            Some(100),
            Some(10000),
            None,
            None,
            None,
        );

        // Expected table contents while both actions are pending.
        let both_actions = HashMap::from([
            (action1.digest(), action1.clone()),
            (action2.digest(), action2.clone()),
        ]);

        // in the beginning it's empty
        assert!(store.get_all_pending_actions().is_empty());

        // remove non existing entry is ok
        store.remove_pending_actions(&[action1.digest()]).unwrap();

        store
            .insert_pending_actions(&[action1.clone(), action2.clone()])
            .unwrap();
        assert_eq!(store.get_all_pending_actions(), both_actions);

        // insert an existing action is ok
        store
            .insert_pending_actions(std::slice::from_ref(&action1))
            .unwrap();
        assert_eq!(store.get_all_pending_actions(), both_actions);

        // remove action 2
        store.remove_pending_actions(&[action2.digest()]).unwrap();
        assert_eq!(
            store.get_all_pending_actions(),
            HashMap::from([(action1.digest(), action1.clone())])
        );

        // remove action 1
        store.remove_pending_actions(&[action1.digest()]).unwrap();
        assert!(store.get_all_pending_actions().is_empty());

        // update eth event cursor
        let eth_contract_address = ethers::types::Address::random();
        let eth_block_num = 199999u64;
        assert!(
            store
                .get_eth_event_cursors(&[eth_contract_address])
                .unwrap()[0]
                .is_none()
        );
        store
            .update_eth_event_cursor(eth_contract_address, eth_block_num)
            .unwrap();
        assert_eq!(
            store
                .get_eth_event_cursors(&[eth_contract_address])
                .unwrap()[0]
                .unwrap(),
            eth_block_num
        );

        // update sui event cursor
        let sui_module = Identifier::from_str("test").unwrap();
        let sui_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 1,
        };
        assert!(
            store
                .get_sui_event_cursors(std::slice::from_ref(&sui_module))
                .unwrap()[0]
                .is_none()
        );
        store
            .update_sui_event_cursor(sui_module.clone(), sui_cursor)
            .unwrap();
        assert_eq!(
            store
                .get_sui_event_cursors(std::slice::from_ref(&sui_module))
                .unwrap()[0]
                .unwrap(),
            sui_cursor
        );
    }
}