sui_rpc_loadgen/payload/
validation.rs1use futures::future::join_all;
5use itertools::Itertools;
6use std::collections::HashSet;
7use std::fmt::Debug;
8use sui_json_rpc_types::{
9 SuiObjectDataOptions, SuiObjectResponse, SuiTransactionBlockEffectsAPI,
10 SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
11};
12use sui_sdk::SuiClient;
13use sui_types::base_types::{ObjectID, TransactionDigest};
14use tracing::error;
15use tracing::log::warn;
16
/// Maximum number of entities handled per query in this module: it is the default chunk
/// size used by `chunk_entities` and the cap on object IDs sent per request by
/// `multi_get_object`.
const LOADGEN_QUERY_MAX_RESULT_LIMIT: usize = 25;
18
19pub(crate) fn cross_validate_entities<U>(entities: &[Vec<U>], entity_name: &str)
20where
21 U: PartialEq + Debug,
22{
23 if entities.len() < 2 {
24 return;
25 }
26
27 let length = entities[0].len();
28 if let Some((vec_index, v)) = entities.iter().enumerate().find(|(_, v)| v.len() != length) {
29 error!(
30 "Entity: {} lengths do not match at index {}: first vec has length {} vs vec {} has length {}",
31 entity_name,
32 vec_index,
33 length,
34 vec_index,
35 v.len()
36 );
37 return;
38 }
39
40 for i in 0..length {
42 let mut iter = entities.iter().map(|v| &v[i]);
44
45 if let Some(first) = iter.next() {
47 for (j, other) in iter.enumerate() {
48 if first != other {
49 error!(
51 "Entity: {} mismatch at index {}: expected: {:?}, received {}: {:?}",
52 entity_name,
53 i,
54 first,
55 format!("{}[{}]", entity_name, j + 1),
56 other
57 );
58 }
59 }
60 }
61 }
62}
63
64pub(crate) async fn check_transactions(
65 clients: &[SuiClient],
66 digests: &[TransactionDigest],
67 cross_validate: bool,
68 verify_objects: bool,
69) -> Vec<Vec<SuiTransactionBlockResponse>> {
70 let transactions: Vec<Vec<SuiTransactionBlockResponse>> =
71 join_all(clients.iter().map(|client| async move {
72 client
73 .read_api()
74 .multi_get_transactions_with_options(
75 digests.to_vec(),
76 SuiTransactionBlockResponseOptions::full_content(), )
78 .await
79 }))
80 .await
81 .into_iter()
82 .enumerate()
83 .filter_map(|(i, result)| match result {
84 Ok(transactions) => Some(transactions),
85 Err(err) => {
86 warn!(
87 "Failed to fetch transactions for vec {i}: {:?}. Logging digests, {:?}",
88 err, digests
89 );
90 None
91 }
92 })
93 .collect();
94
95 if cross_validate {
96 cross_validate_entities(&transactions, "Transactions");
97 }
98
99 if verify_objects {
100 let object_ids = transactions
101 .iter()
102 .flatten()
103 .flat_map(get_all_object_ids)
104 .collect::<HashSet<_>>()
105 .into_iter()
106 .collect::<Vec<_>>();
107
108 check_objects(clients, &object_ids, cross_validate).await;
109 }
110 transactions
111}
112
113pub(crate) fn get_all_object_ids(response: &SuiTransactionBlockResponse) -> Vec<ObjectID> {
114 let objects = match response.effects.as_ref() {
115 Some(effects) => effects.all_changed_objects(),
117 None => {
118 error!(
119 "Effects for transaction digest {} should not be empty",
120 response.digest
121 );
122 vec![]
123 }
124 };
125 objects
126 .iter()
127 .map(|(owned_object_ref, _)| owned_object_ref.object_id())
128 .collect::<Vec<_>>()
129}
130
131pub(crate) fn chunk_entities<U>(entities: &[U], chunk_size: Option<usize>) -> Vec<Vec<U>>
132where
133 U: Clone + PartialEq + Debug,
134{
135 let chunk_size = chunk_size.unwrap_or(LOADGEN_QUERY_MAX_RESULT_LIMIT);
136 entities
137 .iter()
138 .cloned()
139 .chunks(chunk_size)
140 .into_iter()
141 .map(|chunk| chunk.collect())
142 .collect()
143}
144
145pub(crate) async fn check_objects(
146 clients: &[SuiClient],
147 object_ids: &[ObjectID],
148 cross_validate: bool,
149) {
150 let chunks = chunk_entities(object_ids, None);
151 let results = join_all(chunks.iter().map(|chunk| multi_get_object(clients, chunk))).await;
152
153 if cross_validate {
154 for result in results {
155 cross_validate_entities(&result, "Objects");
156 }
157 }
158}
159
160pub(crate) async fn multi_get_object(
161 clients: &[SuiClient],
162 object_ids: &[ObjectID],
163) -> Vec<Vec<SuiObjectResponse>> {
164 let objects: Vec<Vec<SuiObjectResponse>> = join_all(clients.iter().map(|client| async move {
165 let object_ids = if object_ids.len() > LOADGEN_QUERY_MAX_RESULT_LIMIT {
166 warn!(
167 "The input size for multi_get_object_with_options has exceed the query limit\
168 {LOADGEN_QUERY_MAX_RESULT_LIMIT}: {}, time to implement chunking",
169 object_ids.len()
170 );
171 &object_ids[0..LOADGEN_QUERY_MAX_RESULT_LIMIT]
172 } else {
173 object_ids
174 };
175
176 client
177 .read_api()
178 .multi_get_object_with_options(
179 object_ids.to_vec(),
180 SuiObjectDataOptions::full_content(), )
182 .await
183 }))
184 .await
185 .into_iter()
186 .enumerate()
187 .filter_map(|(i, result)| match result {
188 Ok(obj_vec) => Some(obj_vec),
189 Err(err) => {
190 error!(
191 "Failed to fetch objects for vec {i}: {:?}. Logging objectIDs, {:?}",
192 err, object_ids
193 );
194 None
195 }
196 })
197 .collect();
198 objects
199}