sui_rpc_loadgen/payload/
query_transactions.rs1use anyhow::Result;
5
6use crate::payload::validation::cross_validate_entities;
7use crate::payload::{
8 AddressQueryType, ProcessPayload, QueryTransactionBlocks, RpcCommandProcessor, SignerInfo,
9};
10use async_trait::async_trait;
11use futures::future::join_all;
12use sui_json_rpc_types::{
13 Page, SuiTransactionBlockResponse, SuiTransactionBlockResponseOptions,
14 SuiTransactionBlockResponseQuery, TransactionBlocksPage, TransactionFilter,
15};
16use sui_sdk::SuiClient;
17use sui_types::base_types::TransactionDigest;
18use tracing::log::warn;
19
20#[async_trait]
21impl<'a> ProcessPayload<'a, &'a QueryTransactionBlocks> for RpcCommandProcessor {
22 async fn process(
23 &'a self,
24 op: &'a QueryTransactionBlocks,
25 _signer_info: &Option<SignerInfo>,
26 ) -> Result<()> {
27 let clients = self.get_clients().await?;
28 let address_type = &op.address_type;
29 if op.addresses.is_empty() {
30 warn!("No addresses provided, skipping query");
31 return Ok(());
32 }
33 let filters = {
34 let mut from: Vec<Option<TransactionFilter>> = op
35 .addresses
36 .iter()
37 .map(|address| Some(TransactionFilter::FromAddress(*address)))
38 .collect();
39
40 let mut to = op
41 .addresses
42 .iter()
43 .map(|address| Some(TransactionFilter::ToAddress(*address)))
44 .collect();
45
46 match address_type {
47 AddressQueryType::From => from,
48 AddressQueryType::To => to,
49 AddressQueryType::Both => from.drain(..).chain(to.drain(..)).collect(),
50 }
51 };
52
53 let queries: Vec<SuiTransactionBlockResponseQuery> = filters
54 .into_iter()
55 .map(|filter| SuiTransactionBlockResponseQuery {
56 filter,
57 options: Some(SuiTransactionBlockResponseOptions::full_content()),
58 })
59 .collect();
60
61 for query in queries {
63 let mut results: Vec<TransactionBlocksPage> = Vec::new();
64
65 while results.is_empty() || results.iter().any(|r| r.has_next_page) {
67 let cursor = if results.is_empty() {
68 None
69 } else {
70 match (
71 results.first().unwrap().next_cursor,
72 results.get(1).unwrap().next_cursor,
73 ) {
74 (Some(first_cursor), Some(second_cursor)) => {
75 if first_cursor != second_cursor {
76 warn!(
77 "Cursors are not the same, received {} vs {}. Selecting the first cursor to continue",
78 first_cursor, second_cursor
79 );
80 }
81 Some(first_cursor)
82 }
83 (Some(cursor), None) | (None, Some(cursor)) => Some(cursor),
84 (None, None) => None,
85 }
86 };
87
88 results = join_all(clients.iter().map(|client| {
89 let with_query = query.clone();
90 async move {
91 query_transaction_blocks(client, with_query, cursor, None)
92 .await
93 .unwrap()
94 }
95 }))
96 .await;
97
98 let transactions: Vec<Vec<SuiTransactionBlockResponse>> =
99 results.iter().map(|page| page.data.clone()).collect();
100 cross_validate_entities(&transactions, "Transactions");
101 }
102 }
103 Ok(())
104 }
105}
106
107async fn query_transaction_blocks(
108 client: &SuiClient,
109 query: SuiTransactionBlockResponseQuery,
110 cursor: Option<TransactionDigest>,
111 limit: Option<usize>, ) -> Result<Page<SuiTransactionBlockResponse, TransactionDigest>> {
113 let transactions = client
114 .read_api()
115 .query_transaction_blocks(query, cursor, limit, true)
116 .await
117 .unwrap();
118 Ok(transactions)
119}