sui_indexer_alt_framework/
cluster.rs1use std::{
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use anyhow::Context;
10use diesel_migrations::EmbeddedMigrations;
11use prometheus::Registry;
12use sui_indexer_alt_metrics::{MetricsArgs, MetricsService};
13use tokio::{signal, task::JoinHandle};
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16use url::Url;
17
18use crate::{
19 Indexer, IndexerArgs, Result,
20 ingestion::{ClientArgs, IngestionConfig},
21 metrics::{IndexerMetrics, IngestionMetrics},
22 postgres::{Db, DbArgs},
23};
24
25#[derive(clap::Parser, Debug, Default)]
29pub struct Args {
30 #[clap(flatten)]
32 pub indexer_args: IndexerArgs,
33
34 #[clap(flatten)]
36 pub client_args: Option<ClientArgs>,
37
38 #[clap(flatten)]
40 pub metrics_args: MetricsArgs,
41}
42
43pub struct IndexerCluster {
47 indexer: Indexer<Db>,
48 metrics: MetricsService,
49
50 cancel: CancellationToken,
52}
53
54#[derive(Default)]
56pub struct IndexerClusterBuilder {
57 database_url: Option<Url>,
58 db_args: DbArgs,
59 args: Args,
60 ingestion_config: IngestionConfig,
61 migrations: Option<&'static EmbeddedMigrations>,
62 metrics_prefix: Option<String>,
63}
64
65impl IndexerClusterBuilder {
66 pub fn new() -> Self {
68 Self::default()
69 }
70
71 pub fn with_database_url(mut self, url: Url) -> Self {
76 self.database_url = Some(url);
77 self
78 }
79
80 pub fn with_db_args(mut self, args: DbArgs) -> Self {
85 self.db_args = args;
86 self
87 }
88
89 pub fn with_args(mut self, args: Args) -> Self {
98 self.args = args;
99 self
100 }
101
102 pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
105 self.args.indexer_args = args;
106 self
107 }
108
109 pub fn with_client_args(mut self, args: ClientArgs) -> Self {
112 self.args.client_args = Some(args);
113 self
114 }
115
116 pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
119 self.args.metrics_args = args;
120 self
121 }
122
123 pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
126 self.ingestion_config = config;
127 self
128 }
129
130 pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
134 self.migrations = Some(migrations);
135 self
136 }
137
138 pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
140 self.metrics_prefix = Some(label.into());
141 self
142 }
143
144 pub async fn build(self) -> Result<IndexerCluster> {
151 let database_url = self.database_url.context("database_url is required")?;
152
153 tracing_subscriber::fmt::init();
154
155 let cancel = CancellationToken::new();
156 let registry = Registry::new();
157 let metrics = MetricsService::new(self.args.metrics_args, registry, cancel.child_token());
158 let client_args = self.args.client_args.context("client_args is required")?;
159
160 let indexer = Indexer::new_from_pg(
161 database_url,
162 self.db_args,
163 self.args.indexer_args,
164 client_args,
165 self.ingestion_config,
166 self.migrations,
167 self.metrics_prefix.as_deref(),
168 metrics.registry(),
169 cancel.child_token(),
170 )
171 .await?;
172
173 Ok(IndexerCluster {
174 indexer,
175 metrics,
176 cancel,
177 })
178 }
179}
180
181impl IndexerCluster {
182 pub fn builder() -> IndexerClusterBuilder {
184 IndexerClusterBuilder::new()
185 }
186
187 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
190 self.indexer.indexer_metrics()
191 }
192
193 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
196 self.indexer.ingestion_metrics()
197 }
198
199 pub fn cancel(&self) -> &CancellationToken {
202 &self.cancel
203 }
204
205 pub async fn run(self) -> Result<JoinHandle<()>> {
209 let h_ctrl_c = tokio::spawn({
210 let cancel = self.cancel.clone();
211 async move {
212 tokio::select! {
213 _ = cancel.cancelled() => {}
214 _ = signal::ctrl_c() => {
215 info!("Received Ctrl-C, shutting down...");
216 cancel.cancel();
217 }
218 }
219 }
220 });
221
222 let h_metrics = self.metrics.run().await?;
223 let h_indexer = self.indexer.run().await?;
224
225 Ok(tokio::spawn(async move {
226 let _ = h_indexer.await;
227 self.cancel.cancel();
228 let _ = h_metrics.await;
229 let _ = h_ctrl_c.await;
230 }))
231 }
232}
233
234impl Deref for IndexerCluster {
235 type Target = Indexer<Db>;
236
237 fn deref(&self) -> &Self::Target {
238 &self.indexer
239 }
240}
241
242impl DerefMut for IndexerCluster {
243 fn deref_mut(&mut self) -> &mut Self::Target {
244 &mut self.indexer
245 }
246}
247
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use async_trait::async_trait;
    use diesel::{Insertable, QueryDsl, Queryable};
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::postgres::{
        Connection, Db, DbArgs,
        temp::{TempDb, get_available_port},
    };
    use crate::types::full_checkpoint_content::Checkpoint;

    use super::*;

    diesel::table! {
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    /// One row of the `tx_counts` table: the number of transactions in a checkpoint.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    /// Test pipeline that records one `StoredTxCount` per checkpoint.
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let row = StoredTxCount {
                cp_sequence_number: checkpoint.summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            };

            Ok(vec![row])
        }
    }

    #[async_trait]
    impl crate::postgres::handler::Handler for TxCounts {
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            // Conflicting rows are skipped rather than treated as an error.
            let inserted = diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?;

            Ok(inserted)
        }
    }

    /// Client arguments that read checkpoints from a local directory.
    fn local_client_args(path: impl Into<std::path::PathBuf>) -> ClientArgs {
        ClientArgs {
            ingestion: IngestionClientArgs {
                local_ingestion_path: Some(path.into()),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Metrics arguments bound to the given `ip:port` string.
    fn metrics_args_at(address: &str) -> MetricsArgs {
        MetricsArgs {
            metrics_address: address.parse().unwrap(),
        }
    }

    /// The "bundled" fixture used by the override tests.
    fn bundled_args() -> Args {
        Args {
            indexer_args: IndexerArgs {
                first_checkpoint: Some(100),
                ..Default::default()
            },
            client_args: Some(local_client_args("/bundled")),
            metrics_args: metrics_args_at("127.0.0.1:8080"),
        }
    }

    /// Applies the "individual" fixture through the per-field builder methods, in the order
    /// indexer args, client args, metrics args.
    fn with_individual_args(builder: IndexerClusterBuilder) -> IndexerClusterBuilder {
        builder
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(local_client_args("/individual"))
            .with_metrics_args(metrics_args_at("127.0.0.1:9090"))
    }

    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Generate ten synthetic checkpoints (0..=9) with two transactions each.
        let checkpoint_dir = tempdir().unwrap();
        let ingestion = synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        };
        synthetic_ingestion::generate_ingestion(ingestion).await;

        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        {
            // No migrations are supplied to the builder, so create the pipeline's table by hand.
            let mut conn = writer.connect().await.unwrap();
            diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number  BIGINT PRIMARY KEY,
                    count               BIGINT NOT NULL
                )
                "#,
            )
            .execute(&mut conn)
            .await
            .unwrap();
        }

        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        // A bounded checkpoint range: the test awaits the run handle below, so the run is
        // expected to finish on its own once checkpoint 9 has been handled.
        let args = Args {
            client_args: Some(local_client_args(checkpoint_dir.path())),
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            metrics_args: MetricsArgs { metrics_address },
        };

        let mut cluster = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        cluster
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // Grab the metrics before `run` consumes the cluster.
        let ingestion_metrics = cluster.ingestion_metrics().clone();
        let indexer_metrics = cluster.indexer_metrics().clone();

        // First await starts the cluster, second waits for it to finish.
        let handle = cluster.run().await.unwrap();
        handle.await.unwrap();

        {
            // Every checkpoint should have produced exactly one row with a count of two.
            let mut conn = reader.connect().await.unwrap();
            let counts: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            assert_eq!(counts.len(), 10);
            for (expected_seq, row) in (0i64..).zip(counts.iter()) {
                assert_eq!(row.cp_sequence_number, expected_seq);
                assert_eq!(row.count, 2);
            }
        }

        // Ingestion-level metrics.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 20);
        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);

        // Checks a list of per-pipeline metrics, each read with the "tx_counts" label.
        macro_rules! assert_pipeline_metrics {
            ($($metric:ident => $expected:expr),+ $(,)?) => {
                $(
                    assert_eq!(
                        indexer_metrics
                            .$metric
                            .get_metric_with_label_values(&["tx_counts"])
                            .unwrap()
                            .get(),
                        $expected
                    );
                )+
            };
        }

        assert_pipeline_metrics! {
            total_handler_checkpoints_received => 10,
            total_handler_checkpoints_processed => 10,
            total_handler_rows_created => 10,
            latest_processed_checkpoint => 9,
            total_collector_checkpoints_received => 10,
            total_collector_rows_received => 10,
            latest_collected_checkpoint => 9,
            // Watermark metrics.
            watermark_checkpoint => 9,
            watermark_checkpoint_in_db => 9,
            watermark_transaction => 20,
            watermark_transaction_in_db => 20,
        }
    }

    #[test]
    fn test_individual_methods_override_bundled_args() {
        // Bundled args first, individual setters second: the individual values must win.
        let builder = with_individual_args(IndexerClusterBuilder::new().with_args(bundled_args()));

        let Args {
            indexer_args,
            client_args,
            metrics_args,
        } = builder.args;

        assert_eq!(indexer_args.first_checkpoint, Some(200));

        let path = client_args.unwrap().ingestion.local_ingestion_path.unwrap();
        assert_eq!(path.to_string_lossy(), "/individual");

        assert_eq!(metrics_args.metrics_address.to_string(), "127.0.0.1:9090");
    }

    #[test]
    fn test_bundled_args_override_individual_methods() {
        // Individual setters first, bundled args second: the bundled values must win.
        let builder = with_individual_args(IndexerClusterBuilder::new()).with_args(bundled_args());

        let Args {
            indexer_args,
            client_args,
            metrics_args,
        } = builder.args;

        assert_eq!(indexer_args.first_checkpoint, Some(100));

        let path = client_args.unwrap().ingestion.local_ingestion_path.unwrap();
        assert_eq!(path.to_string_lossy(), "/bundled");

        assert_eq!(metrics_args.metrics_address.to_string(), "127.0.0.1:8080");
    }
}