sui_synthetic_ingestion/
synthetic_ingestion.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use prost::Message;
5use simulacrum::Simulacrum;
6use std::collections::BTreeMap;
7use std::path::PathBuf;
8use sui_rpc::proto::sui::rpc::v2::Checkpoint as ProtoCheckpoint;
9use sui_test_transaction_builder::TestTransactionBuilder;
10use sui_types::crypto::get_account_key_pair;
11use sui_types::effects::TransactionEffectsAPI;
12use sui_types::full_checkpoint_content::Checkpoint;
13use sui_types::full_checkpoint_content::CheckpointData;
14use sui_types::gas_coin::MIST_PER_SUI;
15use sui_types::utils::to_sender_signed_transaction;
16use tokio::fs;
17use tracing::info;
18
19#[derive(clap::Parser, Debug, Clone)]
20pub struct Config {
21    /// Directory to write the ingestion data to.
22    #[clap(long)]
23    pub ingestion_dir: PathBuf,
24    /// Customize the first checkpoint sequence number in the workload.
25    /// This is useful if we want to generate workload to benchmark a non-empty database.
26    #[clap(long, default_value_t = 0)]
27    pub starting_checkpoint: u64,
28    /// Total number of synthetic checkpoints to generate.
29    #[clap(long, default_value_t = 2000)]
30    pub num_checkpoints: u64,
31    /// Number of transactions in a checkpoint.
32    #[clap(long, default_value_t = 200)]
33    pub checkpoint_size: u64,
34}
35
36// TODO: Simulacrum does serial execution which could be slow if
37// we need to generate a large number of transactions.
38// We may want to make Simulacrum support parallel execution.
39
40pub async fn generate_ingestion(config: Config) {
41    info!("Generating synthetic ingestion data. config: {:?}", config);
42    let timer = std::time::Instant::now();
43    let mut sim = Simulacrum::new();
44    let Config {
45        ingestion_dir,
46        checkpoint_size,
47        num_checkpoints,
48        starting_checkpoint,
49    } = config;
50    sim.set_data_ingestion_path(ingestion_dir.clone());
51    // Simulacrum will generate 0.binpb.zst as the genesis checkpoint.
52    // We do not need it and might even override if starting_checkpoint is 0.
53    fs::remove_file(ingestion_dir.join("0.binpb.zst"))
54        .await
55        .unwrap();
56
57    let gas_price = sim.reference_gas_price();
58    let (sender, keypair) = get_account_key_pair();
59    let mut gas_object = {
60        let effects = sim.request_gas(sender, MIST_PER_SUI * 1000000).unwrap();
61        // `request_gas` will create a transaction, which we don't want to include in the benchmark.
62        // Put it in a checkpoint and then remove the checkpoint file.
63        sim.create_checkpoint();
64        fs::remove_file(ingestion_dir.join("1.binpb.zst"))
65            .await
66            .unwrap();
67        effects.created()[0].0
68    };
69    sim.override_next_checkpoint_number(starting_checkpoint);
70
71    let mut tx_count = 0;
72    for i in 0..num_checkpoints {
73        for _ in 0..checkpoint_size {
74            let tx_data = TestTransactionBuilder::new(sender, gas_object, gas_price)
75                .transfer_sui(Some(1), sender)
76                .build();
77            let tx = to_sender_signed_transaction(tx_data, &keypair);
78            let (effects, _) = sim.execute_transaction(tx).unwrap();
79            gas_object = effects.gas_object().0;
80            tx_count += 1;
81        }
82        let checkpoint = sim.create_checkpoint();
83        assert_eq!(checkpoint.sequence_number, i + starting_checkpoint);
84        if (i + 1) % 100 == 0 {
85            info!("Generated {} checkpoints, {} transactions", i + 1, tx_count);
86        }
87    }
88    info!(
89        "Generated {} transactions in {} checkpoints. Total time: {:?}",
90        tx_count,
91        num_checkpoints,
92        timer.elapsed()
93    );
94}
95
96pub async fn read_ingestion_data(path: &PathBuf) -> anyhow::Result<BTreeMap<u64, CheckpointData>> {
97    let mut data = BTreeMap::new();
98    let mut dir = fs::read_dir(path).await?;
99    while let Some(entry) = dir.next_entry().await? {
100        let path = entry.path();
101        let bytes = fs::read(path).await?;
102        let decompressed = zstd::decode_all(&bytes[..])?;
103        let proto_checkpoint = ProtoCheckpoint::decode(&decompressed[..])?;
104        let checkpoint: Checkpoint = (&proto_checkpoint).try_into()?;
105        let checkpoint_data: CheckpointData = checkpoint.into();
106        data.insert(
107            checkpoint_data.checkpoint_summary.sequence_number,
108            checkpoint_data,
109        );
110    }
111    Ok(data)
112}
113
#[cfg(test)]
mod tests {
    use crate::synthetic_ingestion::generate_ingestion;
    use prost::Message;
    use std::path::Path;
    use sui_rpc::proto::sui::rpc::v2::Checkpoint as ProtoCheckpoint;
    use sui_types::full_checkpoint_content::Checkpoint;
    use sui_types::full_checkpoint_content::CheckpointData;

