sui_indexer_alt_jsonrpc/data/
pg_reader.rsuse std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use async_graphql::dataloader::DataLoader;
use diesel::deserialize::FromSqlRow;
use diesel::expression::QueryMetadata;
use diesel::pg::Pg;
use diesel::query_builder::{Query, QueryFragment, QueryId};
use diesel::query_dsl::methods::LimitDsl;
use diesel::query_dsl::CompatibleType;
use diesel_async::RunQueryDsl;
use prometheus::Registry;
use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
use sui_pg_db as db;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use url::Url;
use crate::{data::error::Error, metrics::RpcMetrics};
#[derive(Clone)]
pub(crate) struct PgReader {
db: Option<db::Db>,
metrics: Arc<RpcMetrics>,
cancel: CancellationToken,
slow_query_threshold: Duration,
}
pub(crate) struct Connection<'p> {
conn: db::Connection<'p>,
metrics: Arc<RpcMetrics>,
slow_query_threshold: Duration,
}
impl PgReader {
pub(crate) async fn new(
database_url: Option<Url>,
db_args: db::DbArgs,
metrics: Arc<RpcMetrics>,
registry: &Registry,
cancel: CancellationToken,
slow_query_threshold: Duration,
) -> Result<Self, Error> {
let db = if let Some(database_url) = database_url {
let db = db::Db::for_read(database_url, db_args)
.await
.map_err(Error::PgCreate)?;
registry
.register(Box::new(DbConnectionStatsCollector::new(
Some("rpc_db"),
db.clone(),
)))
.map_err(|e| Error::PgCreate(e.into()))?;
Some(db)
} else {
None
};
Ok(Self {
db,
metrics,
cancel,
slow_query_threshold,
})
}
pub(crate) fn as_data_loader(&self) -> DataLoader<Self> {
DataLoader::new(self.clone(), tokio::spawn)
}
pub(crate) async fn connect(&self) -> Result<Connection<'_>, Error> {
let Some(db) = &self.db else {
return Err(Error::PgConnect(anyhow!("No database to connect to")));
};
tokio::select! {
_ = self.cancel.cancelled() => {
Err(Error::PgConnect(anyhow!("Cancelled while connecting to the database")))
}
conn = db.connect() => {
Ok(Connection {
conn: conn.map_err(Error::PgConnect)?,
metrics: self.metrics.clone(),
slow_query_threshold: self.slow_query_threshold,
})
}
}
}
}
impl Connection<'_> {
pub(crate) async fn first<'q, Q, ST, U>(&mut self, query: Q) -> Result<U, Error>
where
Q: LimitDsl,
Q::Output: Query + QueryFragment<Pg> + QueryId + Send + 'q,
<Q::Output as Query>::SqlType: CompatibleType<U, Pg, SqlType = ST>,
U: Send + FromSqlRow<ST, Pg> + 'static,
Pg: QueryMetadata<<Q::Output as Query>::SqlType>,
ST: 'static,
{
let query = query.limit(1);
let query_debug = diesel::debug_query(&query).to_string();
debug!("{query_debug}");
let timer = self.metrics.db_latency.start_timer();
let res = query.get_result(&mut self.conn).await;
let elapsed_ms = timer.stop_and_record() * 1000.0;
let threshold_ms = self.slow_query_threshold.as_millis() as f64;
if elapsed_ms > threshold_ms {
warn!(
elapsed_ms,
threshold_ms,
query = query_debug,
"Slow database query detected!",
);
}
if res.is_ok() {
self.metrics.db_requests_succeeded.inc();
} else {
self.metrics.db_requests_failed.inc();
}
Ok(res?)
}
pub(crate) async fn results<'q, Q, ST, U>(&mut self, query: Q) -> Result<Vec<U>, Error>
where
Q: Query + QueryFragment<Pg> + QueryId + Send + 'q,
Q::SqlType: CompatibleType<U, Pg, SqlType = ST>,
U: Send + FromSqlRow<ST, Pg> + 'static,
Pg: QueryMetadata<Q::SqlType>,
ST: 'static,
{
let query_debug = diesel::debug_query(&query).to_string();
debug!("{query_debug}");
let timer = self.metrics.db_latency.start_timer();
let res = query.get_results(&mut self.conn).await;
let elapsed_ms = timer.stop_and_record() * 1000.0;
let threshold_ms = self.slow_query_threshold.as_millis() as f64;
if elapsed_ms > threshold_ms {
warn!(
elapsed_ms,
threshold_ms,
query = query_debug,
"Slow database query detected!",
);
}
if res.is_ok() {
self.metrics.db_requests_succeeded.inc();
} else {
self.metrics.db_requests_failed.inc();
}
Ok(res?)
}
}