sui_rpc_loadgen/payload/
get_checkpoints.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::payload::validation::check_transactions;
5use crate::payload::{GetCheckpoints, ProcessPayload, RpcCommandProcessor, SignerInfo};
6use anyhow::Result;
7use async_trait::async_trait;
8use dashmap::DashSet;
9use futures::future::join_all;
10use itertools::Itertools;
11use std::sync::Arc;
12
13use crate::payload::checkpoint_utils::get_latest_checkpoint_stats;
14use sui_json_rpc_types::CheckpointId;
15use sui_types::base_types::TransactionDigest;
16use tokio::sync::Mutex;
17use tracing::log::warn;
18use tracing::{debug, error, info};
19
20#[async_trait]
21impl<'a> ProcessPayload<'a, &'a GetCheckpoints> for RpcCommandProcessor {
22    async fn process(
23        &'a self,
24        op: &'a GetCheckpoints,
25        _signer_info: &Option<SignerInfo>,
26    ) -> Result<()> {
27        let clients = self.get_clients().await?;
28
29        let checkpoint_stats = get_latest_checkpoint_stats(&clients, op.end).await;
30        let max_checkpoint = checkpoint_stats.max_latest_checkpoint();
31        debug!("GetCheckpoints({}, {:?})", op.start, max_checkpoint,);
32
33        // TODO(chris): read `cross_validate` from config
34        let cross_validate = true;
35
36        for seq in op.start..=max_checkpoint {
37            let transaction_digests: Arc<Mutex<DashSet<TransactionDigest>>> =
38                Arc::new(Mutex::new(DashSet::new()));
39            let checkpoints = join_all(clients.iter().enumerate().map(|(i, client)| {
40                let transaction_digests = transaction_digests.clone();
41                let end_checkpoint_for_clients = checkpoint_stats.latest_checkpoints.clone();
42                async move {
43                    if end_checkpoint_for_clients[i] < seq {
44                        // TODO(chris) log actual url
45                        warn!(
46                            "The RPC server corresponding to the {i}th url has a outdated checkpoint number {}.\
47                            The latest checkpoint number is {seq}",
48                            end_checkpoint_for_clients[i]
49                        );
50                        return None;
51                    }
52
53                    match client
54                        .read_api()
55                        .get_checkpoint(CheckpointId::SequenceNumber(seq))
56                        .await {
57                        Ok(t) => {
58                            if t.sequence_number != seq {
59                                error!("The RPC server corresponding to the {i}th url has unexpected checkpoint sequence number {}, expected {seq}", t.sequence_number,);
60                            }
61                            for digest in t.transactions.iter() {
62                                transaction_digests.lock().await.insert(*digest);
63                            }
64                            Some(t)
65                        },
66                        Err(err) => {
67                            error!("Failed to fetch checkpoint {seq} on the {i}th url: {err}");
68                            None
69                        }
70                    }
71                }
72            }))
73                .await;
74
75            let transaction_digests = transaction_digests
76                .lock()
77                .await
78                .iter()
79                .map(|digest| *digest)
80                .collect::<Vec<_>>();
81
82            if op.verify_transactions {
83                let transaction_responses = check_transactions(
84                    &clients,
85                    &transaction_digests,
86                    cross_validate,
87                    op.verify_objects,
88                )
89                .await
90                .into_iter()
91                .concat();
92
93                if op.record {
94                    debug!("adding addresses and object ids from response");
95                    self.add_addresses_from_response(&transaction_responses);
96                    self.add_object_ids_from_response(&transaction_responses);
97                };
98            }
99
100            if op.record {
101                debug!("adding transaction digests from response");
102                self.add_transaction_digests(transaction_digests);
103            };
104
105            if cross_validate {
106                let valid_checkpoint = checkpoints.iter().enumerate().find_map(|(i, x)| {
107                    if x.is_some() {
108                        Some((i, x.clone().unwrap()))
109                    } else {
110                        None
111                    }
112                });
113
114                if valid_checkpoint.is_none() {
115                    error!("none of the urls are returning valid checkpoint for seq {seq}");
116                    continue;
117                }
118                // safe to unwrap because we check some above
119                let (valid_checkpoint_idx, valid_checkpoint) = valid_checkpoint.unwrap();
120                for (i, x) in checkpoints.iter().enumerate() {
121                    if i == valid_checkpoint_idx {
122                        continue;
123                    }
124                    // ignore the None value because it's warned above
125                    let eq = x.is_none() || x.as_ref().unwrap() == &valid_checkpoint;
126                    if !eq {
127                        error!(
128                            "getCheckpoint {seq} has a different result between the {valid_checkpoint_idx}th and {i}th URL {:?} {:?}",
129                            x, checkpoints[valid_checkpoint_idx]
130                        )
131                    }
132                }
133            }
134
135            if seq % 10000 == 0 {
136                info!("Finished processing checkpoint {seq}");
137            }
138        }
139
140        Ok(())
141    }
142}