sui_rpc_loadgen/payload/
query_transactions.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5
6use crate::payload::validation::cross_validate_entities;
7use crate::payload::{
8    AddressQueryType, ProcessPayload, QueryTransactionBlocks, RpcCommandProcessor, SignerInfo,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use sui_json_rpc_types::{
13    Page, SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
14    SuiTransactionBlockResponseQuery, TransactionBlocksPage, TransactionFilter,
15};
16use sui_sdk::SuiClient;
17use sui_types::base_types::TransactionDigest;
18use tracing::log::warn;
19
20#[async_trait]
21impl<'a> ProcessPayload<'a, &'a QueryTransactionBlocks> for RpcCommandProcessor {
22    async fn process(
23        &'a self,
24        op: &'a QueryTransactionBlocks,
25        _signer_info: &Option<SignerInfo>,
26    ) -> Result<()> {
27        let clients = self.get_clients().await?;
28        let address_type = &op.address_type;
29        if op.addresses.is_empty() {
30            warn!("No addresses provided, skipping query");
31            return Ok(());
32        }
33        let filters = {
34            let mut from: Vec<Option<TransactionFilter>> = op
35                .addresses
36                .iter()
37                .map(|address| Some(TransactionFilter::FromAddress(*address)))
38                .collect();
39
40            let mut to = op
41                .addresses
42                .iter()
43                .map(|address| Some(TransactionFilter::ToAddress(*address)))
44                .collect();
45
46            match address_type {
47                AddressQueryType::From => from,
48                AddressQueryType::To => to,
49                AddressQueryType::Both => from.drain(..).chain(to.drain(..)).collect(),
50            }
51        };
52
53        let queries: Vec<SuiTransactionBlockResponseQuery> = filters
54            .into_iter()
55            .map(|filter| SuiTransactionBlockResponseQuery {
56                filter,
57                options: Some(SuiTransactionBlockResponseOptions::full_content()),
58            })
59            .collect();
60
61        // todo: can map this
62        for query in queries {
63            let mut results: Vec<TransactionBlocksPage> = Vec::new();
64
65            // Paginate results, if any
66            while results.is_empty() || results.iter().any(|r| r.has_next_page) {
67                let cursor = if results.is_empty() {
68                    None
69                } else {
70                    match (
71                        results.first().unwrap().next_cursor,
72                        results.get(1).unwrap().next_cursor,
73                    ) {
74                        (Some(first_cursor), Some(second_cursor)) => {
75                            if first_cursor != second_cursor {
76                                warn!(
77                                    "Cursors are not the same, received {} vs {}. Selecting the first cursor to continue",
78                                    first_cursor, second_cursor
79                                );
80                            }
81                            Some(first_cursor)
82                        }
83                        (Some(cursor), None) | (None, Some(cursor)) => Some(cursor),
84                        (None, None) => None,
85                    }
86                };
87
88                results = join_all(clients.iter().map(|client| {
89                    let with_query = query.clone();
90                    async move {
91                        query_transaction_blocks(client, with_query, cursor, None)
92                            .await
93                            .unwrap()
94                    }
95                }))
96                .await;
97
98                let transactions: Vec<Vec<SuiTransactionBlockResponse>> =
99                    results.iter().map(|page| page.data.clone()).collect();
100                cross_validate_entities(&transactions, "Transactions");
101            }
102        }
103        Ok(())
104    }
105}
106
107async fn query_transaction_blocks(
108    client: &SuiClient,
109    query: SuiTransactionBlockResponseQuery,
110    cursor: Option<TransactionDigest>,
111    limit: Option<usize>, // TODO: we should probably set a limit and paginate
112) -> Result<Page<SuiTransactionBlockResponse, TransactionDigest>> {
113    let transactions = client
114        .read_api()
115        .query_transaction_blocks(query, cursor, limit, true)
116        .await
117        .unwrap();
118    Ok(transactions)
119}