sui_bridge_indexer/
sui_transaction_queries.rsuse std::time::Duration;
use sui_json_rpc_types::SuiTransactionBlockResponseOptions;
use sui_json_rpc_types::SuiTransactionBlockResponseQuery;
use sui_json_rpc_types::TransactionFilter;
use sui_sdk::SuiClient;
use sui_types::digests::TransactionDigest;
use sui_types::SUI_BRIDGE_OBJECT_ID;
use sui_bridge::retry_with_max_elapsed_time;
use tracing::{error, info};
use crate::types::RetrievedTransaction;
const QUERY_DURATION: Duration = Duration::from_secs(1);
const SLEEP_DURATION: Duration = Duration::from_secs(5);
pub async fn start_sui_tx_polling_task(
sui_client: SuiClient,
mut cursor: Option<TransactionDigest>,
tx: mysten_metrics::metered_channel::Sender<(
Vec<RetrievedTransaction>,
Option<TransactionDigest>,
)>,
) {
info!("Starting SUI transaction polling task from {:?}", cursor);
loop {
let Ok(Ok(results)) = retry_with_max_elapsed_time!(
sui_client.read_api().query_transaction_blocks(
SuiTransactionBlockResponseQuery {
filter: Some(TransactionFilter::InputObject(SUI_BRIDGE_OBJECT_ID)),
options: Some(SuiTransactionBlockResponseOptions::full_content()),
},
cursor,
None,
false,
),
Duration::from_secs(600)
) else {
error!("Failed to query bridge transactions after retry");
continue;
};
info!("Retrieved {} bridge transactions", results.data.len());
let txes = match results
.data
.into_iter()
.map(RetrievedTransaction::try_from)
.collect::<anyhow::Result<Vec<_>>>()
{
Ok(data) => data,
Err(e) => {
error!(
"Failed to convert retrieved transactions to sanitized format: {}",
e
);
tokio::time::sleep(SLEEP_DURATION).await;
continue;
}
};
if txes.is_empty() {
tokio::time::sleep(QUERY_DURATION).await;
continue;
}
tx.send((txes, results.next_cursor))
.await
.expect("Failed to send transaction block to process");
cursor = results.next_cursor;
}
}