sui_indexer_alt_framework/
cluster.rs1use std::{
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use anyhow::Context;
10use diesel_migrations::EmbeddedMigrations;
11use prometheus::Registry;
12use sui_futures::service::Service;
13use sui_indexer_alt_metrics::{MetricsArgs, MetricsService};
14use url::Url;
15
16use crate::{
17 Indexer, IndexerArgs, Result,
18 ingestion::{ClientArgs, IngestionConfig},
19 metrics::{IndexerMetrics, IngestionMetrics},
20 postgres::{Db, DbArgs},
21};
22
// Bundle of arguments for setting up an indexer cluster: an `Indexer` plus the metrics service
// it reports into. Convenient for binaries that parse these from the command line in one go.
//
// (Plain `//` comments on purpose: clap's derive turns `///` doc comments into `--help` text.)
#[derive(clap::Parser, Debug, Default)]
pub struct Args {
    // Arguments forwarded to the indexer, e.g. the first/last checkpoint to process.
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    // Where checkpoint data is read from (e.g. a local ingestion path).
    // `IndexerClusterBuilder::build` returns an error if this is still `None`.
    #[clap(flatten)]
    pub client_args: Option<ClientArgs>,

    // Configuration for the metrics service, e.g. the address it is exposed on.
    #[clap(flatten)]
    pub metrics_args: MetricsArgs,
}
40
/// An [`Indexer`] backed by Postgres ([`Db`]), paired with the [`MetricsService`] whose registry
/// was handed to the indexer when the cluster was built.
///
/// Construct one with [`IndexerCluster::builder`]. The cluster dereferences to the inner indexer,
/// so pipelines can be registered on it before calling [`IndexerCluster::run`].
pub struct IndexerCluster {
    indexer: Indexer<Db>,
    metrics: MetricsService,
}
48
/// Builder for an [`IndexerCluster`].
///
/// A database URL and client args are required; every other setting falls back to its
/// `Default` value.
#[derive(Default)]
pub struct IndexerClusterBuilder {
    /// Postgres URL the indexer connects to; `build` fails if this is unset.
    database_url: Option<Url>,
    /// Database connection arguments forwarded to the indexer.
    db_args: DbArgs,
    /// Indexer, client and metrics args; settable as a bundle or piece by piece.
    args: Args,
    /// Ingestion configuration forwarded to the indexer.
    ingestion_config: IngestionConfig,
    /// Optional embedded migrations forwarded to the indexer.
    migrations: Option<&'static EmbeddedMigrations>,
    /// Optional metrics prefix forwarded to the indexer.
    metrics_prefix: Option<String>,
}
59
60impl IndexerClusterBuilder {
61 pub fn new() -> Self {
63 Self::default()
64 }
65
66 pub fn with_database_url(mut self, url: Url) -> Self {
71 self.database_url = Some(url);
72 self
73 }
74
75 pub fn with_db_args(mut self, args: DbArgs) -> Self {
80 self.db_args = args;
81 self
82 }
83
84 pub fn with_args(mut self, args: Args) -> Self {
93 self.args = args;
94 self
95 }
96
97 pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
100 self.args.indexer_args = args;
101 self
102 }
103
104 pub fn with_client_args(mut self, args: ClientArgs) -> Self {
107 self.args.client_args = Some(args);
108 self
109 }
110
111 pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
114 self.args.metrics_args = args;
115 self
116 }
117
118 pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
121 self.ingestion_config = config;
122 self
123 }
124
125 pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
129 self.migrations = Some(migrations);
130 self
131 }
132
133 pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
135 self.metrics_prefix = Some(label.into());
136 self
137 }
138
139 pub async fn build(self) -> Result<IndexerCluster> {
146 let database_url = self.database_url.context("database_url is required")?;
147
148 tracing_subscriber::fmt::init();
149
150 let registry = Registry::new();
151 let metrics = MetricsService::new(self.args.metrics_args, registry);
152 let client_args = self.args.client_args.context("client_args is required")?;
153
154 let indexer = Indexer::new_from_pg(
155 database_url,
156 self.db_args,
157 self.args.indexer_args,
158 client_args,
159 self.ingestion_config,
160 self.migrations,
161 self.metrics_prefix.as_deref(),
162 metrics.registry(),
163 )
164 .await?;
165
166 Ok(IndexerCluster { indexer, metrics })
167 }
168}
169
170impl IndexerCluster {
171 pub fn builder() -> IndexerClusterBuilder {
173 IndexerClusterBuilder::new()
174 }
175
176 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
179 self.indexer.indexer_metrics()
180 }
181
182 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
185 self.indexer.ingestion_metrics()
186 }
187
188 pub async fn run(self) -> Result<Service> {
192 let s_indexer = self.indexer.run().await?;
193 let s_metrics = self.metrics.run().await?;
194
195 Ok(s_indexer.attach(s_metrics))
196 }
197}
198
199impl Deref for IndexerCluster {
200 type Target = Indexer<Db>;
201
202 fn deref(&self) -> &Self::Target {
203 &self.indexer
204 }
205}
206
207impl DerefMut for IndexerCluster {
208 fn deref_mut(&mut self) -> &mut Self::Target {
209 &mut self.indexer
210 }
211}
212
#[cfg(test)]
mod tests {
    //! End-to-end test of an `IndexerCluster` over synthetic checkpoints, plus checks of the
    //! precedence between bundled `Args` and the individual builder setters.

    use std::{
        net::{Ipv4Addr, SocketAddr},
        path::PathBuf,
    };

