sui_storage/
write_path_pending_tx_log.rs1use std::collections::HashSet;
10use std::path::PathBuf;
11
12use parking_lot::Mutex;
13use sui_types::base_types::TransactionDigest;
14use sui_types::crypto::EmptySignInfo;
15use sui_types::error::{SuiError, SuiResult};
16use sui_types::message_envelope::TrustedEnvelope;
17use sui_types::transaction::{SenderSignedData, VerifiedTransaction};
18use typed_store::DBMapUtils;
19use typed_store::rocks::MetricConf;
20use typed_store::{rocks::DBMap, traits::Map};
21
22#[derive(DBMapUtils)]
23struct WritePathPendingTransactionTable {
24 logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
25}
26
27pub struct WritePathPendingTransactionLog {
28 pending_transactions: WritePathPendingTransactionTable,
30 transactions_set: Mutex<HashSet<TransactionDigest>>,
32}
33
34impl WritePathPendingTransactionLog {
35 pub fn new(path: PathBuf) -> Self {
36 let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
37 path,
38 MetricConf::new("pending_tx_log"),
39 None,
40 None,
41 );
42 Self {
43 pending_transactions,
44 transactions_set: Mutex::new(HashSet::new()),
45 }
46 }
47
48 pub async fn write_pending_transaction_maybe(
54 &self,
55 tx: &VerifiedTransaction,
56 ) -> SuiResult<bool> {
57 let tx_digest = tx.digest();
58 let mut transactions_set = self.transactions_set.lock();
59 if transactions_set.contains(tx_digest) {
60 return Ok(false);
61 }
62 self.pending_transactions
63 .logs
64 .insert(tx_digest, tx.serializable_ref())?;
65 transactions_set.insert(*tx_digest);
66 Ok(true)
67 }
68
69 pub fn finish_transaction(&self, tx: &TransactionDigest) -> SuiResult {
70 let mut transactions_set = self.transactions_set.lock();
71 let mut write_batch = self.pending_transactions.logs.batch();
72 write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
73 write_batch.write().map_err(SuiError::from)?;
74 transactions_set.remove(tx);
75 Ok(())
76 }
77
78 pub fn load_all_pending_transactions(&self) -> SuiResult<Vec<VerifiedTransaction>> {
79 let mut transactions_set = self.transactions_set.lock();
80 let transactions = self
81 .pending_transactions
82 .logs
83 .safe_iter()
84 .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
85 .collect::<Result<Vec<_>, _>>()?;
86 transactions_set.extend(transactions.iter().map(|t| *t.digest()));
87 Ok(transactions)
88 }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow;
    use std::collections::HashSet;
    use sui_types::utils::create_fake_transaction;

    /// Collects the digests of `txs` into a set so comparisons ignore order.
    fn digest_set<'a>(
        txs: impl IntoIterator<Item = &'a VerifiedTransaction>,
    ) -> HashSet<TransactionDigest> {
        txs.into_iter().map(|t| *t.digest()).collect()
    }

    /// Exercises write / duplicate-write / load / finish on a fresh log, first
    /// with a single transaction and then with a batch of ten.
    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(dir.path().to_path_buf());
        let first = VerifiedTransaction::new_unchecked(create_fake_transaction());
        let first_digest = *first.digest();

        // The first write records the transaction.
        let newly_written = log.write_pending_transaction_maybe(&first).await.unwrap();
        assert!(newly_written);

        // The second write of the same transaction reports a duplicate.
        let written_again = log.write_pending_transaction_maybe(&first).await.unwrap();
        assert!(!written_again);

        let loaded = log.load_all_pending_transactions()?;
        assert_eq!(vec![first], loaded);

        log.finish_transaction(&first_digest).unwrap();
        let loaded = log.load_all_pending_transactions()?;
        assert!(loaded.is_empty());

        // Finishing a transaction that is no longer pending must still succeed.
        log.finish_transaction(&first_digest).unwrap();

        // Write a batch of ten distinct transactions.
        let batch: Vec<_> = std::iter::repeat_with(|| {
            VerifiedTransaction::new_unchecked(create_fake_transaction())
        })
        .take(10)
        .collect();
        for tx in &batch {
            let newly_written = log.write_pending_transaction_maybe(tx).await.unwrap();
            assert!(newly_written);
        }
        let loaded = log.load_all_pending_transactions()?;
        assert_eq!(digest_set(&batch), digest_set(&loaded));

        // Finish the first half; only the second half should remain pending.
        for tx in batch.iter().take(5) {
            log.finish_transaction(tx.digest()).unwrap();
        }
        let loaded = log.load_all_pending_transactions()?;
        assert_eq!(digest_set(batch.iter().skip(5)), digest_set(&loaded));

        Ok(())
    }
}