    /// Generates 10 checkpoints of 2 transactions starting at sequence number 0.
    #[tokio::test]
    async fn test_ingestion_from_zero() {
        // Keep the `TempDir` guard alive for the whole test so the directory exists while it
        // is in use and is deleted on drop, instead of being left behind on disk.
        let temp_dir = tempfile::tempdir().unwrap();
        let config = super::Config {
            ingestion_dir: temp_dir.path().to_path_buf(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        };
        generate_ingestion(config).await;
        check_checkpoint_data(temp_dir.path(), 0, 10, 2).await;
    }

    /// Generates 10 checkpoints of 2 transactions starting at sequence number 10.
    #[tokio::test]
    async fn test_ingestion_from_non_zero() {
        // Keep the `TempDir` guard alive for the whole test so the directory exists while it
        // is in use and is deleted on drop, instead of being left behind on disk.
        let temp_dir = tempfile::tempdir().unwrap();
        let config = super::Config {
            ingestion_dir: temp_dir.path().to_path_buf(),
            starting_checkpoint: 10,
            num_checkpoints: 10,
            checkpoint_size: 2,
        };
        generate_ingestion(config).await;
        check_checkpoint_data(temp_dir.path(), 10, 10, 2).await;
    }

    /// Asserts that `ingestion_dir` holds one `<seq>.binpb.zst` file for every sequence
    /// number in `first_checkpoint..first_checkpoint + num_checkpoints`, and that each file
    /// decodes to a checkpoint with that sequence number and the expected transaction count.
    async fn check_checkpoint_data(
        ingestion_dir: &Path,
        first_checkpoint: u64,
        num_checkpoints: u64,
        checkpoint_size: u64,
    ) {
        for checkpoint_num in first_checkpoint..first_checkpoint + num_checkpoints {
            let path = ingestion_dir.join(format!("{}.binpb.zst", checkpoint_num));
            let bytes = tokio::fs::read(&path).await.unwrap();
            let decompressed = zstd::decode_all(&bytes[..]).unwrap();
            let proto_checkpoint = ProtoCheckpoint::decode(&decompressed[..]).unwrap();
            let checkpoint: Checkpoint = (&proto_checkpoint).try_into().unwrap();
            let checkpoint_data: CheckpointData = checkpoint.into();

            assert_eq!(
                checkpoint_data.checkpoint_summary.sequence_number,
                checkpoint_num
            );
            // there is one additional transaction (the settlement transaction)
            assert_eq!(
                checkpoint_data.transactions.len(),
                1 + checkpoint_size as usize
            );
        }
    }
}