sui_synthetic_ingestion/
synthetic_ingestion.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use simulacrum::Simulacrum;
5use std::collections::BTreeMap;
6use std::path::PathBuf;
7use sui_storage::blob::Blob;
8use sui_test_transaction_builder::TestTransactionBuilder;
9use sui_types::crypto::get_account_key_pair;
10use sui_types::effects::TransactionEffectsAPI;
11use sui_types::full_checkpoint_content::CheckpointData;
12use sui_types::gas_coin::MIST_PER_SUI;
13use sui_types::utils::to_sender_signed_transaction;
14use tokio::fs;
15use tracing::info;
16
// Command-line configuration for the synthetic ingestion workload generator
// (consumed by `generate_ingestion` in this file).
//
// NOTE(review): plain `//` comments are used for these notes on purpose. With clap's
// derive, `///` doc comments on the struct and its fields are presumably surfaced as the
// CLI about/help text, so editing them changes `--help` output — confirm before rewording.
#[derive(clap::Parser, Debug, Clone)]
pub struct Config {
    /// Directory to write the ingestion data to.
    #[clap(long)]
    pub ingestion_dir: PathBuf,
    /// Customize the first checkpoint sequence number in the workload.
    /// This is useful if we want to generate workload to benchmark a non-empty database.
    #[clap(long, default_value_t = 0)]
    pub starting_checkpoint: u64,
    /// Total number of synthetic checkpoints to generate.
    #[clap(long, default_value_t = 2000)]
    pub num_checkpoints: u64,
    /// Number of transactions in a checkpoint.
    // This is the number of transactions `generate_ingestion` executes per checkpoint; the
    // tests in this file expect each written checkpoint to hold one additional
    // (settlement) transaction on top of this count.
    #[clap(long, default_value_t = 200)]
    pub checkpoint_size: u64,
}
33
34// TODO: Simulacrum does serial execution which could be slow if
35// we need to generate a large number of transactions.
36// We may want to make Simulacrum support parallel execution.
37
38pub async fn generate_ingestion(config: Config) {
39    info!("Generating synthetic ingestion data. config: {:?}", config);
40    let timer = std::time::Instant::now();
41    let mut sim = Simulacrum::new();
42    let Config {
43        ingestion_dir,
44        checkpoint_size,
45        num_checkpoints,
46        starting_checkpoint,
47    } = config;
48    sim.set_data_ingestion_path(ingestion_dir.clone());
49    // Simulacrum will generate 0.chk as the genesis checkpoint.
50    // We do not need it and might even override if starting_checkpoint is 0.
51    fs::remove_file(ingestion_dir.join("0.chk")).await.unwrap();
52
53    let gas_price = sim.reference_gas_price();
54    let (sender, keypair) = get_account_key_pair();
55    let mut gas_object = {
56        let effects = sim.request_gas(sender, MIST_PER_SUI * 1000000).unwrap();
57        // `request_gas` will create a transaction, which we don't want to include in the benchmark.
58        // Put it in a checkpoint and then remove the checkpoint file.
59        sim.create_checkpoint();
60        fs::remove_file(ingestion_dir.join("1.chk")).await.unwrap();
61        effects.created()[0].0
62    };
63    sim.override_next_checkpoint_number(starting_checkpoint);
64
65    let mut tx_count = 0;
66    for i in 0..num_checkpoints {
67        for _ in 0..checkpoint_size {
68            let tx_data = TestTransactionBuilder::new(sender, gas_object, gas_price)
69                .transfer_sui(Some(1), sender)
70                .build();
71            let tx = to_sender_signed_transaction(tx_data, &keypair);
72            let (effects, _) = sim.execute_transaction(tx).unwrap();
73            gas_object = effects.gas_object().0;
74            tx_count += 1;
75        }
76        let checkpoint = sim.create_checkpoint();
77        assert_eq!(checkpoint.sequence_number, i + starting_checkpoint);
78        if (i + 1) % 100 == 0 {
79            info!("Generated {} checkpoints, {} transactions", i + 1, tx_count);
80        }
81    }
82    info!(
83        "Generated {} transactions in {} checkpoints. Total time: {:?}",
84        tx_count,
85        num_checkpoints,
86        timer.elapsed()
87    );
88}
89
90pub async fn read_ingestion_data(path: &PathBuf) -> anyhow::Result<BTreeMap<u64, CheckpointData>> {
91    let mut data = BTreeMap::new();
92    let mut dir = fs::read_dir(path).await?;
93    while let Some(entry) = dir.next_entry().await? {
94        let path = entry.path();
95        let bytes = fs::read(path).await?;
96        let checkpoint_data: CheckpointData = Blob::from_bytes(&bytes)?;
97        data.insert(
98            checkpoint_data.checkpoint_summary.sequence_number,
99            checkpoint_data,
100        );
101    }
102    Ok(data)
103}
104
#[cfg(test)]
mod tests {
    use crate::synthetic_ingestion::generate_ingestion;
    use std::path::Path;
    use sui_storage::blob::Blob;
    use sui_types::full_checkpoint_content::CheckpointData;

    /// Workload numbered from checkpoint 0.
    #[tokio::test]
    async fn test_ingestion_from_zero() {
        generate_and_check(0, 10, 2).await;
    }

    /// Workload numbered from a non-zero starting checkpoint.
    #[tokio::test]
    async fn test_ingestion_from_non_zero() {
        generate_and_check(10, 10, 2).await;
    }

    /// Generates a workload into a fresh temporary directory and verifies the files
    /// written for it.
    async fn generate_and_check(starting_checkpoint: u64, num_checkpoints: u64, checkpoint_size: u64) {
        // Keep the `TempDir` guard alive for the whole test so the directory is removed
        // when it is dropped, instead of persisting it on disk after the run.
        let temp_dir = tempfile::tempdir().unwrap();
        let config = super::Config {
            ingestion_dir: temp_dir.path().to_path_buf(),
            starting_checkpoint,
            num_checkpoints,
            checkpoint_size,
        };
        generate_ingestion(config).await;
        check_checkpoint_data(
            temp_dir.path(),
            starting_checkpoint,
            num_checkpoints,
            checkpoint_size,
        )
        .await;
    }

    /// Asserts that `ingestion_dir` holds one `<sequence_number>.chk` file for each
    /// checkpoint in `first_checkpoint..first_checkpoint + num_checkpoints`, each decoding
    /// to the matching sequence number and the expected number of transactions.
    async fn check_checkpoint_data(
        ingestion_dir: &Path,
        first_checkpoint: u64,
        num_checkpoints: u64,
        checkpoint_size: u64,
    ) {
        for checkpoint in first_checkpoint..first_checkpoint + num_checkpoints {
            let path = ingestion_dir.join(format!("{}.chk", checkpoint));
            let bytes = tokio::fs::read(&path).await.unwrap();
            let checkpoint_data: CheckpointData = Blob::from_bytes(&bytes).unwrap();

            assert_eq!(
                checkpoint_data.checkpoint_summary.sequence_number,
                checkpoint
            );
            // there is one additional transaction (the settlement transaction)
            assert_eq!(
                checkpoint_data.transactions.len(),
                1 + checkpoint_size as usize
            );
        }
    }
}