sui_storage/
write_path_pending_tx_log.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! WritePathPendingTransactionLog is used in TransactionOrchestrator
5//! to deduplicate transaction submission processing. It helps to achieve:
6//! 1. At any given time, a transaction is being processed at most once.
7//! 2. When a Fullnode crashes and restarts, the pending transactions can be loaded and retried.
8
9use std::collections::HashSet;
10use std::path::PathBuf;
11
12use parking_lot::Mutex;
13use sui_types::base_types::TransactionDigest;
14use sui_types::crypto::EmptySignInfo;
15use sui_types::error::{SuiError, SuiResult};
16use sui_types::message_envelope::TrustedEnvelope;
17use sui_types::transaction::{SenderSignedData, VerifiedTransaction};
18use typed_store::DBMapUtils;
19use typed_store::rocks::MetricConf;
20use typed_store::{rocks::DBMap, traits::Map};
21
22#[derive(DBMapUtils)]
23struct WritePathPendingTransactionTable {
24    logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
25}
26
27pub struct WritePathPendingTransactionLog {
28    // Disk storage for pending transactions.
29    pending_transactions: WritePathPendingTransactionTable,
30    // In-memory set of pending transactions.
31    transactions_set: Mutex<HashSet<TransactionDigest>>,
32}
33
34impl WritePathPendingTransactionLog {
35    pub fn new(path: PathBuf) -> Self {
36        let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
37            path,
38            MetricConf::new("pending_tx_log"),
39            None,
40            None,
41        );
42        Self {
43            pending_transactions,
44            transactions_set: Mutex::new(HashSet::new()),
45        }
46    }
47
48    // Returns whether the table currently has this transaction in record.
49    // If not, write the transaction and return true; otherwise return false.
50    // Because the record will be cleaned up when the transaction finishes,
51    // even when it returns true, the callsite of this function should check
52    // the transaction status before doing anything, to avoid duplicates.
53    pub async fn write_pending_transaction_maybe(
54        &self,
55        tx: &VerifiedTransaction,
56    ) -> SuiResult<bool> {
57        let tx_digest = tx.digest();
58        let mut transactions_set = self.transactions_set.lock();
59        if transactions_set.contains(tx_digest) {
60            return Ok(false);
61        }
62        self.pending_transactions
63            .logs
64            .insert(tx_digest, tx.serializable_ref())?;
65        transactions_set.insert(*tx_digest);
66        Ok(true)
67    }
68
69    pub fn finish_transaction(&self, tx: &TransactionDigest) -> SuiResult {
70        let mut transactions_set = self.transactions_set.lock();
71        let mut write_batch = self.pending_transactions.logs.batch();
72        write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
73        write_batch.write().map_err(SuiError::from)?;
74        transactions_set.remove(tx);
75        Ok(())
76    }
77
78    pub fn load_all_pending_transactions(&self) -> SuiResult<Vec<VerifiedTransaction>> {
79        let mut transactions_set = self.transactions_set.lock();
80        let transactions = self
81            .pending_transactions
82            .logs
83            .safe_iter()
84            .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
85            .collect::<Result<Vec<_>, _>>()?;
86        transactions_set.extend(transactions.iter().map(|t| *t.digest()));
87        Ok(transactions)
88    }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow;
    use std::collections::HashSet;
    use sui_types::utils::create_fake_transaction;

    /// Wraps a freshly created fake transaction without verification.
    fn fake_tx() -> VerifiedTransaction {
        VerifiedTransaction::new_unchecked(create_fake_transaction())
    }

    /// Collects the digests of the given transactions into a set.
    fn digests_of(txes: &[VerifiedTransaction]) -> HashSet<TransactionDigest> {
        txes.iter().map(|tx| *tx.digest()).collect()
    }

    /// Loads everything the log currently holds and returns the digests.
    fn pending_digests(
        log: &WritePathPendingTransactionLog,
    ) -> SuiResult<HashSet<TransactionDigest>> {
        Ok(digests_of(&log.load_all_pending_transactions()?))
    }

    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let db_dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(db_dir.path().to_path_buf());

        let first = fake_tx();
        let first_digest = *first.digest();

        let recorded = log.write_pending_transaction_maybe(&first).await.unwrap();
        assert!(recorded);
        // The second write will return false
        let recorded_again = log.write_pending_transaction_maybe(&first).await.unwrap();
        assert!(!recorded_again);

        let loaded = log.load_all_pending_transactions()?;
        assert_eq!(vec![first], loaded);

        log.finish_transaction(&first_digest).unwrap();
        let loaded = log.load_all_pending_transactions()?;
        assert!(loaded.is_empty());

        // It's ok to finish an already finished transaction
        log.finish_transaction(&first_digest).unwrap();

        // Test writing and finishing more transactions
        let batch: Vec<_> = std::iter::repeat_with(fake_tx).take(10).collect();
        for tx in &batch {
            let recorded = log.write_pending_transaction_maybe(tx).await.unwrap();
            assert!(recorded);
        }
        let loaded_digests = pending_digests(&log)?;
        assert_eq!(digests_of(&batch), loaded_digests);

        // Finish the first half; only the second half must remain pending.
        let (finished, remaining) = batch.split_at(5);
        for tx in finished {
            log.finish_transaction(tx.digest()).unwrap();
        }
        let loaded_digests = pending_digests(&log)?;
        assert_eq!(digests_of(remaining), loaded_digests);

        Ok(())
    }
}