// sui_indexer_alt_framework/ingestion/client.rs
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use backoff::backoff::Constant;
use backoff::Error as BE;
use backoff::ExponentialBackoff;
use sui_rpc_api::client::AuthInterceptor;
use sui_rpc_api::Client;
use sui_storage::blob::Blob;
use tokio_util::bytes::Bytes;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use url::Url;
use crate::ingestion::local_client::LocalIngestionClient;
use crate::ingestion::remote_client::RemoteIngestionClient;
use crate::ingestion::Error as IngestionError;
use crate::ingestion::Result as IngestionResult;
use crate::metrics::CheckpointLagMetricReporter;
use crate::metrics::IndexerMetrics;
use crate::types::full_checkpoint_content::CheckpointData;
/// Ceiling on the delay between two consecutive retries of a transient fetch failure.
///
/// The exponential backoff used when fetching a checkpoint grows its wait interval up to this
/// value and never beyond it (60 seconds).
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_millis(60_000);
#[async_trait::async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
async fn fetch(&self, checkpoint: u64) -> FetchResult;
}
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
#[error("Checkpoint not found")]
NotFound,
#[error("Failed to fetch checkpoint due to permanent error: {0}")]
Permanent(#[from] anyhow::Error),
#[error("Failed to fetch checkpoint due to {reason}: {error}")]
Transient {
reason: &'static str,
#[source]
error: anyhow::Error,
},
}
/// Outcome of a single [`IngestionClientTrait::fetch`] call.
pub type FetchResult = std::result::Result<FetchData, FetchError>;
/// Payload produced by a successful fetch.
pub enum FetchData {
    /// Still-serialized checkpoint bytes; `IngestionClient::fetch` decodes them via
    /// `Blob::from_bytes` and counts their length in `total_ingested_bytes`.
    Raw(Bytes),

    /// A checkpoint that the backend has already decoded; used as-is.
    CheckpointData(CheckpointData),
}
/// Front end over an [`IngestionClientTrait`] backend that adds retries and metrics.
///
/// Every field is behind an `Arc`, so cloning the client only bumps reference counts.
#[derive(Clone)]
pub struct IngestionClient {
    /// The backend that actually produces checkpoint data.
    client: Arc<dyn IngestionClientTrait>,

    /// Indexer metrics updated while fetching checkpoints.
    metrics: Arc<IndexerMetrics>,

    /// Reports checkpoint timestamp lag for each successfully ingested checkpoint.
    checkpoint_lag_reporter: Arc<CheckpointLagMetricReporter>,
}
impl IngestionClient {
pub(crate) fn new_remote(url: Url, metrics: Arc<IndexerMetrics>) -> IngestionResult<Self> {
let client = Arc::new(RemoteIngestionClient::new(url)?);
Ok(Self::new_impl(client, metrics))
}
pub(crate) fn new_local(path: PathBuf, metrics: Arc<IndexerMetrics>) -> Self {
let client = Arc::new(LocalIngestionClient::new(path));
Self::new_impl(client, metrics)
}
pub(crate) fn new_rpc(
url: Url,
username: Option<String>,
password: Option<String>,
metrics: Arc<IndexerMetrics>,
) -> IngestionResult<Self> {
let client = if let Some(username) = username {
Client::new(url.to_string())?.with_auth(AuthInterceptor::basic(username, password))
} else {
Client::new(url.to_string())?
};
Ok(Self::new_impl(Arc::new(client), metrics))
}
fn new_impl(client: Arc<dyn IngestionClientTrait>, metrics: Arc<IndexerMetrics>) -> Self {
let checkpoint_lag_reporter = CheckpointLagMetricReporter::new(
metrics.ingested_checkpoint_timestamp_lag.clone(),
metrics.latest_ingested_checkpoint_timestamp_lag_ms.clone(),
metrics.latest_ingested_checkpoint.clone(),
);
IngestionClient {
client,
metrics,
checkpoint_lag_reporter,
}
}
pub async fn wait_for(
&self,
checkpoint: u64,
retry_interval: Duration,
cancel: &CancellationToken,
) -> IngestionResult<Arc<CheckpointData>> {
let backoff = Constant::new(retry_interval);
let fetch = || async move {
use backoff::Error as BE;
if cancel.is_cancelled() {
return Err(BE::permanent(IngestionError::Cancelled));
}
self.fetch(checkpoint, cancel).await.map_err(|e| match e {
IngestionError::NotFound(checkpoint) => {
debug!(checkpoint, "Checkpoint not found, retrying...");
self.metrics.total_ingested_not_found_retries.inc();
BE::transient(e)
}
e => BE::permanent(e),
})
};
backoff::future::retry(backoff, fetch).await
}
pub(crate) async fn fetch(
&self,
checkpoint: u64,
cancel: &CancellationToken,
) -> IngestionResult<Arc<CheckpointData>> {
let client = self.client.clone();
let request = move || {
let client = client.clone();
async move {
if cancel.is_cancelled() {
return Err(BE::permanent(IngestionError::Cancelled));
}
let fetch_data = client.fetch(checkpoint).await.map_err(|err| match err {
FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
FetchError::Permanent(error) => {
BE::permanent(IngestionError::FetchError(checkpoint, error))
}
FetchError::Transient { reason, error } => self.metrics.inc_retry(
checkpoint,
reason,
IngestionError::FetchError(checkpoint, error),
),
})?;
Ok(match fetch_data {
FetchData::Raw(bytes) => {
self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
Blob::from_bytes(&bytes).map_err(|e| {
self.metrics.inc_retry(
checkpoint,
"deserialization",
IngestionError::DeserializationError(checkpoint, e),
)
})?
}
FetchData::CheckpointData(data) => {
data
}
})
}
};
let backoff = ExponentialBackoff {
max_interval: MAX_TRANSIENT_RETRY_INTERVAL,
max_elapsed_time: None,
..Default::default()
};
let guard = self.metrics.ingested_checkpoint_latency.start_timer();
let data = backoff::future::retry(backoff, request).await?;
let elapsed = guard.stop_and_record();
debug!(
checkpoint,
elapsed_ms = elapsed * 1000.0,
"Fetched checkpoint"
);
self.checkpoint_lag_reporter
.report_lag(checkpoint, data.checkpoint_summary.timestamp_ms);
self.metrics.total_ingested_checkpoints.inc();
self.metrics
.total_ingested_transactions
.inc_by(data.transactions.len() as u64);
self.metrics.total_ingested_events.inc_by(
data.transactions
.iter()
.map(|tx| tx.events.as_ref().map_or(0, |evs| evs.data.len()) as u64)
.sum(),
);
self.metrics.total_ingested_inputs.inc_by(
data.transactions
.iter()
.map(|tx| tx.input_objects.len() as u64)
.sum(),
);
self.metrics.total_ingested_outputs.inc_by(
data.transactions
.iter()
.map(|tx| tx.output_objects.len() as u64)
.sum(),
);
Ok(Arc::new(data))
}
}