sui_rpc_loadgen/payload/
get_checkpoints.rs1use crate::payload::validation::check_transactions;
5use crate::payload::{GetCheckpoints, ProcessPayload, RpcCommandProcessor, SignerInfo};
6use anyhow::Result;
7use async_trait::async_trait;
8use dashmap::DashSet;
9use futures::future::join_all;
10use itertools::Itertools;
11use std::sync::Arc;
12
13use crate::payload::checkpoint_utils::get_latest_checkpoint_stats;
14use sui_json_rpc_types::CheckpointId;
15use sui_types::base_types::TransactionDigest;
16use tokio::sync::Mutex;
17use tracing::log::warn;
18use tracing::{debug, error, info};
19
20#[async_trait]
21impl<'a> ProcessPayload<'a, &'a GetCheckpoints> for RpcCommandProcessor {
22 async fn process(
23 &'a self,
24 op: &'a GetCheckpoints,
25 _signer_info: &Option<SignerInfo>,
26 ) -> Result<()> {
27 let clients = self.get_clients().await?;
28
29 let checkpoint_stats = get_latest_checkpoint_stats(&clients, op.end).await;
30 let max_checkpoint = checkpoint_stats.max_latest_checkpoint();
31 debug!("GetCheckpoints({}, {:?})", op.start, max_checkpoint,);
32
33 let cross_validate = true;
35
36 for seq in op.start..=max_checkpoint {
37 let transaction_digests: Arc<Mutex<DashSet<TransactionDigest>>> =
38 Arc::new(Mutex::new(DashSet::new()));
39 let checkpoints = join_all(clients.iter().enumerate().map(|(i, client)| {
40 let transaction_digests = transaction_digests.clone();
41 let end_checkpoint_for_clients = checkpoint_stats.latest_checkpoints.clone();
42 async move {
43 if end_checkpoint_for_clients[i] < seq {
44 warn!(
46 "The RPC server corresponding to the {i}th url has a outdated checkpoint number {}.\
47 The latest checkpoint number is {seq}",
48 end_checkpoint_for_clients[i]
49 );
50 return None;
51 }
52
53 match client
54 .read_api()
55 .get_checkpoint(CheckpointId::SequenceNumber(seq))
56 .await {
57 Ok(t) => {
58 if t.sequence_number != seq {
59 error!("The RPC server corresponding to the {i}th url has unexpected checkpoint sequence number {}, expected {seq}", t.sequence_number,);
60 }
61 for digest in t.transactions.iter() {
62 transaction_digests.lock().await.insert(*digest);
63 }
64 Some(t)
65 },
66 Err(err) => {
67 error!("Failed to fetch checkpoint {seq} on the {i}th url: {err}");
68 None
69 }
70 }
71 }
72 }))
73 .await;
74
75 let transaction_digests = transaction_digests
76 .lock()
77 .await
78 .iter()
79 .map(|digest| *digest)
80 .collect::<Vec<_>>();
81
82 if op.verify_transactions {
83 let transaction_responses = check_transactions(
84 &clients,
85 &transaction_digests,
86 cross_validate,
87 op.verify_objects,
88 )
89 .await
90 .into_iter()
91 .concat();
92
93 if op.record {
94 debug!("adding addresses and object ids from response");
95 self.add_addresses_from_response(&transaction_responses);
96 self.add_object_ids_from_response(&transaction_responses);
97 };
98 }
99
100 if op.record {
101 debug!("adding transaction digests from response");
102 self.add_transaction_digests(transaction_digests);
103 };
104
105 if cross_validate {
106 let valid_checkpoint = checkpoints.iter().enumerate().find_map(|(i, x)| {
107 if x.is_some() {
108 Some((i, x.clone().unwrap()))
109 } else {
110 None
111 }
112 });
113
114 if valid_checkpoint.is_none() {
115 error!("none of the urls are returning valid checkpoint for seq {seq}");
116 continue;
117 }
118 let (valid_checkpoint_idx, valid_checkpoint) = valid_checkpoint.unwrap();
120 for (i, x) in checkpoints.iter().enumerate() {
121 if i == valid_checkpoint_idx {
122 continue;
123 }
124 let eq = x.is_none() || x.as_ref().unwrap() == &valid_checkpoint;
126 if !eq {
127 error!(
128 "getCheckpoint {seq} has a different result between the {valid_checkpoint_idx}th and {i}th URL {:?} {:?}",
129 x, checkpoints[valid_checkpoint_idx]
130 )
131 }
132 }
133 }
134
135 if seq % 10000 == 0 {
136 info!("Finished processing checkpoint {seq}");
137 }
138 }
139
140 Ok(())
141 }
142}