1use std::collections::HashMap;
5use std::path::Path;
6use std::sync::Arc;
7use sui_types::Identifier;
8
9use sui_types::event::EventID;
10use typed_store::DBMapUtils;
11use typed_store::Map;
12use typed_store::rocks::{DBMap, MetricConf};
13
14use crate::error::{BridgeError, BridgeResult};
15use crate::types::{BridgeAction, BridgeActionDigest};
16
17#[derive(DBMapUtils)]
18pub struct BridgeOrchestratorTables {
19 pub(crate) pending_actions: DBMap<BridgeActionDigest, BridgeAction>,
21 pub(crate) sui_syncer_cursors: DBMap<Identifier, EventID>,
23 pub(crate) eth_syncer_cursors: DBMap<ethers::types::Address, u64>,
25}
26
27impl BridgeOrchestratorTables {
28 pub fn new(path: &Path) -> Arc<Self> {
29 Arc::new(Self::open_tables_read_write(
30 path.to_path_buf(),
31 MetricConf::new("bridge"),
32 None,
33 None,
34 ))
35 }
36
37 pub(crate) fn insert_pending_actions(&self, actions: &[BridgeAction]) -> BridgeResult<()> {
38 let mut batch = self.pending_actions.batch();
39 batch
40 .insert_batch(
41 &self.pending_actions,
42 actions.iter().map(|a| (a.digest(), a)),
43 )
44 .map_err(|e| {
45 BridgeError::StorageError(format!("Couldn't insert into pending_actions: {:?}", e))
46 })?;
47 batch
48 .write()
49 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
50 }
51
52 pub(crate) fn remove_pending_actions(
53 &self,
54 actions: &[BridgeActionDigest],
55 ) -> BridgeResult<()> {
56 let mut batch = self.pending_actions.batch();
57 batch
58 .delete_batch(&self.pending_actions, actions)
59 .map_err(|e| {
60 BridgeError::StorageError(format!("Couldn't delete from pending_actions: {:?}", e))
61 })?;
62 batch
63 .write()
64 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
65 }
66
67 pub(crate) fn update_sui_event_cursor(
68 &self,
69 module: Identifier,
70 cursor: EventID,
71 ) -> BridgeResult<()> {
72 let mut batch = self.sui_syncer_cursors.batch();
73
74 batch
75 .insert_batch(&self.sui_syncer_cursors, [(module, cursor)])
76 .map_err(|e| {
77 BridgeError::StorageError(format!(
78 "Coudln't insert into sui_syncer_cursors: {:?}",
79 e
80 ))
81 })?;
82 batch
83 .write()
84 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
85 }
86
87 pub(crate) fn update_eth_event_cursor(
88 &self,
89 contract_address: ethers::types::Address,
90 cursor: u64,
91 ) -> BridgeResult<()> {
92 let mut batch = self.eth_syncer_cursors.batch();
93
94 batch
95 .insert_batch(&self.eth_syncer_cursors, [(contract_address, cursor)])
96 .map_err(|e| {
97 BridgeError::StorageError(format!(
98 "Coudln't insert into eth_syncer_cursors: {:?}",
99 e
100 ))
101 })?;
102 batch
103 .write()
104 .map_err(|e| BridgeError::StorageError(format!("Couldn't write batch: {:?}", e)))
105 }
106
107 pub fn get_all_pending_actions(&self) -> HashMap<BridgeActionDigest, BridgeAction> {
108 self.pending_actions
109 .safe_iter()
110 .collect::<Result<HashMap<_, _>, _>>()
111 .expect("failed to get all pending actions")
112 }
113
114 pub fn get_sui_event_cursors(
115 &self,
116 identifiers: &[Identifier],
117 ) -> BridgeResult<Vec<Option<EventID>>> {
118 self.sui_syncer_cursors.multi_get(identifiers).map_err(|e| {
119 BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
120 })
121 }
122
123 pub fn get_eth_event_cursors(
124 &self,
125 contract_addresses: &[ethers::types::Address],
126 ) -> BridgeResult<Vec<Option<u64>>> {
127 self.eth_syncer_cursors
128 .multi_get(contract_addresses)
129 .map_err(|e| {
130 BridgeError::StorageError(format!("Couldn't get sui_syncer_cursors: {:?}", e))
131 })
132 }
133}
134
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use sui_types::digests::TransactionDigest;

    use crate::test_utils::get_test_sui_to_eth_bridge_action;

    use super::*;

    /// End-to-end check of the storage tables: pending actions can be
    /// inserted, re-inserted and removed, and both cursor tables start empty
    /// and return the value written to them.
    ///
    /// NOTE(review): the body never awaits; `#[tokio::test]` is kept as in
    /// the original — presumably the storage layer expects a runtime, confirm
    /// before converting this to a plain `#[test]`.
    #[tokio::test]
    async fn test_bridge_storage_basic() {
        // Keep the directory guard alive for the whole test.
        let db_dir = tempfile::tempdir().unwrap();
        let tables = BridgeOrchestratorTables::new(db_dir.path());

        let first = get_test_sui_to_eth_bridge_action(
            None,
            Some(0),
            Some(99),
            Some(10000),
            None,
            None,
            None,
        );
        let second = get_test_sui_to_eth_bridge_action(
            None,
            Some(2),
            Some(100),
            Some(10000),
            None,
            None,
            None,
        );

        // Builds the digest -> action map we expect the store to contain.
        let snapshot_of = |items: &[&BridgeAction]| -> HashMap<BridgeActionDigest, BridgeAction> {
            items
                .iter()
                .map(|action| (action.digest(), (*action).clone()))
                .collect()
        };

        // A fresh store holds no pending actions.
        assert!(tables.get_all_pending_actions().is_empty());

        // Removing a digest that was never stored must succeed.
        tables.remove_pending_actions(&[first.digest()]).unwrap();

        // Insert both actions and read them back.
        tables
            .insert_pending_actions(&[first.clone(), second.clone()])
            .unwrap();
        assert_eq!(
            tables.get_all_pending_actions(),
            snapshot_of(&[&first, &second])
        );

        // Inserting an already stored action leaves the contents unchanged.
        tables
            .insert_pending_actions(std::slice::from_ref(&first))
            .unwrap();
        assert_eq!(
            tables.get_all_pending_actions(),
            snapshot_of(&[&first, &second])
        );

        // Remove the actions one at a time.
        tables.remove_pending_actions(&[second.digest()]).unwrap();
        assert_eq!(tables.get_all_pending_actions(), snapshot_of(&[&first]));

        tables.remove_pending_actions(&[first.digest()]).unwrap();
        assert!(tables.get_all_pending_actions().is_empty());

        // Ethereum cursor: absent before the first update, then readable.
        let contract = ethers::types::Address::random();
        let block_number = 199999u64;
        let eth_before = tables.get_eth_event_cursors(&[contract]).unwrap();
        assert!(eth_before[0].is_none());
        tables
            .update_eth_event_cursor(contract, block_number)
            .unwrap();
        let eth_after = tables.get_eth_event_cursors(&[contract]).unwrap();
        assert_eq!(eth_after[0].unwrap(), block_number);

        // Sui cursor: absent before the first update, then readable.
        let module = Identifier::from_str("test").unwrap();
        let event_cursor = EventID {
            tx_digest: TransactionDigest::random(),
            event_seq: 1,
        };
        let sui_before = tables
            .get_sui_event_cursors(std::slice::from_ref(&module))
            .unwrap();
        assert!(sui_before[0].is_none());
        tables
            .update_sui_event_cursor(module.clone(), event_cursor)
            .unwrap();
        let sui_after = tables
            .get_sui_event_cursors(std::slice::from_ref(&module))
            .unwrap();
        assert_eq!(sui_after[0].unwrap(), event_cursor);
    }
}