sui_storage/
write_path_pending_tx_log.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! WritePathPendingTransactionLog is used in TransactionOrchestrator
//! to deduplicate transaction submission processing. It helps to achieve:
//! 1. At any given time, a transaction is being processed at most once.
//! 2. When a Fullnode crashes and restarts, pending transactions are loaded and retried.
8
9use std::collections::HashSet;
10use std::path::PathBuf;
11
12use parking_lot::Mutex;
13use sui_types::base_types::TransactionDigest;
14use sui_types::crypto::EmptySignInfo;
15use sui_types::error::{SuiError, SuiResult};
16use sui_types::message_envelope::TrustedEnvelope;
17use sui_types::transaction::{SenderSignedData, VerifiedTransaction};
18use typed_store::DBMapUtils;
19use typed_store::rocks::MetricConf;
20use typed_store::{rocks::DBMap, traits::Map};
21
/// On-disk tables backing [`WritePathPendingTransactionLog`].
#[derive(DBMapUtils)]
struct WritePathPendingTransactionTable {
    /// Pending transactions keyed by transaction digest; the value is the
    /// sender-signed transaction data wrapped in a `TrustedEnvelope`.
    logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
}
26
/// Log of transactions whose submission is still in progress.
///
/// Each pending transaction is recorded in an on-disk table and its digest is
/// tracked in an in-memory set guarded by a mutex. The methods on this type
/// take that mutex before touching the table.
pub struct WritePathPendingTransactionLog {
    /// Disk storage for pending transactions.
    pending_transactions: WritePathPendingTransactionTable,
    /// In-memory set of pending transaction digests. It starts empty in `new`
    /// and is filled by writes and by `load_all_pending_transactions`.
    transactions_set: Mutex<HashSet<TransactionDigest>>,
}
33
34impl WritePathPendingTransactionLog {
35    pub fn new(path: PathBuf) -> Self {
36        let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
37            path,
38            MetricConf::new("pending_tx_log"),
39            None,
40            None,
41        );
42        Self {
43            pending_transactions,
44            transactions_set: Mutex::new(HashSet::new()),
45        }
46    }
47
48    // Returns whether the table currently has this transaction in record.
49    // If not, write the transaction and return true; otherwise return false.
50    pub fn write_pending_transaction_maybe(&self, tx: &VerifiedTransaction) -> bool {
51        let tx_digest = tx.digest();
52        let mut transactions_set = self.transactions_set.lock();
53        if transactions_set.contains(tx_digest) {
54            return false;
55        }
56        // Hold the lock while inserting into the logs to avoid race conditions.
57        self.pending_transactions
58            .logs
59            .insert(tx_digest, tx.serializable_ref())
60            .unwrap();
61        transactions_set.insert(*tx_digest);
62        true
63    }
64
65    pub fn finish_transaction(&self, tx: &TransactionDigest) -> SuiResult {
66        let mut transactions_set = self.transactions_set.lock();
67        // Hold the lock while removing from the logs to avoid race conditions.
68        let mut write_batch = self.pending_transactions.logs.batch();
69        write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
70        write_batch.write().map_err(SuiError::from)?;
71        transactions_set.remove(tx);
72        Ok(())
73    }
74
75    pub fn load_all_pending_transactions(&self) -> SuiResult<Vec<VerifiedTransaction>> {
76        let mut transactions_set = self.transactions_set.lock();
77        let transactions = self
78            .pending_transactions
79            .logs
80            .safe_iter()
81            .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
82            .collect::<Result<Vec<_>, _>>()?;
83        transactions_set.extend(transactions.iter().map(|t| *t.digest()));
84        Ok(transactions)
85    }
86
87    pub fn is_empty(&self) -> bool {
88        self.transactions_set.lock().is_empty() && self.pending_transactions.logs.is_empty()
89    }
90}
91
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use sui_types::utils::create_fake_transaction;

    /// Collects the digests of `txs` into a set so comparisons ignore ordering.
    fn digest_set(txs: &[VerifiedTransaction]) -> HashSet<TransactionDigest> {
        txs.iter().map(|t| *t.digest()).collect()
    }

    /// Walks the log through write, duplicate write, load and finish, first
    /// with a single transaction and then with a batch of ten.
    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let db_dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(db_dir.path().to_path_buf());

        let first_tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
        let first_digest = *first_tx.digest();
        // The first write is accepted.
        assert!(log.write_pending_transaction_maybe(&first_tx));
        // The second write will return false
        assert!(!log.write_pending_transaction_maybe(&first_tx));

        let on_disk = log.load_all_pending_transactions()?;
        assert_eq!(vec![first_tx], on_disk);

        // Finishing the transaction removes it from the log.
        log.finish_transaction(&first_digest).unwrap();
        let on_disk = log.load_all_pending_transactions()?;
        assert!(on_disk.is_empty());

        // It's ok to finish an already finished transaction
        log.finish_transaction(&first_digest).unwrap();

        // Test writing and finishing more transactions
        let batch: Vec<_> = (0..10)
            .map(|_| VerifiedTransaction::new_unchecked(create_fake_transaction()))
            .collect();
        for pending in &batch {
            assert!(log.write_pending_transaction_maybe(pending));
        }
        let loaded_digests = digest_set(&log.load_all_pending_transactions()?);
        assert_eq!(digest_set(&batch), loaded_digests);

        // Finish the first half; only the second half should remain.
        let (finished, remaining) = batch.split_at(5);
        for done in finished {
            log.finish_transaction(done.digest()).unwrap();
        }
        let loaded_digests = digest_set(&log.load_all_pending_transactions()?);
        assert_eq!(digest_set(remaining), loaded_digests);

        Ok(())
    }
}
154}