sui_indexer_alt_framework/
cluster.rs1use std::{
5 ops::{Deref, DerefMut},
6 sync::Arc,
7};
8
9use anyhow::Context;
10use diesel_migrations::EmbeddedMigrations;
11use prometheus::Registry;
12use sui_indexer_alt_metrics::{MetricsArgs, MetricsService};
13use tokio::{signal, task::JoinHandle};
14use tokio_util::sync::CancellationToken;
15use tracing::info;
16use url::Url;
17
18use crate::postgres::{Db, DbArgs};
19use crate::{
20 Indexer, IndexerArgs, IndexerMetrics, Result,
21 ingestion::{ClientArgs, IngestionConfig},
22};
23
24#[derive(clap::Parser, Debug, Default)]
28pub struct Args {
29 #[clap(flatten)]
31 pub indexer_args: IndexerArgs,
32
33 #[clap(flatten)]
35 pub client_args: Option<ClientArgs>,
36
37 #[clap(flatten)]
39 pub metrics_args: MetricsArgs,
40}
41
42pub struct IndexerCluster {
46 indexer: Indexer<Db>,
47 metrics: MetricsService,
48
49 cancel: CancellationToken,
51}
52
53#[derive(Default)]
55pub struct IndexerClusterBuilder {
56 database_url: Option<Url>,
57 db_args: DbArgs,
58 args: Args,
59 ingestion_config: IngestionConfig,
60 migrations: Option<&'static EmbeddedMigrations>,
61 metrics_prefix: Option<String>,
62}
63
64impl IndexerClusterBuilder {
65 pub fn new() -> Self {
67 Self::default()
68 }
69
70 pub fn with_database_url(mut self, url: Url) -> Self {
75 self.database_url = Some(url);
76 self
77 }
78
79 pub fn with_db_args(mut self, args: DbArgs) -> Self {
84 self.db_args = args;
85 self
86 }
87
88 pub fn with_args(mut self, args: Args) -> Self {
97 self.args = args;
98 self
99 }
100
101 pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
104 self.args.indexer_args = args;
105 self
106 }
107
108 pub fn with_client_args(mut self, args: ClientArgs) -> Self {
111 self.args.client_args = Some(args);
112 self
113 }
114
115 pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
118 self.args.metrics_args = args;
119 self
120 }
121
122 pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
125 self.ingestion_config = config;
126 self
127 }
128
129 pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
133 self.migrations = Some(migrations);
134 self
135 }
136
137 pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
139 self.metrics_prefix = Some(label.into());
140 self
141 }
142
143 pub async fn build(self) -> Result<IndexerCluster> {
150 let database_url = self.database_url.context("database_url is required")?;
151
152 tracing_subscriber::fmt::init();
153
154 let cancel = CancellationToken::new();
155 let registry = Registry::new();
156 let metrics = MetricsService::new(self.args.metrics_args, registry, cancel.child_token());
157 let client_args = self.args.client_args.context("client_args is required")?;
158
159 let indexer = Indexer::new_from_pg(
160 database_url,
161 self.db_args,
162 self.args.indexer_args,
163 client_args,
164 self.ingestion_config,
165 self.migrations,
166 self.metrics_prefix.as_deref(),
167 metrics.registry(),
168 cancel.child_token(),
169 )
170 .await?;
171
172 Ok(IndexerCluster {
173 indexer,
174 metrics,
175 cancel,
176 })
177 }
178}
179
180impl IndexerCluster {
181 pub fn builder() -> IndexerClusterBuilder {
183 IndexerClusterBuilder::new()
184 }
185
186 pub fn metrics(&self) -> &Arc<IndexerMetrics> {
189 self.indexer.metrics()
190 }
191
192 pub fn cancel(&self) -> &CancellationToken {
195 &self.cancel
196 }
197
198 pub async fn run(self) -> Result<JoinHandle<()>> {
202 let h_ctrl_c = tokio::spawn({
203 let cancel = self.cancel.clone();
204 async move {
205 tokio::select! {
206 _ = cancel.cancelled() => {}
207 _ = signal::ctrl_c() => {
208 info!("Received Ctrl-C, shutting down...");
209 cancel.cancel();
210 }
211 }
212 }
213 });
214
215 let h_metrics = self.metrics.run().await?;
216 let h_indexer = self.indexer.run().await?;
217
218 Ok(tokio::spawn(async move {
219 let _ = h_indexer.await;
220 self.cancel.cancel();
221 let _ = h_metrics.await;
222 let _ = h_ctrl_c.await;
223 }))
224 }
225}
226
227impl Deref for IndexerCluster {
228 type Target = Indexer<Db>;
229
230 fn deref(&self) -> &Self::Target {
231 &self.indexer
232 }
233}
234
235impl DerefMut for IndexerCluster {
236 fn deref_mut(&mut self) -> &mut Self::Target {
237 &mut self.indexer
238 }
239}
240
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use async_trait::async_trait;
    use diesel::{Insertable, QueryDsl, Queryable};
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::{self, ConcurrentConfig};
    use crate::postgres::{
        Connection, Db, DbArgs,
        temp::{TempDb, get_available_port},
    };
    use crate::types::full_checkpoint_content::CheckpointData;

    use super::*;

    // Schema of the table the test pipeline writes to: one row per checkpoint.
    diesel::table! {
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    /// Row type for `tx_counts`: a checkpoint sequence number and its transaction count.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    /// Test pipeline that records how many transactions each checkpoint contains.
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        /// Produces exactly one row per checkpoint.
        async fn process(
            &self,
            checkpoint: &Arc<CheckpointData>,
        ) -> anyhow::Result<Vec<Self::Value>> {
            let row = StoredTxCount {
                cp_sequence_number: checkpoint.checkpoint_summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            };

            Ok(vec![row])
        }
    }

    #[async_trait]
    impl concurrent::Handler for TxCounts {
        type Store = Db;

        /// Inserts the batch, skipping rows that conflict with existing ones, and returns the
        /// number of rows reported by the insert.
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            let inserted = diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?;

            Ok(inserted)
        }
    }

    /// End-to-end run: ten synthetic checkpoints of two transactions each are ingested from a
    /// local directory into a temporary database, then both the stored rows and the reported
    /// metrics are checked.
    #[tokio::test]
    async fn test_indexer_cluster() {
        // Keep `temp_db` bound for the whole test so the database outlives every connection.
        let temp_db = TempDb::new().expect("Failed to create temporary database");
        let db_url = temp_db.database().url();

        // Synthetic checkpoints 0..=9 written to a scratch directory.
        let ingestion_dir = tempdir().unwrap();
        let ingestion_config = synthetic_ingestion::Config {
            ingestion_dir: ingestion_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        };
        synthetic_ingestion::generate_ingestion(ingestion_config).await;

        let reader = Db::for_read(db_url.clone(), DbArgs::default())
            .await
            .unwrap();
        let writer = Db::for_write(db_url.clone(), DbArgs::default())
            .await
            .unwrap();

        // The builder is not given any migrations, so the pipeline's table is created by hand.
        {
            let mut conn = writer.connect().await.unwrap();
            let create_table = diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number  BIGINT PRIMARY KEY,
                    count               BIGINT NOT NULL
                )
                "#,
            );
            create_table.execute(&mut conn).await.unwrap();
        }

        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        let client_args = ClientArgs {
            local_ingestion_path: Some(ingestion_dir.path().to_owned()),
            remote_store_url: None,
            rpc_api_url: None,
            rpc_username: None,
            rpc_password: None,
        };

        let indexer_args = IndexerArgs {
            first_checkpoint: Some(0),
            last_checkpoint: Some(9),
            ..Default::default()
        };

        let mut cluster = IndexerCluster::builder()
            .with_database_url(db_url.clone())
            .with_args(Args {
                client_args: Some(client_args),
                indexer_args,
                metrics_args: MetricsArgs { metrics_address },
            })
            .build()
            .await
            .unwrap();

        cluster
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // Grab the metrics handle before `run` consumes the cluster.
        let metrics = cluster.metrics().clone();

        // Start the cluster, then wait for the returned task to finish.
        let running = cluster.run().await.unwrap();
        running.await.unwrap();

        // Every checkpoint must have produced one row with the expected transaction count.
        {
            let mut conn = reader.connect().await.unwrap();
            let rows: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            assert_eq!(rows.len(), 10);
            for (expected_seq, row) in (0i64..).zip(&rows) {
                assert_eq!(row.cp_sequence_number, expected_seq);
                assert_eq!(row.count, 2);
            }
        }

        // Ingestion-level metrics.
        assert_eq!(metrics.total_ingested_checkpoints.get(), 10);
        assert_eq!(metrics.total_ingested_transactions.get(), 20);
        assert_eq!(metrics.latest_ingested_checkpoint.get(), 9);

        // Reads the named per-pipeline metric for the `tx_counts` label.
        macro_rules! pipeline_metric {
            ($name:ident) => {
                metrics
                    .$name
                    .get_metric_with_label_values(&["tx_counts"])
                    .unwrap()
                    .get()
            };
        }

        // Handler and collector metrics.
        assert_eq!(pipeline_metric!(total_handler_checkpoints_received), 10);
        assert_eq!(pipeline_metric!(total_handler_checkpoints_processed), 10);
        assert_eq!(pipeline_metric!(total_handler_rows_created), 10);
        assert_eq!(pipeline_metric!(latest_processed_checkpoint), 9);
        assert_eq!(pipeline_metric!(total_collector_checkpoints_received), 10);
        assert_eq!(pipeline_metric!(total_collector_rows_received), 10);
        assert_eq!(pipeline_metric!(latest_collected_checkpoint), 9);

        // Watermark metrics.
        assert_eq!(pipeline_metric!(watermark_checkpoint), 9);
        assert_eq!(pipeline_metric!(watermark_checkpoint_in_db), 9);
        assert_eq!(pipeline_metric!(watermark_transaction), 20);
        assert_eq!(pipeline_metric!(watermark_transaction_in_db), 20);
    }

    /// The argument bundle used by the precedence tests (checkpoint 100, `/bundled`, port 8080).
    fn bundled_args() -> Args {
        Args {
            indexer_args: IndexerArgs {
                first_checkpoint: Some(100),
                ..Default::default()
            },
            client_args: Some(ClientArgs {
                local_ingestion_path: Some("/bundled".into()),
                ..Default::default()
            }),
            metrics_args: MetricsArgs {
                metrics_address: "127.0.0.1:8080".parse().unwrap(),
            },
        }
    }

    /// Applies the three individual setters used by the precedence tests (checkpoint 200,
    /// `/individual`, port 9090) on top of `builder`.
    fn with_individual_args(builder: IndexerClusterBuilder) -> IndexerClusterBuilder {
        builder
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                local_ingestion_path: Some("/individual".into()),
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            })
    }

    /// Individual setters called after `with_args` replace the bundled values.
    #[test]
    fn test_individual_methods_override_bundled_args() {
        let builder =
            with_individual_args(IndexerClusterBuilder::new().with_args(bundled_args()));

        let Args {
            indexer_args,
            client_args,
            metrics_args,
        } = builder.args;

        assert_eq!(indexer_args.first_checkpoint, Some(200));

        let ingestion_path = client_args.unwrap().local_ingestion_path.unwrap();
        assert_eq!(ingestion_path.to_string_lossy(), "/individual");

        assert_eq!(metrics_args.metrics_address.to_string(), "127.0.0.1:9090");
    }

    /// `with_args` called after the individual setters replaces everything they set.
    #[test]
    fn test_bundled_args_override_individual_methods() {
        let builder =
            with_individual_args(IndexerClusterBuilder::new()).with_args(bundled_args());

        let Args {
            indexer_args,
            client_args,
            metrics_args,
        } = builder.args;

        assert_eq!(indexer_args.first_checkpoint, Some(100));

        let ingestion_path = client_args.unwrap().local_ingestion_path.unwrap();
        assert_eq!(ingestion_path.to_string_lossy(), "/bundled");

        assert_eq!(metrics_args.metrics_address.to_string(), "127.0.0.1:8080");
    }
}