    use async_trait::async_trait;
    use diesel::{Insertable, QueryDsl, Queryable};
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::postgres::{
        Connection, Db, DbArgs,
        temp::{TempDb, get_available_port},
    };
    use crate::types::full_checkpoint_content::Checkpoint;

    use super::*;

    // Output table of the `TxCounts` test pipeline: one row per checkpoint.
    diesel::table! {
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    /// DDL for the `tx_counts` table, which the test creates before the indexer runs.
    const CREATE_TX_COUNTS: &str = r#"
        CREATE TABLE tx_counts (
            cp_sequence_number BIGINT PRIMARY KEY,
            count BIGINT NOT NULL
        )
    "#;

    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    /// Test pipeline that records how many transactions each checkpoint contains.
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let row = StoredTxCount {
                cp_sequence_number: checkpoint.summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            };
            Ok(vec![row])
        }
    }

    #[async_trait]
    impl crate::postgres::handler::Handler for TxCounts {
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            // Rows for checkpoints that were already written are skipped, not reported as errors.
            let inserted = diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?;
            Ok(inserted)
        }
    }

    /// Client args that read checkpoints from a local directory.
    fn local_client_args(path: impl Into<PathBuf>) -> ClientArgs {
        ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(path.into()),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Metrics args listening on `address` (must be a valid `ip:port`).
    fn metrics_args_at(address: &str) -> MetricsArgs {
        MetricsArgs {
            metrics_address: address.parse().unwrap(),
        }
    }

    /// The args bundle passed through `with_args` in the precedence tests.
    fn bundled_args() -> Args {
        Args {
            indexer_args: IndexerArgs {
                first_checkpoint: Some(100),
                ..Default::default()
            },
            client_args: Some(local_client_args("/bundled")),
            metrics_args: metrics_args_at("127.0.0.1:8080"),
        }
    }

    /// The builder's configured local ingestion path, rendered as a string. Panics if unset.
    fn local_path_of(builder: &IndexerClusterBuilder) -> String {
        builder
            .args
            .client_args
            .as_ref()
            .and_then(|client| client.ingestion.local_ingestion_path.as_ref())
            .unwrap()
            .to_string_lossy()
            .into_owned()
    }

    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Checkpoints 0..=9, written to a scratch directory the indexer ingests from.
        let checkpoint_dir = tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        })
        .await;

        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        // The pipeline's output table must exist before the indexer commits to it.
        let mut setup_conn = writer.connect().await.unwrap();
        diesel::sql_query(CREATE_TX_COUNTS)
            .execute(&mut setup_conn)
            .await
            .unwrap();
        drop(setup_conn);

        let args = Args {
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            client_args: Some(local_client_args(checkpoint_dir.path())),
            metrics_args: MetricsArgs {
                metrics_address: SocketAddr::from((Ipv4Addr::LOCALHOST, get_available_port())),
            },
        };

        let mut cluster = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        cluster
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // Keep handles to the metrics, because `run` consumes the cluster.
        let ingestion_metrics = cluster.ingestion_metrics().clone();
        let indexer_metrics = cluster.indexer_metrics().clone();

        // With `last_checkpoint` set, `join` is expected to return once the range is indexed.
        cluster.run().await.unwrap().join().await.unwrap();

        let mut read_conn = reader.connect().await.unwrap();
        let counts: Vec<StoredTxCount> = tx_counts::table
            .order_by(tx_counts::cp_sequence_number)
            .load(&mut read_conn)
            .await
            .unwrap();
        drop(read_conn);

        // `checkpoint_size: 2` yields three transactions per synthetic checkpoint. This agrees
        // with the 30-transaction totals below.
        // NOTE(review): presumably the generator adds one extra transaction per checkpoint —
        // confirm.
        let observed: Vec<(i64, i64)> = counts
            .iter()
            .map(|row| (row.cp_sequence_number, row.count))
            .collect();
        let expected: Vec<(i64, i64)> = (0..10).map(|cp| (cp, 3)).collect();
        assert_eq!(observed, expected);

        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 30);
        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);

        // Pipeline metrics are label-vectors; read the series for the `tx_counts` pipeline.
        macro_rules! assert_tx_counts_metrics {
            ($($metric:ident: $expected:expr),* $(,)?) => {
                $(
                    assert_eq!(
                        indexer_metrics
                            .$metric
                            .get_metric_with_label_values(&["tx_counts"])
                            .unwrap()
                            .get(),
                        $expected
                    );
                )*
            };
        }

        assert_tx_counts_metrics! {
            total_handler_checkpoints_received: 10,
            total_handler_checkpoints_processed: 10,
            total_handler_rows_created: 10,
            latest_processed_checkpoint: 9,
            total_collector_checkpoints_received: 10,
            total_collector_rows_received: 10,
            latest_collected_checkpoint: 9,
            watermark_checkpoint: 9,
            watermark_checkpoint_in_db: 9,
            watermark_transaction: 30,
            watermark_transaction_in_db: 30,
        }
    }

    #[test]
    fn test_individual_methods_override_bundled_args() {
        let builder = IndexerClusterBuilder::new()
            .with_args(bundled_args())
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(local_client_args("/individual"))
            .with_metrics_args(metrics_args_at("127.0.0.1:9090"));

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
        assert_eq!(local_path_of(&builder), "/individual");
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:9090"
        );
    }

    #[test]
    fn test_bundled_args_override_individual_methods() {
        let builder = IndexerClusterBuilder::new()
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(local_client_args("/individual"))
            .with_metrics_args(metrics_args_at("127.0.0.1:9090"))
            .with_args(bundled_args());

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
        assert_eq!(local_path_of(&builder), "/bundled");
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:8080"
        );
    }
}