sui_indexer_alt_framework/
cluster.rs1use std::ops::Deref;
5use std::ops::DerefMut;
6use std::sync::Arc;
7
8use anyhow::Context;
9use diesel_migrations::EmbeddedMigrations;
10use prometheus::Registry;
11use sui_futures::service::Service;
12use sui_indexer_alt_metrics::MetricsArgs;
13use sui_indexer_alt_metrics::MetricsService;
14use url::Url;
15
16use crate::Indexer;
17use crate::IndexerArgs;
18use crate::Result;
19use crate::ingestion::ClientArgs;
20use crate::ingestion::IngestionConfig;
21use crate::metrics::IndexerMetrics;
22use crate::metrics::IngestionMetrics;
23use crate::postgres::Db;
24use crate::postgres::DbArgs;
25
/// Bundle of command-line arguments for running an indexer together with its metrics service.
///
/// Pass it to [`IndexerClusterBuilder::with_args`] to configure all three parts at once.
#[derive(clap::Parser, Debug, Default)]
pub struct Args {
    /// Indexer arguments (e.g. the first and last checkpoint to index).
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    /// Ingestion-client arguments (e.g. a local directory to read checkpoints from).
    #[clap(flatten)]
    pub client_args: ClientArgs,

    /// Metrics-service arguments (e.g. the address metrics are served on).
    #[clap(flatten)]
    pub metrics_args: MetricsArgs,
}
43
/// A Postgres-backed [`Indexer`] bundled with a [`MetricsService`] that shares its metrics
/// registry. Construct one with [`IndexerCluster::builder`].
pub struct IndexerCluster {
    indexer: Indexer<Db>,
    metrics: MetricsService,
}
51
/// Builder for an [`IndexerCluster`]; obtain one with [`IndexerCluster::builder`] or
/// [`IndexerClusterBuilder::new`].
#[derive(Default)]
pub struct IndexerClusterBuilder {
    // Required: `build` returns an error if this is still `None`.
    database_url: Option<Url>,
    db_args: DbArgs,
    // Indexer, ingestion-client and metrics arguments. Set as a whole via `with_args`, or
    // piecewise via the `with_*_args` methods (the last call wins).
    args: Args,
    ingestion_config: IngestionConfig,
    // Migrations handed to the indexer at build time, if any.
    migrations: Option<&'static EmbeddedMigrations>,
    // Optional prefix handed to the indexer for its metrics.
    metrics_prefix: Option<String>,
}
62
63impl IndexerClusterBuilder {
64 pub fn new() -> Self {
66 Self::default()
67 }
68
69 pub fn with_database_url(mut self, url: Url) -> Self {
74 self.database_url = Some(url);
75 self
76 }
77
78 pub fn with_db_args(mut self, args: DbArgs) -> Self {
83 self.db_args = args;
84 self
85 }
86
87 pub fn with_args(mut self, args: Args) -> Self {
96 self.args = args;
97 self
98 }
99
100 pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
103 self.args.indexer_args = args;
104 self
105 }
106
107 pub fn with_client_args(mut self, args: ClientArgs) -> Self {
110 self.args.client_args = args;
111 self
112 }
113
114 pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
117 self.args.metrics_args = args;
118 self
119 }
120
121 pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
124 self.ingestion_config = config;
125 self
126 }
127
128 pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
132 self.migrations = Some(migrations);
133 self
134 }
135
136 pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
138 self.metrics_prefix = Some(label.into());
139 self
140 }
141
142 pub async fn build(self) -> Result<IndexerCluster> {
149 let database_url = self.database_url.context("database_url is required")?;
150
151 tracing_subscriber::fmt::init();
152
153 let registry = Registry::new();
154 let metrics = MetricsService::new(self.args.metrics_args, registry);
155 let client_args = self.args.client_args;
156
157 let indexer = Indexer::new_from_pg(
158 database_url,
159 self.db_args,
160 self.args.indexer_args,
161 client_args,
162 self.ingestion_config,
163 self.migrations,
164 self.metrics_prefix.as_deref(),
165 metrics.registry(),
166 )
167 .await?;
168
169 Ok(IndexerCluster { indexer, metrics })
170 }
171}
172
173impl IndexerCluster {
174 pub fn builder() -> IndexerClusterBuilder {
176 IndexerClusterBuilder::new()
177 }
178
179 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
182 self.indexer.indexer_metrics()
183 }
184
185 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
188 self.indexer.ingestion_metrics()
189 }
190
191 pub async fn run(self) -> Result<Service> {
195 let s_indexer = self.indexer.run().await?;
196 let s_metrics = self.metrics.run().await?;
197
198 Ok(s_indexer.attach(s_metrics))
199 }
200}
201
202impl Deref for IndexerCluster {
203 type Target = Indexer<Db>;
204
205 fn deref(&self) -> &Self::Target {
206 &self.indexer
207 }
208}
209
210impl DerefMut for IndexerCluster {
211 fn deref_mut(&mut self) -> &mut Self::Target {
212 &mut self.indexer
213 }
214}
215
#[cfg(test)]
mod tests {
    use std::net::IpAddr;
    use std::net::Ipv4Addr;
    use std::net::SocketAddr;

