// sui_storage/write_path_pending_tx_log.rs
use crate::mutex_table::MutexTable;
use std::path::PathBuf;
use sui_types::base_types::TransactionDigest;
use sui_types::crypto::EmptySignInfo;
use sui_types::error::{SuiError, SuiResult};
use sui_types::message_envelope::TrustedEnvelope;
use sui_types::transaction::{SenderSignedData, VerifiedTransaction};
use typed_store::rocks::MetricConf;
use typed_store::traits::{TableSummary, TypedStoreDebug};
use typed_store::DBMapUtils;
use typed_store::{rocks::DBMap, traits::Map};
/// Result flag of `WritePathPendingTransactionLog::write_pending_transaction_maybe`:
/// `true` when that call inserted the transaction into the log, `false` when an
/// entry for the same digest was already present.
pub type IsFirstRecord = bool;
// Argument handed to `MutexTable::new` when the log builds its lock table.
// NOTE(review): presumably the number of lock shards — confirm against `MutexTable`.
const NUM_SHARDS: usize = 4096;
/// Table set backing the pending-transaction log.
///
/// NOTE(review): the `DBMapUtils` derive presumably generates the
/// `open_tables_read_write` constructor used by
/// `WritePathPendingTransactionLog::new` — confirm in `typed_store`.
#[derive(DBMapUtils)]
struct WritePathPendingTransactionTable {
/// Logged transactions keyed by their digest; each value is the stored
/// sender-signed transaction envelope.
logs: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, EmptySignInfo>>,
}
/// Log of pending transactions stored in a `typed_store` database map.
///
/// Entries are added by `write_pending_transaction_maybe`, removed by
/// `finish_transaction`, and listed by `load_all_pending_transactions`.
pub struct WritePathPendingTransactionLog {
/// The database tables holding the logged transactions.
pending_transactions: WritePathPendingTransactionTable,
/// Lock table keyed by transaction digest; used by
/// `write_pending_transaction_maybe` around its check-then-insert.
mutex_table: MutexTable<TransactionDigest>,
}
impl WritePathPendingTransactionLog {
    /// Opens the pending-transaction tables at `path` in read-write mode and
    /// creates the digest-keyed lock table sized by `NUM_SHARDS`.
    pub fn new(path: PathBuf) -> Self {
        let pending_transactions = WritePathPendingTransactionTable::open_tables_read_write(
            path,
            MetricConf::new("pending_tx_log"),
            None,
            None,
        );
        Self {
            pending_transactions,
            mutex_table: MutexTable::new(NUM_SHARDS),
        }
    }

    /// Records `tx` in the log unless an entry for its digest already exists.
    ///
    /// Returns `Ok(true)` when this call inserted the transaction and
    /// `Ok(false)` when the digest was already present (nothing is written in
    /// that case).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the existence check or the insert.
    pub async fn write_pending_transaction_maybe(
        &self,
        tx: &VerifiedTransaction,
    ) -> SuiResult<IsFirstRecord> {
        let tx_digest = tx.digest();
        // The lock has to be awaited: a future that is never polled does no
        // work, so without `.await` no lock would be held and two concurrent
        // callers could both pass the `contains_key` check for the same digest.
        // The guard is kept alive until the end of the function so the
        // check-then-insert below runs under the lock.
        let _guard = self.mutex_table.acquire_lock(*tx_digest).await;
        if self.pending_transactions.logs.contains_key(tx_digest)? {
            return Ok(false);
        }
        self.pending_transactions
            .logs
            .insert(tx_digest, tx.serializable_ref())?;
        Ok(true)
    }

    /// Removes the entry for digest `tx` from the log using a one-key write
    /// batch.
    ///
    /// The existence of the entry is not checked first; the test in this file
    /// finishes the same digest twice and expects both calls to succeed.
    ///
    /// # Errors
    ///
    /// Returns the error from building or committing the batch, converted to
    /// `SuiError`.
    pub fn finish_transaction(&self, tx: &TransactionDigest) -> SuiResult {
        let mut write_batch = self.pending_transactions.logs.batch();
        write_batch.delete_batch(&self.pending_transactions.logs, std::iter::once(tx))?;
        write_batch.write().map_err(SuiError::from)
    }

    /// Returns every transaction currently in the log, in the table's
    /// iteration order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error yielded by the table iterator.
    pub fn load_all_pending_transactions(&self) -> SuiResult<Vec<VerifiedTransaction>> {
        Ok(self
            .pending_transactions
            .logs
            .safe_iter()
            // The key is dropped; the stored envelope is turned back into a
            // `VerifiedTransaction`.
            .map(|item| item.map(|(_tx_digest, tx)| VerifiedTransaction::from(tx)))
            .collect::<Result<Vec<_>, _>>()?)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow;
    use std::collections::HashSet;
    use sui_types::utils::create_fake_transaction;

    /// Walks the log through write / load / finish, first with a single
    /// transaction and then with a batch of ten.
    #[tokio::test]
    async fn test_pending_tx_log_basic() -> anyhow::Result<()> {
        let db_dir = tempfile::tempdir().unwrap();
        let log = WritePathPendingTransactionLog::new(db_dir.path().to_path_buf());

        let first_tx = VerifiedTransaction::new_unchecked(create_fake_transaction());
        let first_digest = *first_tx.digest();

        // The first write reports a fresh record, the repeat does not.
        let inserted = log
            .write_pending_transaction_maybe(&first_tx)
            .await
            .unwrap();
        assert!(inserted);
        let inserted_again = log
            .write_pending_transaction_maybe(&first_tx)
            .await
            .unwrap();
        assert!(!inserted_again);

        // Exactly that one transaction is stored.
        let stored = log.load_all_pending_transactions()?;
        assert_eq!(vec![first_tx], stored);

        // Finishing it empties the log.
        log.finish_transaction(&first_digest).unwrap();
        let stored = log.load_all_pending_transactions()?;
        assert!(stored.is_empty());

        // Finishing the same digest a second time must still succeed.
        log.finish_transaction(&first_digest).unwrap();

        // Write ten more transactions; each one must be a first record.
        let batch: Vec<_> = std::iter::repeat_with(|| {
            VerifiedTransaction::new_unchecked(create_fake_transaction())
        })
        .take(10)
        .collect();
        for tx in &batch {
            let inserted = log.write_pending_transaction_maybe(tx).await.unwrap();
            assert!(inserted);
        }

        let stored_digests: HashSet<_> = log
            .load_all_pending_transactions()?
            .iter()
            .map(|t| *t.digest())
            .collect();
        let expected_digests: HashSet<_> = batch.iter().map(|t| *t.digest()).collect();
        assert_eq!(expected_digests, stored_digests);

        // Finish the first five; only the last five may remain.
        let (to_finish, to_keep) = batch.split_at(5);
        for tx in to_finish {
            log.finish_transaction(tx.digest()).unwrap();
        }

        let stored_digests: HashSet<_> = log
            .load_all_pending_transactions()?
            .iter()
            .map(|t| *t.digest())
            .collect();
        let expected_digests: HashSet<_> = to_keep.iter().map(|t| *t.digest()).collect();
        assert_eq!(expected_digests, stored_digests);

        Ok(())
    }
}