sui_storage/
write_path_pending_tx_log.rs1use std::collections::HashSet;
10use std::path::PathBuf;
11
12use parking_lot::Mutex;
13use sui_types::base_types::TransactionDigest;
14use sui_types::crypto::EmptySignInfo;
15use sui_types::error::{SuiError, SuiResult};
16use sui_types::message_envelope::TrustedEnvelope;
17use sui_types::transaction::{SenderSignedData, VerifiedTransaction};
18use typed_store::DBMapUtils;
19use typed_store::rocks::MetricConf;
20use typed_store::{rocks::DBMap, traits::Map};
21
22#[derive(DBMapUtils)]
23struct WritePathPendingTransactionTable {
24 logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
25}
26
27pub struct WritePathPendingTransactionLog {
28 pending_transactions: WritePathPendingTransactionTable,
30 transactions_set: Mutex<HashSet<TransactionDigest>>,
32}
33
34impl WritePathPendingTransactionLog {
35 pub fn new(path: PathBuf) -> Self {
36 let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
37 path,
38 MetricConf::new("pending_tx_log"),
39 None,
40 None,
41 );
42 Self {
43 pending_transactions,
44 transactions_set: Mutex::new(HashSet::new()),
45 }
46 }
47
48 pub fn write_pending_transaction_maybe(&self, tx: &VerifiedTransaction) -> bool {
51 let tx_digest = tx.digest();
52 let mut transactions_set = self.transactions_set.lock();
53 if transactions_set.contains(tx_digest) {
54 return false;
55 }
56 self.pending_transactions
58 .logs
59 .insert(tx_digest, tx.serializable_ref())
60 .unwrap();
61 transactions_set.insert(*tx_digest);
62 true
63 }
64
65 pub fn finish_transaction(&self, tx: &TransactionDigest) -> SuiResult {
66 let mut transactions_set = self.transactions_set.lock();
67 let mut write_batch = self.pending_transactions.logs.batch();
69 write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
70 write_batch.write().map_err(SuiError::from)?;
71 transactions_set.remove(tx);
72 Ok(())
73 }
74
75 pub fn load_all_pending_transactions(&self) -> SuiResult<Vec<VerifiedTransaction>> {
76 let mut transactions_set = self.transactions_set.lock();
77 let transactions = self
78 .pending_transactions
79 .logs
80 .safe_iter()
81 .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
82 .collect::<Result<Vec<_>, _>>()?;
83 transactions_set.extend(transactions.iter().map(|t| *t.digest()));
84 Ok(transactions)
85 }
86
87 pub fn is_empty(&self) -> bool {
88 self.transactions_set.lock().is_empty() && self.pending_transactions.logs.is_empty()
89 }
90}
91
92#[cfg(test)]
93mod tests {
94 use super::*;
95 use anyhow;
96 use std::collections::HashSet;
97 use sui_types::utils::create_fake_transaction;
98
99 #[tokio::test]
100 async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
101 let temp_dir = tempfile::tempdir().unwrap();
102 let pending_txes = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());
103 let tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
104 let tx_digest = *tx.digest();
105 assert!(pending_txes.write_pending_transaction_maybe(&tx));
106 assert!(!pending_txes.write_pending_transaction_maybe(&tx));
108
109 let loaded_txes = pending_txes.load_all_pending_transactions()?;
110 assert_eq!(vec![tx], loaded_txes);
111
112 pending_txes.finish_transaction(&tx_digest).unwrap();
113 let loaded_txes = pending_txes.load_all_pending_transactions()?;
114 assert!(loaded_txes.is_empty());
115
116 pending_txes.finish_transaction(&tx_digest).unwrap();
118
119 let txes: Vec<_> = (0..10)
121 .map(|_| VerifiedTransaction::new_unchecked(create_fake_transaction()))
122 .collect();
123 for tx in txes.iter().take(10) {
124 assert!(pending_txes.write_pending_transaction_maybe(tx));
125 }
126 let loaded_tx_digests: HashSet<_> = pending_txes
127 .load_all_pending_transactions()?
128 .iter()
129 .map(|t| *t.digest())
130 .collect();
131 assert_eq!(
132 txes.iter().map(|t| *t.digest()).collect::<HashSet<_>>(),
133 loaded_tx_digests
134 );
135
136 for tx in txes.iter().take(5) {
137 pending_txes.finish_transaction(tx.digest()).unwrap();
138 }
139 let loaded_tx_digests: HashSet<_> = pending_txes
140 .load_all_pending_transactions()?
141 .iter()
142 .map(|t| *t.digest())
143 .collect();
144 assert_eq!(
145 txes.iter()
146 .skip(5)
147 .map(|t| *t.digest())
148 .collect::<HashSet<_>>(),
149 loaded_tx_digests
150 );
151
152 Ok(())
153 }
154}