// sui_indexer_alt_framework/postgres.rs
use anyhow::{Context, Result};
use diesel_migrations::EmbeddedMigrations;
use prometheus::Registry;
use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
use sui_pg_db::temp::TempDb;
use tempfile::tempdir;
use tokio_util::sync::CancellationToken;
use url::Url;
use crate::{
ingestion::{ClientArgs, IngestionConfig},
Indexer, IndexerArgs,
};
pub use sui_pg_db::*;
impl Indexer<Db> {
    /// Builds an [`Indexer`] whose store is a Postgres-backed [`Db`].
    ///
    /// The steps happen in this order:
    ///
    /// 1. A writable [`Db`] is created from `database_url` and `db_args`.
    /// 2. `migrations` is handed to `Db::run_migrations` (what happens for `None` is decided
    ///    by that method, not here).
    /// 3. A [`DbConnectionStatsCollector`] for the store is registered in `registry` under the
    ///    `"indexer_db"` prefix.
    /// 4. The remaining arguments are forwarded unchanged to [`Indexer::new`].
    ///
    /// # Errors
    ///
    /// Returns an error, annotated with the failing step, if the database connection, the
    /// migrations, or the metrics registration fails. Errors from [`Indexer::new`] are
    /// returned as-is.
    pub async fn new_from_pg(
        database_url: Url,
        db_args: DbArgs,
        indexer_args: IndexerArgs,
        client_args: ClientArgs,
        ingestion_config: IngestionConfig,
        migrations: Option<&'static EmbeddedMigrations>,
        metrics_prefix: Option<&str>,
        registry: &Registry,
        cancel: CancellationToken,
    ) -> Result<Self> {
        let store = Db::for_write(database_url, db_args)
            .await
            .context("Failed to connect to database")?;

        store
            .run_migrations(migrations)
            .await
            .context("Failed to run pending migrations")?;

        // The collector gets its own handle to the store; the original handle is moved into
        // the indexer below.
        registry
            .register(Box::new(DbConnectionStatsCollector::new(
                Some("indexer_db"),
                store.clone(),
            )))
            .context("Failed to register database connection metrics")?;

        Indexer::new(
            store,
            indexer_args,
            client_args,
            ingestion_config,
            metrics_prefix,
            registry,
            cancel,
        )
        .await
    }

