sui_synthetic_ingestion/
synthetic_ingestion.rs1use simulacrum::Simulacrum;
5use std::collections::BTreeMap;
6use std::path::PathBuf;
7use sui_storage::blob::Blob;
8use sui_test_transaction_builder::TestTransactionBuilder;
9use sui_types::crypto::get_account_key_pair;
10use sui_types::effects::TransactionEffectsAPI;
11use sui_types::full_checkpoint_content::CheckpointData;
12use sui_types::gas_coin::MIST_PER_SUI;
13use sui_types::utils::to_sender_signed_transaction;
14use tokio::fs;
15use tracing::info;
16
17#[derive(clap::Parser, Debug, Clone)]
18pub struct Config {
19 #[clap(long)]
21 pub ingestion_dir: PathBuf,
22 #[clap(long, default_value_t = 0)]
25 pub starting_checkpoint: u64,
26 #[clap(long, default_value_t = 2000)]
28 pub num_checkpoints: u64,
29 #[clap(long, default_value_t = 200)]
31 pub checkpoint_size: u64,
32}
33
34pub async fn generate_ingestion(config: Config) {
39 info!("Generating synthetic ingestion data. config: {:?}", config);
40 let timer = std::time::Instant::now();
41 let mut sim = Simulacrum::new();
42 let Config {
43 ingestion_dir,
44 checkpoint_size,
45 num_checkpoints,
46 starting_checkpoint,
47 } = config;
48 sim.set_data_ingestion_path(ingestion_dir.clone());
49 fs::remove_file(ingestion_dir.join("0.chk")).await.unwrap();
52
53 let gas_price = sim.reference_gas_price();
54 let (sender, keypair) = get_account_key_pair();
55 let mut gas_object = {
56 let effects = sim.request_gas(sender, MIST_PER_SUI * 1000000).unwrap();
57 sim.create_checkpoint();
60 fs::remove_file(ingestion_dir.join("1.chk")).await.unwrap();
61 effects.created()[0].0
62 };
63 sim.override_next_checkpoint_number(starting_checkpoint);
64
65 let mut tx_count = 0;
66 for i in 0..num_checkpoints {
67 for _ in 0..checkpoint_size {
68 let tx_data = TestTransactionBuilder::new(sender, gas_object, gas_price)
69 .transfer_sui(Some(1), sender)
70 .build();
71 let tx = to_sender_signed_transaction(tx_data, &keypair);
72 let (effects, _) = sim.execute_transaction(tx).unwrap();
73 gas_object = effects.gas_object().0;
74 tx_count += 1;
75 }
76 let checkpoint = sim.create_checkpoint();
77 assert_eq!(checkpoint.sequence_number, i + starting_checkpoint);
78 if (i + 1) % 100 == 0 {
79 info!("Generated {} checkpoints, {} transactions", i + 1, tx_count);
80 }
81 }
82 info!(
83 "Generated {} transactions in {} checkpoints. Total time: {:?}",
84 tx_count,
85 num_checkpoints,
86 timer.elapsed()
87 );
88}
89
90pub async fn read_ingestion_data(path: &PathBuf) -> anyhow::Result<BTreeMap<u64, CheckpointData>> {
91 let mut data = BTreeMap::new();
92 let mut dir = fs::read_dir(path).await?;
93 while let Some(entry) = dir.next_entry().await? {
94 let path = entry.path();
95 let bytes = fs::read(path).await?;
96 let checkpoint_data: CheckpointData = Blob::from_bytes(&bytes)?;
97 data.insert(
98 checkpoint_data.checkpoint_summary.sequence_number,
99 checkpoint_data,
100 );
101 }
102 Ok(data)
103}
104
#[cfg(test)]
mod tests {
    use super::{generate_ingestion, Config};
    use std::path::Path;
    use sui_storage::blob::Blob;
    use sui_types::full_checkpoint_content::CheckpointData;

    /// Generation starting at checkpoint 0.
    #[tokio::test]
    async fn test_ingestion_from_zero() {
        generate_and_check(0, 10, 2).await;
    }

    /// Generation starting at a non-zero checkpoint number.
    #[tokio::test]
    async fn test_ingestion_from_non_zero() {
        generate_and_check(10, 10, 2).await;
    }

    /// Runs `generate_ingestion` into a fresh temporary directory and validates the files it
    /// produced.
    async fn generate_and_check(
        starting_checkpoint: u64,
        num_checkpoints: u64,
        checkpoint_size: u64,
    ) {
        // Hold on to the `TempDir` guard (instead of persisting the directory) so the
        // generated files are deleted when the test finishes.
        let temp_dir = tempfile::tempdir().unwrap();
        let config = Config {
            ingestion_dir: temp_dir.path().to_path_buf(),
            starting_checkpoint,
            num_checkpoints,
            checkpoint_size,
        };
        generate_ingestion(config).await;
        check_checkpoint_data(
            temp_dir.path(),
            starting_checkpoint,
            num_checkpoints,
            checkpoint_size,
        )
        .await;
    }

    /// Asserts that `ingestion_dir` contains one decodable `<n>.chk` file for every `n` in
    /// `first_checkpoint..first_checkpoint + num_checkpoints`, each with the matching sequence
    /// number and the expected transaction count.
    async fn check_checkpoint_data(
        ingestion_dir: &Path,
        first_checkpoint: u64,
        num_checkpoints: u64,
        checkpoint_size: u64,
    ) {
        for checkpoint in first_checkpoint..first_checkpoint + num_checkpoints {
            let path = ingestion_dir.join(format!("{}.chk", checkpoint));
            let bytes = tokio::fs::read(&path).await.unwrap();
            let checkpoint_data: CheckpointData = Blob::from_bytes(&bytes).unwrap();

            assert_eq!(
                checkpoint_data.checkpoint_summary.sequence_number,
                checkpoint
            );
            // Each checkpoint holds one transaction beyond the `checkpoint_size` transfers.
            // NOTE(review): presumably added by the simulator when it creates the
            // checkpoint — confirm.
            assert_eq!(
                checkpoint_data.transactions.len(),
                1 + checkpoint_size as usize
            );
        }
    }
}