sui_synthetic_ingestion/
synthetic_ingestion.rs1use prost::Message;
5use simulacrum::Simulacrum;
6use std::collections::BTreeMap;
7use std::path::PathBuf;
8use sui_rpc::proto::sui::rpc::v2::Checkpoint as ProtoCheckpoint;
9use sui_test_transaction_builder::TestTransactionBuilder;
10use sui_types::crypto::get_account_key_pair;
11use sui_types::effects::TransactionEffectsAPI;
12use sui_types::full_checkpoint_content::Checkpoint;
13use sui_types::full_checkpoint_content::CheckpointData;
14use sui_types::gas_coin::MIST_PER_SUI;
15use sui_types::utils::to_sender_signed_transaction;
16use tokio::fs;
17use tracing::info;
18
19#[derive(clap::Parser, Debug, Clone)]
20pub struct Config {
21 #[clap(long)]
23 pub ingestion_dir: PathBuf,
24 #[clap(long, default_value_t = 0)]
27 pub starting_checkpoint: u64,
28 #[clap(long, default_value_t = 2000)]
30 pub num_checkpoints: u64,
31 #[clap(long, default_value_t = 200)]
33 pub checkpoint_size: u64,
34}
35
36pub async fn generate_ingestion(config: Config) {
41 info!("Generating synthetic ingestion data. config: {:?}", config);
42 let timer = std::time::Instant::now();
43 let mut sim = Simulacrum::new();
44 let Config {
45 ingestion_dir,
46 checkpoint_size,
47 num_checkpoints,
48 starting_checkpoint,
49 } = config;
50 sim.set_data_ingestion_path(ingestion_dir.clone());
51 fs::remove_file(ingestion_dir.join("0.binpb.zst"))
54 .await
55 .unwrap();
56
57 let gas_price = sim.reference_gas_price();
58 let (sender, keypair) = get_account_key_pair();
59 let mut gas_object = {
60 let effects = sim.request_gas(sender, MIST_PER_SUI * 1000000).unwrap();
61 sim.create_checkpoint();
64 fs::remove_file(ingestion_dir.join("1.binpb.zst"))
65 .await
66 .unwrap();
67 effects.created()[0].0
68 };
69 sim.override_next_checkpoint_number(starting_checkpoint);
70
71 let mut tx_count = 0;
72 for i in 0..num_checkpoints {
73 for _ in 0..checkpoint_size {
74 let tx_data = TestTransactionBuilder::new(sender, gas_object, gas_price)
75 .transfer_sui(Some(1), sender)
76 .build();
77 let tx = to_sender_signed_transaction(tx_data, &keypair);
78 let (effects, _) = sim.execute_transaction(tx).unwrap();
79 gas_object = effects.gas_object().0;
80 tx_count += 1;
81 }
82 let checkpoint = sim.create_checkpoint();
83 assert_eq!(checkpoint.sequence_number, i + starting_checkpoint);
84 if (i + 1) % 100 == 0 {
85 info!("Generated {} checkpoints, {} transactions", i + 1, tx_count);
86 }
87 }
88 info!(
89 "Generated {} transactions in {} checkpoints. Total time: {:?}",
90 tx_count,
91 num_checkpoints,
92 timer.elapsed()
93 );
94}
95
96pub async fn read_ingestion_data(path: &PathBuf) -> anyhow::Result<BTreeMap<u64, CheckpointData>> {
97 let mut data = BTreeMap::new();
98 let mut dir = fs::read_dir(path).await?;
99 while let Some(entry) = dir.next_entry().await? {
100 let path = entry.path();
101 let bytes = fs::read(path).await?;
102 let decompressed = zstd::decode_all(&bytes[..])?;
103 let proto_checkpoint = ProtoCheckpoint::decode(&decompressed[..])?;
104 let checkpoint: Checkpoint = (&proto_checkpoint).try_into()?;
105 let checkpoint_data: CheckpointData = checkpoint.into();
106 data.insert(
107 checkpoint_data.checkpoint_summary.sequence_number,
108 checkpoint_data,
109 );
110 }
111 Ok(data)
112}
113
114#[cfg(test)]
115mod tests {
116 use crate::synthetic_ingestion::generate_ingestion;
117 use prost::Message;
118 use std::path::PathBuf;
119 use sui_rpc::proto::sui::rpc::v2::Checkpoint as ProtoCheckpoint;
120 use sui_types::full_checkpoint_content::Checkpoint;
121 use sui_types::full_checkpoint_content::CheckpointData;
122
123 #[tokio::test]
124 async fn test_ingestion_from_zero() {
125 let ingestion_dir = tempfile::tempdir().unwrap().keep();
126 let config = super::Config {
127 ingestion_dir: ingestion_dir.clone(),
128 starting_checkpoint: 0,
129 num_checkpoints: 10,
130 checkpoint_size: 2,
131 };
132 generate_ingestion(config).await;
133 check_checkpoint_data(ingestion_dir, 0, 10, 2).await;
134 }
135
136 #[tokio::test]
137 async fn test_ingestion_from_non_zero() {
138 let ingestion_dir = tempfile::tempdir().unwrap().keep();
139 let config = super::Config {
140 ingestion_dir: ingestion_dir.clone(),
141 starting_checkpoint: 10,
142 num_checkpoints: 10,
143 checkpoint_size: 2,
144 };
145 generate_ingestion(config).await;
146 check_checkpoint_data(ingestion_dir, 10, 10, 2).await;
147 }
148
149 async fn check_checkpoint_data(
150 ingestion_dir: PathBuf,
151 first_checkpoint: u64,
152 num_checkpoints: u64,
153 checkpoint_size: u64,
154 ) {
155 for checkpoint_num in first_checkpoint..first_checkpoint + num_checkpoints {
156 let path = ingestion_dir.join(format!("{}.binpb.zst", checkpoint_num));
157 let bytes = tokio::fs::read(&path).await.unwrap();
158 let decompressed = zstd::decode_all(&bytes[..]).unwrap();
159 let proto_checkpoint = ProtoCheckpoint::decode(&decompressed[..]).unwrap();
160 let checkpoint: Checkpoint = (&proto_checkpoint).try_into().unwrap();
161 let checkpoint_data: CheckpointData = checkpoint.into();
162
163 assert_eq!(
164 checkpoint_data.checkpoint_summary.sequence_number,
165 checkpoint_num
166 );
167 assert_eq!(
169 checkpoint_data.transactions.len(),
170 1 + checkpoint_size as usize
171 );
172 }
173 }
174}