sui_indexer_alt_jsonrpc/data/
bigtable_reader.rsuse std::fmt::Debug;
use std::future::Future;
use std::time::{Duration, Instant};
use anyhow::anyhow;
use async_graphql::dataloader::DataLoader;
use prometheus::Registry;
use sui_kvstore::{BigTableClient, Checkpoint, KeyValueStoreReader, TransactionData};
use sui_types::digests::TransactionDigest;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Object;
use sui_types::storage::ObjectKey;
use tracing::warn;
use crate::data::error::Error;
#[derive(Clone)]
pub struct BigtableReader {
client: BigTableClient,
slow_request_threshold: Duration,
}
impl BigtableReader {
pub(crate) async fn new(
instance_id: String,
registry: &Registry,
slow_request_threshold: Duration,
) -> Result<Self, Error> {
if std::env::var("GOOGLE_APPLICATION_CREDENTIALS").is_err() {
return Err(Error::BigtableCreate(anyhow!(
"Environment variable GOOGLE_APPLICATION_CREDENTIALS is not set"
)));
}
let client = BigTableClient::new_remote(
instance_id,
true,
None,
"indexer-alt-jsonrpc".to_string(),
Some(registry),
)
.await
.map_err(Error::BigtableCreate)?;
Ok(Self {
client,
slow_request_threshold,
})
}
pub(crate) fn as_data_loader(&self) -> DataLoader<Self> {
DataLoader::new(self.clone(), tokio::spawn)
}
pub(crate) async fn checkpoints(
&self,
keys: &[CheckpointSequenceNumber],
) -> Result<Vec<Checkpoint>, Error> {
measure(
self.slow_request_threshold,
"checkpoints",
&keys,
self.client.clone().get_checkpoints(keys),
)
.await
}
pub(crate) async fn transactions(
&self,
keys: &[TransactionDigest],
) -> Result<Vec<TransactionData>, Error> {
measure(
self.slow_request_threshold,
"transactions",
&keys,
self.client.clone().get_transactions(keys),
)
.await
}
pub(crate) async fn objects(&self, keys: &[ObjectKey]) -> Result<Vec<Object>, Error> {
measure(
self.slow_request_threshold,
"objects",
&keys,
self.client.clone().get_objects(keys),
)
.await
}
}
async fn measure<T, A: Debug>(
slow_request_threshold: Duration,
method: &str,
args: &A,
load: impl Future<Output = anyhow::Result<T>>,
) -> Result<T, Error> {
let start = Instant::now();
let result = load.await;
let elapsed = start.elapsed();
if elapsed > slow_request_threshold {
warn!(
elapsed_ms = elapsed.as_millis(),
threshold_ms = slow_request_threshold.as_millis(),
method,
?args,
"Slow Bigtable request"
);
}
result.map_err(Error::BigtableRead)
}