sui_replay/
transaction_provider.rs1use crate::{
5 data_fetcher::{DataFetcher, RemoteFetcher},
6 types::{MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD, ReplayEngineError},
7};
8use std::{collections::VecDeque, fmt::Formatter};
9use std::{fmt::Debug, str::FromStr};
10use sui_sdk::SuiClientBuilder;
11use sui_types::digests::TransactionDigest;
12use tracing::info;
13
/// Checkpoint used by `TransactionProvider::next` in `TailLatest` mode when no
/// explicit start point was given and no checkpoint has been processed yet.
const VALID_CHECKPOINT_START: u64 = 1;
15
16#[derive(Clone, Debug)]
17pub enum TransactionSource {
18 Random,
20 FromCheckpoint(u64),
22 TailLatest { start: Option<FuzzStartPoint> },
24 FromInclusiveCheckpointRange {
26 checkpoint_start: u64,
27 checkpoint_end: u64,
28 },
29}
30
31#[derive(Clone)]
32pub struct TransactionProvider {
33 pub fetcher: RemoteFetcher,
34 pub source: TransactionSource,
35 pub last_checkpoint: Option<u64>,
36 pub transactions_left: VecDeque<TransactionDigest>,
37}
38
39#[derive(Eq, PartialEq, Clone, Copy, PartialOrd, Ord, Hash, Debug)]
40pub enum FuzzStartPoint {
41 Checkpoint(u64),
42 TxDigest(TransactionDigest),
43}
44
45impl FromStr for FuzzStartPoint {
46 type Err = anyhow::Error;
47
48 fn from_str(s: &str) -> Result<Self, Self::Err> {
49 match s.parse::<u64>() {
50 Ok(n) => Ok(FuzzStartPoint::Checkpoint(n)),
51 Err(u64_err) => match TransactionDigest::from_str(s) {
52 Ok(d) => Ok(FuzzStartPoint::TxDigest(d)),
53 Err(tx_err) => {
54 info!(
55 "{} is not a valid checkpoint (err: {:?}) or transaction digest (err: {:?})",
56 s, u64_err, tx_err
57 );
58 Err(tx_err)
59 }
60 },
61 }
62 }
63}
64
65impl Debug for TransactionProvider {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("TransactionProvider")
68 .field("source", &self.source)
71 .field("last_checkpoint", &self.last_checkpoint)
72 .field("transactions_left", &self.transactions_left)
73 .finish()
74 }
75}
76
77impl TransactionProvider {
78 pub async fn new(http_url: &str, source: TransactionSource) -> Result<Self, ReplayEngineError> {
79 Ok(Self {
80 fetcher: RemoteFetcher::new(
81 SuiClientBuilder::default()
82 .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
83 .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
84 .build(http_url)
85 .await?,
86 ),
87 source,
88 last_checkpoint: None,
89 transactions_left: VecDeque::new(),
90 })
91 }
92
93 pub async fn next(&mut self) -> Result<Option<TransactionDigest>, ReplayEngineError> {
94 let tx = match self.source {
95 TransactionSource::Random => {
96 let tx = self.fetcher.fetch_random_transaction(None, None).await?;
97 Some(tx)
98 }
99 TransactionSource::FromCheckpoint(checkpoint_id) => {
100 let tx = self
101 .fetcher
102 .fetch_random_transaction(Some(checkpoint_id), Some(checkpoint_id))
103 .await?;
104 Some(tx)
105 }
106 TransactionSource::TailLatest { start } => {
107 if let Some(tx) = self.transactions_left.pop_front() {
108 Some(tx)
109 } else {
110 let next_checkpoint = match start {
111 Some(x) => match x {
112 FuzzStartPoint::Checkpoint(checkpoint_id) => {
114 self.source = TransactionSource::TailLatest {
115 start: Some(FuzzStartPoint::Checkpoint(checkpoint_id + 1)),
116 };
117 Some(checkpoint_id)
118 }
119 FuzzStartPoint::TxDigest(tx_digest) => {
121 let ch = self
122 .fetcher
123 .get_transaction(&tx_digest)
124 .await?
125 .checkpoint
126 .expect("Transaction must have a checkpoint");
127 self.source = TransactionSource::TailLatest {
129 start: Some(FuzzStartPoint::Checkpoint(ch + 1)),
130 };
131 Some(ch)
132 }
133 },
134 None => self.last_checkpoint.map(|c| c + 1),
136 }
137 .unwrap_or(VALID_CHECKPOINT_START);
138
139 self.transactions_left = self
140 .fetcher
141 .get_checkpoint_txs(next_checkpoint)
142 .await?
143 .into();
144 self.last_checkpoint = Some(next_checkpoint);
145 self.transactions_left.pop_front()
146 }
147 }
148 TransactionSource::FromInclusiveCheckpointRange {
149 checkpoint_start,
150 checkpoint_end,
151 } => {
152 let tx = self
153 .fetcher
154 .fetch_random_transaction(Some(checkpoint_start), Some(checkpoint_end))
155 .await?;
156 Some(tx)
157 }
158 };
159
160 Ok(tx)
161 }
162}