sui_rpc_loadgen/payload/
validation.rs1use futures::future::join_all;
5use itertools::Itertools;
6use std::collections::HashSet;
7use std::fmt::Debug;
8use sui_json_rpc_types::{
9    SuiObjectDataOptions, SuiObjectResponse, SuiTransactionBlockEffectsAPI,
10    SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
11};
12use sui_sdk::SuiClient;
13use sui_types::base_types::{ObjectID, TransactionDigest};
14use tracing::error;
15use tracing::log::warn;
16
/// Maximum number of entities handled per query in this module: it is the default chunk
/// size used by `chunk_entities` when no size is given, and the cap on how many object IDs
/// `multi_get_object` sends in one request.
const LOADGEN_QUERY_MAX_RESULT_LIMIT: usize = 25;
18
19pub(crate) fn cross_validate_entities<U>(entities: &[Vec<U>], entity_name: &str)
20where
21    U: PartialEq + Debug,
22{
23    if entities.len() < 2 {
24        return;
25    }
26
27    let length = entities[0].len();
28    if let Some((vec_index, v)) = entities.iter().enumerate().find(|(_, v)| v.len() != length) {
29        error!(
30            "Entity: {} lengths do not match at index {}: first vec has length {} vs vec {} has length {}",
31            entity_name,
32            vec_index,
33            length,
34            vec_index,
35            v.len()
36        );
37        return;
38    }
39
40    for i in 0..length {
42        let mut iter = entities.iter().map(|v| &v[i]);
44
45        if let Some(first) = iter.next() {
47            for (j, other) in iter.enumerate() {
48                if first != other {
49                    error!(
51                        "Entity: {} mismatch at index {}: expected: {:?}, received {}: {:?}",
52                        entity_name,
53                        i,
54                        first,
55                        format!("{}[{}]", entity_name, j + 1),
56                        other
57                    );
58                }
59            }
60        }
61    }
62}
63
64pub(crate) async fn check_transactions(
65    clients: &[SuiClient],
66    digests: &[TransactionDigest],
67    cross_validate: bool,
68    verify_objects: bool,
69) -> Vec<Vec<SuiTransactionBlockResponse>> {
70    let transactions: Vec<Vec<SuiTransactionBlockResponse>> =
71        join_all(clients.iter().map(|client| async move {
72            client
73                .read_api()
74                .multi_get_transactions_with_options(
75                    digests.to_vec(),
76                    SuiTransactionBlockResponseOptions::full_content(), )
78                .await
79        }))
80        .await
81        .into_iter()
82        .enumerate()
83        .filter_map(|(i, result)| match result {
84            Ok(transactions) => Some(transactions),
85            Err(err) => {
86                warn!(
87                    "Failed to fetch transactions for vec {i}: {:?}. Logging digests, {:?}",
88                    err, digests
89                );
90                None
91            }
92        })
93        .collect();
94
95    if cross_validate {
96        cross_validate_entities(&transactions, "Transactions");
97    }
98
99    if verify_objects {
100        let object_ids = transactions
101            .iter()
102            .flatten()
103            .flat_map(get_all_object_ids)
104            .collect::<HashSet<_>>()
105            .into_iter()
106            .collect::<Vec<_>>();
107
108        check_objects(clients, &object_ids, cross_validate).await;
109    }
110    transactions
111}
112
113pub(crate) fn get_all_object_ids(response: &SuiTransactionBlockResponse) -> Vec<ObjectID> {
114    let objects = match response.effects.as_ref() {
115        Some(effects) => effects.all_changed_objects(),
117        None => {
118            error!(
119                "Effects for transaction digest {} should not be empty",
120                response.digest
121            );
122            vec![]
123        }
124    };
125    objects
126        .iter()
127        .map(|(owned_object_ref, _)| owned_object_ref.object_id())
128        .collect::<Vec<_>>()
129}
130
131pub(crate) fn chunk_entities<U>(entities: &[U], chunk_size: Option<usize>) -> Vec<Vec<U>>
132where
133    U: Clone + PartialEq + Debug,
134{
135    let chunk_size = chunk_size.unwrap_or(LOADGEN_QUERY_MAX_RESULT_LIMIT);
136    entities
137        .iter()
138        .cloned()
139        .chunks(chunk_size)
140        .into_iter()
141        .map(|chunk| chunk.collect())
142        .collect()
143}
144
145pub(crate) async fn check_objects(
146    clients: &[SuiClient],
147    object_ids: &[ObjectID],
148    cross_validate: bool,
149) {
150    let chunks = chunk_entities(object_ids, None);
151    let results = join_all(chunks.iter().map(|chunk| multi_get_object(clients, chunk))).await;
152
153    if cross_validate {
154        for result in results {
155            cross_validate_entities(&result, "Objects");
156        }
157    }
158}
159
160pub(crate) async fn multi_get_object(
161    clients: &[SuiClient],
162    object_ids: &[ObjectID],
163) -> Vec<Vec<SuiObjectResponse>> {
164    let objects: Vec<Vec<SuiObjectResponse>> = join_all(clients.iter().map(|client| async move {
165        let object_ids = if object_ids.len() > LOADGEN_QUERY_MAX_RESULT_LIMIT {
166            warn!(
167                "The input size for multi_get_object_with_options has exceed the query limit\
168         {LOADGEN_QUERY_MAX_RESULT_LIMIT}: {}, time to implement chunking",
169                object_ids.len()
170            );
171            &object_ids[0..LOADGEN_QUERY_MAX_RESULT_LIMIT]
172        } else {
173            object_ids
174        };
175
176        client
177            .read_api()
178            .multi_get_object_with_options(
179                object_ids.to_vec(),
180                SuiObjectDataOptions::full_content(), )
182            .await
183    }))
184    .await
185    .into_iter()
186    .enumerate()
187    .filter_map(|(i, result)| match result {
188        Ok(obj_vec) => Some(obj_vec),
189        Err(err) => {
190            error!(
191                "Failed to fetch objects for vec {i}: {:?}. Logging objectIDs, {:?}",
192                err, object_ids
193            );
194            None
195        }
196    })
197    .collect();
198    objects
199}