    use async_trait::async_trait;
    use diesel::Insertable;
    use diesel::QueryDsl;
    use diesel::Queryable;
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::postgres::Connection;
    use crate::postgres::Db;
    use crate::postgres::DbArgs;
    use crate::postgres::temp::TempDb;
    use crate::postgres::temp::get_available_port;
    use crate::types::full_checkpoint_content::Checkpoint;

    use super::*;

    // Diesel schema for the test-only `tx_counts` table. It mirrors the `CREATE TABLE`
    // statement issued in `test_indexer_cluster`.
    diesel::table! {
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    // One `tx_counts` row: the number of transactions in a single checkpoint.
    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    // Pipeline under test: records how many transactions each checkpoint contains.
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        // Produce exactly one row per checkpoint.
        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![StoredTxCount {
                cp_sequence_number: checkpoint.summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            }])
        }
    }

    #[async_trait]
    impl crate::postgres::handler::Handler for TxCounts {
        // Insert the batch. Rows that conflict with existing ones are skipped (the table's
        // only key is `cp_sequence_number`).
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            Ok(diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?)
        }
    }

    // End-to-end test: index 10 synthetic checkpoints through a `TxCounts` concurrent
    // pipeline, then check both the rows written and the metrics reported.
    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Write checkpoints 0..=9 to a temporary directory for the indexer to ingest.
        let checkpoint_dir = tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        })
        .await;

        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        {
            // Create the table the `TxCounts` handler writes into. The cluster below is built
            // without migrations, so the test creates it by hand.
            let mut conn = writer.connect().await.unwrap();
            diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number BIGINT PRIMARY KEY,
                    count BIGINT NOT NULL
                )
                "#,
            )
            .execute(&mut conn)
            .await
            .unwrap();
        }

        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        // Ingest from the local directory generated above and index exactly checkpoints 0..=9.
        // The test `join`s the running service below, so it relies on the service finishing
        // once `last_checkpoint` has been handled.
        let args = Args {
            client_args: ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(checkpoint_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            metrics_args: MetricsArgs { metrics_address },
        };

        let mut indexer = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        indexer
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // Keep handles to the metrics, because `run` consumes the cluster.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        indexer.run().await.unwrap().join().await.unwrap();

        {
            // Expect one row per checkpoint, in checkpoint order.
            let mut conn = reader.connect().await.unwrap();
            let counts: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            assert_eq!(counts.len(), 10);
            for (i, count) in counts.iter().enumerate() {
                assert_eq!(count.cp_sequence_number, i as i64);
                // NOTE(review): `checkpoint_size` is 2, but 3 transactions are expected per
                // checkpoint. Presumably the synthetic generator adds one extra transaction
                // per checkpoint (e.g. a commit prologue); confirm in `synthetic_ingestion`.
                assert_eq!(count.count, 3);
            }
        }

        // 10 checkpoints with 3 transactions each gives 30 transactions in total.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 30);
        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);

        // Assert the value of a per-pipeline metric, selected by the `tx_counts` label.
        macro_rules! assert_pipeline_metric {
            ($name:ident, $value:expr) => {
                assert_eq!(
                    indexer_metrics
                        .$name
                        .get_metric_with_label_values(&["tx_counts"])
                        .unwrap()
                        .get(),
                    $value
                );
            };
        }

        assert_pipeline_metric!(total_handler_checkpoints_received, 10);
        assert_pipeline_metric!(total_handler_checkpoints_processed, 10);
        assert_pipeline_metric!(total_handler_rows_created, 10);
        assert_pipeline_metric!(latest_processed_checkpoint, 9);
        assert_pipeline_metric!(total_collector_checkpoints_received, 10);
        assert_pipeline_metric!(total_collector_rows_received, 10);
        assert_pipeline_metric!(latest_collected_checkpoint, 9);

        // Watermarks end at the last checkpoint (9). The transaction watermarks equal the 30
        // transactions ingested.
        assert_pipeline_metric!(watermark_checkpoint, 9);
        assert_pipeline_metric!(watermark_checkpoint_in_db, 9);
        assert_pipeline_metric!(watermark_transaction, 30);
        assert_pipeline_metric!(watermark_transaction_in_db, 30);
    }

    // Individual `with_*_args` calls made after `with_args` replace the corresponding part of
    // the bundle.
    #[test]
    fn test_individual_methods_override_bundled_args() {
        let builder = IndexerClusterBuilder::new()
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            })
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            });

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
        assert_eq!(
            builder
                .args
                .client_args
                .ingestion
                .local_ingestion_path
                .as_ref()
                .unwrap()
                .to_string_lossy(),
            "/individual"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:9090"
        );
    }

    // A `with_args` call made last replaces everything set earlier by the individual
    // `with_*_args` methods.
    #[test]
    fn test_bundled_args_override_individual_methods() {
        let builder = IndexerClusterBuilder::new()
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            })
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            });

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
        assert_eq!(
            builder
                .args
                .client_args
                .ingestion
                .local_ingestion_path
                .as_ref()
                .unwrap()
                .to_string_lossy(),
            "/bundled"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:8080"
        );
    }
}