sui_replay/
transaction_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    data_fetcher::{DataFetcher, RemoteFetcher},
6    types::{MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD, ReplayEngineError},
7};
8use std::{collections::VecDeque, fmt::Formatter};
9use std::{fmt::Debug, str::FromStr};
10use sui_sdk::SuiClientBuilder;
11use sui_types::digests::TransactionDigest;
12use tracing::info;
13
/// Checkpoint sequence number used as the starting point when tailing with no explicit
/// start point and no previously processed checkpoint.
// NOTE(review): presumably 1 rather than 0 to skip the genesis checkpoint — confirm.
const VALID_CHECKPOINT_START: u64 = 1;
15
16#[derive(Clone, Debug)]
17pub enum TransactionSource {
18    /// Fetch a random transaction from the network
19    Random,
20    /// Fetch a transaction from the network with a specific checkpoint ID
21    FromCheckpoint(u64),
22    /// Use the latest transaction from the network
23    TailLatest { start: Option<FuzzStartPoint> },
24    /// Use a random transaction from an inclusive range of checkpoint IDs
25    FromInclusiveCheckpointRange {
26        checkpoint_start: u64,
27        checkpoint_end: u64,
28    },
29}
30
31#[derive(Clone)]
32pub struct TransactionProvider {
33    pub fetcher: RemoteFetcher,
34    pub source: TransactionSource,
35    pub last_checkpoint: Option<u64>,
36    pub transactions_left: VecDeque<TransactionDigest>,
37}
38
39#[derive(Eq, PartialEq, Clone, Copy, PartialOrd, Ord, Hash, Debug)]
40pub enum FuzzStartPoint {
41    Checkpoint(u64),
42    TxDigest(TransactionDigest),
43}
44
45impl FromStr for FuzzStartPoint {
46    type Err = anyhow::Error;
47
48    fn from_str(s: &str) -> Result<Self, Self::Err> {
49        match s.parse::<u64>() {
50            Ok(n) => Ok(FuzzStartPoint::Checkpoint(n)),
51            Err(u64_err) => match TransactionDigest::from_str(s) {
52                Ok(d) => Ok(FuzzStartPoint::TxDigest(d)),
53                Err(tx_err) => {
54                    info!(
55                        "{} is not a valid checkpoint (err: {:?}) or transaction digest (err: {:?})",
56                        s, u64_err, tx_err
57                    );
58                    Err(tx_err)
59                }
60            },
61        }
62    }
63}
64
65impl Debug for TransactionProvider {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("TransactionProvider")
68            // TODO: impl Debug for fetcher
69            //.field("fetcher", &self.fetcher)
70            .field("source", &self.source)
71            .field("last_checkpoint", &self.last_checkpoint)
72            .field("transactions_left", &self.transactions_left)
73            .finish()
74    }
75}
76
77impl TransactionProvider {
78    pub async fn new(http_url: &str, source: TransactionSource) -> Result<Self, ReplayEngineError> {
79        Ok(Self {
80            fetcher: RemoteFetcher::new(
81                SuiClientBuilder::default()
82                    .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
83                    .max_concurrent_requests(MAX_CONCURRENT_REQUESTS)
84                    .build(http_url)
85                    .await?,
86            ),
87            source,
88            last_checkpoint: None,
89            transactions_left: VecDeque::new(),
90        })
91    }
92
93    pub async fn next(&mut self) -> Result<Option<TransactionDigest>, ReplayEngineError> {
94        let tx = match self.source {
95            TransactionSource::Random => {
96                let tx = self.fetcher.fetch_random_transaction(None, None).await?;
97                Some(tx)
98            }
99            TransactionSource::FromCheckpoint(checkpoint_id) => {
100                let tx = self
101                    .fetcher
102                    .fetch_random_transaction(Some(checkpoint_id), Some(checkpoint_id))
103                    .await?;
104                Some(tx)
105            }
106            TransactionSource::TailLatest { start } => {
107                if let Some(tx) = self.transactions_left.pop_front() {
108                    Some(tx)
109                } else {
110                    let next_checkpoint = match start {
111                        Some(x) => match x {
112                            // Checkpoint specified
113                            FuzzStartPoint::Checkpoint(checkpoint_id) => {
114                                self.source = TransactionSource::TailLatest {
115                                    start: Some(FuzzStartPoint::Checkpoint(checkpoint_id + 1)),
116                                };
117                                Some(checkpoint_id)
118                            }
119                            // Digest specified. Find the checkpoint for the digest
120                            FuzzStartPoint::TxDigest(tx_digest) => {
121                                let ch = self
122                                    .fetcher
123                                    .get_transaction(&tx_digest)
124                                    .await?
125                                    .checkpoint
126                                    .expect("Transaction must have a checkpoint");
127                                // For the next round
128                                self.source = TransactionSource::TailLatest {
129                                    start: Some(FuzzStartPoint::Checkpoint(ch + 1)),
130                                };
131                                Some(ch)
132                            }
133                        },
134                        // Advance to next checkpoint if available
135                        None => self.last_checkpoint.map(|c| c + 1),
136                    }
137                    .unwrap_or(VALID_CHECKPOINT_START);
138
139                    self.transactions_left = self
140                        .fetcher
141                        .get_checkpoint_txs(next_checkpoint)
142                        .await?
143                        .into();
144                    self.last_checkpoint = Some(next_checkpoint);
145                    self.transactions_left.pop_front()
146                }
147            }
148            TransactionSource::FromInclusiveCheckpointRange {
149                checkpoint_start,
150                checkpoint_end,
151            } => {
152                let tx = self
153                    .fetcher
154                    .fetch_random_transaction(Some(checkpoint_start), Some(checkpoint_end))
155                    .await?;
156                Some(tx)
157            }
158        };
159
160        Ok(tx)
161    }
162}