sui_storage/
write_path_pending_tx_log.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

//! WritePathPendingTransactionLog is used in the transaction write path (e.g. in
//! TransactionOrchestrator) for transaction submission processing. It helps to achieve:
//! 1. At one time, a transaction is only processed once.
//! 2. When Fullnode crashes and restarts, the pending transaction will be loaded and retried.

use crate::mutex_table::MutexTable;
use std::path::PathBuf;
use sui_types::base_types::TransactionDigest;
use sui_types::crypto::EmptySignInfo;
use sui_types::error::{SuiError, SuiResult};
use sui_types::message_envelope::TrustedEnvelope;
use sui_types::transaction::{SenderSignedData, VerifiedTransaction};
use typed_store::rocks::MetricConf;
use typed_store::traits::{TableSummary, TypedStoreDebug};
use typed_store::DBMapUtils;
use typed_store::{rocks::DBMap, traits::Map};

pub type IsFirstRecord = bool;
const NUM_SHARDS: usize = 4096;

#[derive(DBMapUtils)]
struct WritePathPendingTransactionTable {
    logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
}

pub struct WritePathPendingTransactionLog {
    pending_transactions: WritePathPendingTransactionTable,
    mutex_table: MutexTable<TransactionDigest>,
}

impl WritePathPendingTransactionLog {
    pub fn new(path: PathBuf) -> Self {
        let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
            path,
            MetricConf::new("pending_tx_log"),
            None,
            None,
        );
        Self {
            pending_transactions,
            mutex_table: MutexTable::new(NUM_SHARDS),
        }
    }

    // Returns whether the table currently has this transaction in record.
    // If not, write the transaction and return true; otherwise return false.
    // Because the record will be cleaned up when the transaction finishes,
    // even when it returns true, the callsite of this function should check
    // the transaction status before doing anything, to avoid duplicates.
    pub async fn write_pending_transaction_maybe(
        &self,
        tx: &VerifiedTransaction,
    ) -> SuiResult<IsFirstRecord> {
        let tx_digest = tx.digest();
        let _guard = self.mutex_table.acquire_lock(*tx_digest);
        if self.pending_transactions.logs.contains_key(tx_digest)? {
            Ok(false)
        } else {
            self.pending_transactions
                .logs
                .insert(tx_digest, tx.serializable_ref())?;
            Ok(true)
        }
    }

    // This function does not need to be behind a lock because:
    // 1. there could be more than one callsite but the deletion is idempotent.
    // 2. it does not race with the insert (`write_pending_transaction_maybe`)
    //    in a way that we care.
    //    2.a. for one transaction, `finish_transaction` shouldn't predate
    //        `write_pending_transaction_maybe`.
    //    2.b  for concurrent requests of one transaction, a call to this
    //        function may happen in between hence making the second request
    //        thinks it is the first record. It's preventable by checking this
    //        transaction again after the call of `write_pending_transaction_maybe`.
    pub fn finish_transaction(&self, tx: &TransactionDigest) -> SuiResult {
        let mut write_batch = self.pending_transactions.logs.batch();
        write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
        write_batch.write().map_err(SuiError::from)
    }

    pub fn load_all_pending_transactions(&self) -> SuiResult<Vec<VerifiedTransaction>> {
        Ok(self
            .pending_transactions
            .logs
            .safe_iter()
            .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
            .collect::<Result<Vec<_>, _>>()?)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use anyhow;
    use std::collections::HashSet;
    use sui_types::utils::create_fake_transaction;

    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let temp_dir = tempfile::tempdir().unwrap();
        let pending_txes = WritePathPendingTransactionLog::new(temp_dir.path().to_path_buf());
        let tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
        let tx_digest = *tx.digest();
        assert!(pending_txes
            .write_pending_transaction_maybe(&tx)
            .await
            .unwrap());
        // The second write will return false
        assert!(!pending_txes
            .write_pending_transaction_maybe(&tx)
            .await
            .unwrap());

        let loaded_txes = pending_txes.load_all_pending_transactions()?;
        assert_eq!(vec![tx], loaded_txes);

        pending_txes.finish_transaction(&tx_digest).unwrap();
        let loaded_txes = pending_txes.load_all_pending_transactions()?;
        assert!(loaded_txes.is_empty());

        // It's ok to finish an already finished transaction
        pending_txes.finish_transaction(&tx_digest).unwrap();

        // Test writing and finishing more transactions
        let txes: Vec<_> = (0..10)
            .map(|_| VerifiedTransaction::new_unchecked(create_fake_transaction()))
            .collect();
        for tx in txes.iter().take(10) {
            assert!(pending_txes
                .write_pending_transaction_maybe(tx)
                .await
                .unwrap());
        }
        let loaded_tx_digests: HashSet<_> = pending_txes
            .load_all_pending_transactions()?
            .iter()
            .map(|t| *t.digest())
            .collect();
        assert_eq!(
            txes.iter().map(|t| *t.digest()).collect::<HashSet<_>>(),
            loaded_tx_digests
        );

        for tx in txes.iter().take(5) {
            pending_txes.finish_transaction(tx.digest()).unwrap();
        }
        let loaded_tx_digests: HashSet<_> = pending_txes
            .load_all_pending_transactions()?
            .iter()
            .map(|t| *t.digest())
            .collect();
        assert_eq!(
            txes.iter()
                .skip(5)
                .map(|t| *t.digest())
                .collect::<HashSet<_>>(),
            loaded_tx_digests
        );

        Ok(())
    }
}