    /// Test helper: builds an [`Indexer`] on top of a freshly created [`TempDb`].
    ///
    /// `migrations` are run against the temporary database before the indexer is constructed.
    /// The indexer uses default [`IndexerArgs`] and [`IngestionConfig`], a throwaway
    /// [`Registry`], a fresh [`CancellationToken`], and a local ingestion path pointing at a
    /// newly created temporary directory.
    ///
    /// The [`TempDb`] is returned alongside the indexer so the caller controls its lifetime
    /// (presumably the database is torn down when it is dropped -- confirm against `TempDb`).
    ///
    /// # Panics
    ///
    /// Panics if any setup step fails; every fallible call is unwrapped.
    pub async fn new_for_testing(migrations: &'static EmbeddedMigrations) -> (Indexer<Db>, TempDb) {
        let temp_db = TempDb::new().unwrap();
        let store = Db::for_write(temp_db.database().url().clone(), DbArgs::default())
            .await
            .unwrap();
        store.run_migrations(Some(migrations)).await.unwrap();

        let indexer = Indexer::new(
            store,
            IndexerArgs::default(),
            ClientArgs {
                remote_store_url: None,
                // `keep()` hands back the directory path instead of the `TempDir` guard, so
                // the directory is not removed when this function returns.
                local_ingestion_path: Some(tempdir().unwrap().keep()),
                rpc_api_url: None,
                rpc_username: None,
                rpc_password: None,
            },
            IngestionConfig::default(),
            None,
            &Registry::new(),
            CancellationToken::new(),
        )
        .await
        .unwrap();

        (indexer, temp_db)
    }
}
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use sui_indexer_alt_framework_store_traits::{CommitterWatermark, Store};
    use sui_types::full_checkpoint_content::CheckpointData;

    use super::*;
    use crate::{
        pipeline::{concurrent, Processor},
        store::Connection,
        ConcurrentConfig, FieldCount,
    };

    /// Placeholder value type for the test pipelines; its contents are never inspected.
    #[derive(FieldCount)]
    struct V {
        _v: u64,
    }

    /// Declares a unit struct `$name` that implements both `Processor` and
    /// `concurrent::Handler` against the `Db` store.
    ///
    /// The pipeline's `NAME` is the struct's identifier as a string. `process` and `commit`
    /// are left as `todo!()`; the tests below only register the pipelines and never call them.
    macro_rules! define_test_concurrent_pipeline {
        ($name:ident) => {
            struct $name;

            impl Processor for $name {
                const NAME: &'static str = stringify!($name);
                type Value = V;

                fn process(
                    &self,
                    _checkpoint: &Arc<CheckpointData>,
                ) -> anyhow::Result<Vec<Self::Value>> {
                    todo!()
                }
            }

            #[async_trait]
            impl concurrent::Handler for $name {
                type Store = Db;

                async fn commit<'a>(
                    _values: &[Self::Value],
                    _conn: &mut <Self::Store as Store>::Connection<'a>,
                ) -> anyhow::Result<usize> {
                    todo!()
                }
            }
        };
    }

    define_test_concurrent_pipeline!(ConcurrentPipeline1);
    define_test_concurrent_pipeline!(ConcurrentPipeline2);

    /// Registering a pipeline with no stored watermark is expected to leave
    /// `first_checkpoint_from_watermark` at 0.
    #[tokio::test]
    async fn test_add_new_pipeline() {
        let (mut indexer, _db_guard) = Indexer::new_for_testing(&MIGRATIONS).await;

        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint_from_watermark, 0);
    }

    /// Registering a pipeline whose committer watermark was seeded with 10 is expected to set
    /// `first_checkpoint_from_watermark` to 11.
    #[tokio::test]
    async fn test_add_existing_pipeline() {
        let (mut indexer, _db_guard) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Seed the watermark first, then release the connection before the indexer is
        // borrowed mutably to register the pipeline.
        let mut connection = indexer.store().connect().await.unwrap();
        let seeded = CommitterWatermark::new_for_testing(10);
        let stored = connection
            .set_committer_watermark(ConcurrentPipeline1::NAME, seeded)
            .await
            .unwrap();
        assert!(stored);
        drop(connection);

        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();

        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }

    /// With watermarks seeded at 10 and 20 for two pipelines, registering the second pipeline
    /// first is expected to yield 21, and registering the first pipeline afterwards is
    /// expected to lower the value to 11.
    #[tokio::test]
    async fn test_add_multiple_pipelines() {
        let (mut indexer, _db_guard) = Indexer::new_for_testing(&MIGRATIONS).await;

        // Both watermarks are written over the same connection, which is released before
        // any pipeline is registered.
        let mut connection = indexer.store().connect().await.unwrap();

        let seeded_first = CommitterWatermark::new_for_testing(10);
        let stored_first = connection
            .set_committer_watermark(ConcurrentPipeline1::NAME, seeded_first)
            .await
            .unwrap();
        assert!(stored_first);

        let seeded_second = CommitterWatermark::new_for_testing(20);
        let stored_second = connection
            .set_committer_watermark(ConcurrentPipeline2::NAME, seeded_second)
            .await
            .unwrap();
        assert!(stored_second);

        drop(connection);

        indexer
            .concurrent_pipeline(ConcurrentPipeline2, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 21);

        indexer
            .concurrent_pipeline(ConcurrentPipeline1, ConcurrentConfig::default())
            .await
            .unwrap();
        assert_eq!(indexer.first_checkpoint_from_watermark, 11);
    }
}