sui_indexer_alt_framework/
cluster.rs1use std::ops::Deref;
5use std::ops::DerefMut;
6use std::sync::Arc;
7
8use anyhow::Context;
9use diesel_migrations::EmbeddedMigrations;
10use prometheus::Registry;
11use sui_futures::service::Service;
12use sui_indexer_alt_metrics::MetricsArgs;
13use sui_indexer_alt_metrics::MetricsService;
14use url::Url;
15
16use crate::Indexer;
17use crate::IndexerArgs;
18use crate::ingestion::ClientArgs;
19use crate::ingestion::IngestionConfig;
20use crate::metrics::IndexerMetrics;
21use crate::metrics::IngestionMetrics;
22use crate::postgres::Db;
23use crate::postgres::DbArgs;
24
25#[derive(clap::Parser, Debug, Default)]
29pub struct Args {
30 #[clap(flatten)]
32 pub indexer_args: IndexerArgs,
33
34 #[clap(flatten)]
36 pub client_args: ClientArgs,
37
38 #[clap(flatten)]
40 pub metrics_args: MetricsArgs,
41}
42
43pub struct IndexerCluster {
47 indexer: Indexer<Db>,
48 metrics: MetricsService,
49}
50
51#[derive(Default)]
53pub struct IndexerClusterBuilder {
54 database_url: Option<Url>,
55 db_args: DbArgs,
56 args: Args,
57 ingestion_config: IngestionConfig,
58 migrations: Option<&'static EmbeddedMigrations>,
59 metrics_prefix: Option<String>,
60}
61
62impl IndexerClusterBuilder {
63 pub fn new() -> Self {
65 Self::default()
66 }
67
68 pub fn with_database_url(mut self, url: Url) -> Self {
73 self.database_url = Some(url);
74 self
75 }
76
77 pub fn with_db_args(mut self, args: DbArgs) -> Self {
82 self.db_args = args;
83 self
84 }
85
86 pub fn with_args(mut self, args: Args) -> Self {
95 self.args = args;
96 self
97 }
98
99 pub fn with_indexer_args(mut self, args: IndexerArgs) -> Self {
102 self.args.indexer_args = args;
103 self
104 }
105
106 pub fn with_client_args(mut self, args: ClientArgs) -> Self {
109 self.args.client_args = args;
110 self
111 }
112
113 pub fn with_metrics_args(mut self, args: MetricsArgs) -> Self {
116 self.args.metrics_args = args;
117 self
118 }
119
120 pub fn with_ingestion_config(mut self, config: IngestionConfig) -> Self {
123 self.ingestion_config = config;
124 self
125 }
126
127 pub fn with_migrations(mut self, migrations: &'static EmbeddedMigrations) -> Self {
131 self.migrations = Some(migrations);
132 self
133 }
134
135 pub fn with_metrics_prefix(mut self, label: impl Into<String>) -> Self {
137 self.metrics_prefix = Some(label.into());
138 self
139 }
140
141 pub async fn build(self) -> anyhow::Result<IndexerCluster> {
148 let database_url = self.database_url.context("database_url is required")?;
149
150 tracing_subscriber::fmt::init();
151
152 let registry = Registry::new();
153 let metrics = MetricsService::new(self.args.metrics_args, registry);
154 let client_args = self.args.client_args;
155
156 let indexer = Indexer::new_from_pg(
157 database_url,
158 self.db_args,
159 self.args.indexer_args,
160 client_args,
161 self.ingestion_config,
162 self.migrations,
163 self.metrics_prefix.as_deref(),
164 metrics.registry(),
165 )
166 .await?;
167
168 Ok(IndexerCluster { indexer, metrics })
169 }
170}
171
172impl IndexerCluster {
173 pub fn builder() -> IndexerClusterBuilder {
175 IndexerClusterBuilder::new()
176 }
177
178 pub fn indexer_metrics(&self) -> &Arc<IndexerMetrics> {
181 self.indexer.indexer_metrics()
182 }
183
184 pub fn ingestion_metrics(&self) -> &Arc<IngestionMetrics> {
187 self.indexer.ingestion_metrics()
188 }
189
190 pub async fn run(self) -> anyhow::Result<Service> {
194 let s_indexer = self.indexer.run().await?;
195 let s_metrics = self.metrics.run().await?;
196
197 Ok(s_indexer.attach(s_metrics))
198 }
199}
200
201impl Deref for IndexerCluster {
202 type Target = Indexer<Db>;
203
204 fn deref(&self) -> &Self::Target {
205 &self.indexer
206 }
207}
208
209impl DerefMut for IndexerCluster {
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 &mut self.indexer
212 }
213}
214
#[cfg(test)]
mod tests {
    use std::net::IpAddr;
    use std::net::Ipv4Addr;
    use std::net::SocketAddr;

    use async_trait::async_trait;
    use diesel::Insertable;
    use diesel::QueryDsl;
    use diesel::Queryable;
    use diesel_async::RunQueryDsl;
    use sui_synthetic_ingestion::synthetic_ingestion;
    use tempfile::tempdir;

    use crate::FieldCount;
    use crate::ingestion::ClientArgs;
    use crate::ingestion::ingestion_client::IngestionClientArgs;
    use crate::pipeline::Processor;
    use crate::pipeline::concurrent::ConcurrentConfig;
    use crate::postgres::Connection;
    use crate::postgres::Db;
    use crate::postgres::DbArgs;
    use crate::postgres::temp::TempDb;
    use crate::postgres::temp::get_available_port;
    use crate::types::full_checkpoint_content::Checkpoint;

    use super::*;

    diesel::table! {
        // One row per checkpoint: its sequence number and how many transactions it contained.
        tx_counts (cp_sequence_number) {
            cp_sequence_number -> BigInt,
            count -> BigInt,
        }
    }

    #[derive(Insertable, Queryable, FieldCount)]
    #[diesel(table_name = tx_counts)]
    struct StoredTxCount {
        cp_sequence_number: i64,
        count: i64,
    }

    /// Test pipeline that records the number of transactions in each checkpoint.
    struct TxCounts;

    #[async_trait]
    impl Processor for TxCounts {
        const NAME: &'static str = "tx_counts";
        type Value = StoredTxCount;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            Ok(vec![StoredTxCount {
                cp_sequence_number: checkpoint.summary.sequence_number as i64,
                count: checkpoint.transactions.len() as i64,
            }])
        }
    }

    #[async_trait]
    impl crate::postgres::handler::Handler for TxCounts {
        async fn commit<'a>(
            values: &[Self::Value],
            conn: &mut Connection<'a>,
        ) -> anyhow::Result<usize> {
            // Idempotent insert: re-committing an already-written checkpoint is a no-op.
            Ok(diesel::insert_into(tx_counts::table)
                .values(values)
                .on_conflict_do_nothing()
                .execute(conn)
                .await?)
        }
    }

    #[tokio::test]
    async fn test_indexer_cluster() {
        let db = TempDb::new().expect("Failed to create temporary database");
        let url = db.database().url();

        // Generate synthetic checkpoints 0..=9 on disk for the indexer to ingest.
        let checkpoint_dir = tempdir().unwrap();
        synthetic_ingestion::generate_ingestion(synthetic_ingestion::Config {
            ingestion_dir: checkpoint_dir.path().to_owned(),
            starting_checkpoint: 0,
            num_checkpoints: 10,
            checkpoint_size: 2,
        })
        .await;

        let reader = Db::for_read(url.clone(), DbArgs::default()).await.unwrap();
        let writer = Db::for_write(url.clone(), DbArgs::default()).await.unwrap();

        {
            // Create the table the `TxCounts` pipeline writes into.
            let mut conn = writer.connect().await.unwrap();
            diesel::sql_query(
                r#"
                CREATE TABLE tx_counts (
                    cp_sequence_number BIGINT PRIMARY KEY,
                    count BIGINT NOT NULL
                )
                "#,
            )
            .execute(&mut conn)
            .await
            .unwrap();
        }

        // Serve metrics on a free local port so concurrent tests don't collide.
        let metrics_address =
            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port());

        let args = Args {
            client_args: ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some(checkpoint_dir.path().to_owned()),
                    ..Default::default()
                },
                ..Default::default()
            },
            indexer_args: IndexerArgs {
                first_checkpoint: Some(0),
                last_checkpoint: Some(9),
                ..Default::default()
            },
            metrics_args: MetricsArgs { metrics_address },
        };

        let mut indexer = IndexerCluster::builder()
            .with_database_url(url.clone())
            .with_args(args)
            .build()
            .await
            .unwrap();

        // Registered through `DerefMut` to the wrapped `Indexer`.
        indexer
            .concurrent_pipeline(TxCounts, ConcurrentConfig::default())
            .await
            .unwrap();

        // Keep handles to the metrics: `run` consumes the cluster.
        let ingestion_metrics = indexer.ingestion_metrics().clone();
        let indexer_metrics = indexer.indexer_metrics().clone();

        // `last_checkpoint` is set, so the service is expected to finish on its own.
        indexer.run().await.unwrap().join().await.unwrap();

        {
            // Every checkpoint should have produced exactly one row.
            let mut conn = reader.connect().await.unwrap();
            let counts: Vec<StoredTxCount> = tx_counts::table
                .order_by(tx_counts::cp_sequence_number)
                .load(&mut conn)
                .await
                .unwrap();

            assert_eq!(counts.len(), 10);
            for (i, count) in counts.iter().enumerate() {
                assert_eq!(count.cp_sequence_number, i as i64);
                // NOTE(review): `checkpoint_size` is 2 but 3 transactions are expected per
                // checkpoint. Presumably the synthetic generator adds one extra transaction per
                // checkpoint; confirm against `sui_synthetic_ingestion`.
                assert_eq!(count.count, 3);
            }
        }

        // Ingestion metrics: 10 checkpoints of 3 transactions each.
        assert_eq!(ingestion_metrics.total_ingested_checkpoints.get(), 10);
        assert_eq!(ingestion_metrics.total_ingested_transactions.get(), 30);
        assert_eq!(ingestion_metrics.latest_ingested_checkpoint.get(), 9);

        // Asserts the value of a per-pipeline metric labelled with this pipeline's name.
        macro_rules! assert_pipeline_metric {
            ($name:ident, $value:expr) => {
                assert_eq!(
                    indexer_metrics
                        .$name
                        .get_metric_with_label_values(&["tx_counts"])
                        .unwrap()
                        .get(),
                    $value
                );
            };
        }

        // Handler and collector: one row per checkpoint, through to checkpoint 9.
        assert_pipeline_metric!(total_handler_checkpoints_received, 10);
        assert_pipeline_metric!(total_handler_checkpoints_processed, 10);
        assert_pipeline_metric!(total_handler_rows_created, 10);
        assert_pipeline_metric!(latest_processed_checkpoint, 9);
        assert_pipeline_metric!(total_collector_checkpoints_received, 10);
        assert_pipeline_metric!(total_collector_rows_received, 10);
        assert_pipeline_metric!(latest_collected_checkpoint, 9);

        // Watermarks: both in memory and as written to the database.
        assert_pipeline_metric!(watermark_checkpoint, 9);
        assert_pipeline_metric!(watermark_checkpoint_in_db, 9);
        assert_pipeline_metric!(watermark_transaction, 30);
        assert_pipeline_metric!(watermark_transaction_in_db, 30);
    }

    /// Individual `with_*_args` calls made after `with_args` override the bundled values.
    #[test]
    fn test_individual_methods_override_bundled_args() {
        let builder = IndexerClusterBuilder::new()
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            })
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            });

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(200));
        assert_eq!(
            builder
                .args
                .client_args
                .ingestion
                .local_ingestion_path
                .as_ref()
                .unwrap()
                .to_string_lossy(),
            "/individual"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:9090"
        );
    }

    /// A `with_args` call made after individual `with_*_args` calls replaces all of them.
    #[test]
    fn test_bundled_args_override_individual_methods() {
        let builder = IndexerClusterBuilder::new()
            .with_indexer_args(IndexerArgs {
                first_checkpoint: Some(200),
                ..Default::default()
            })
            .with_client_args(ClientArgs {
                ingestion: IngestionClientArgs {
                    local_ingestion_path: Some("/individual".into()),
                    ..Default::default()
                },
                ..Default::default()
            })
            .with_metrics_args(MetricsArgs {
                metrics_address: "127.0.0.1:9090".parse().unwrap(),
            })
            .with_args(Args {
                indexer_args: IndexerArgs {
                    first_checkpoint: Some(100),
                    ..Default::default()
                },
                client_args: ClientArgs {
                    ingestion: IngestionClientArgs {
                        local_ingestion_path: Some("/bundled".into()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                metrics_args: MetricsArgs {
                    metrics_address: "127.0.0.1:8080".parse().unwrap(),
                },
            });

        assert_eq!(builder.args.indexer_args.first_checkpoint, Some(100));
        assert_eq!(
            builder
                .args
                .client_args
                .ingestion
                .local_ingestion_path
                .as_ref()
                .unwrap()
                .to_string_lossy(),
            "/bundled"
        );
        assert_eq!(
            builder.args.metrics_args.metrics_address.to_string(),
            "127.0.0.1:8080"
        );
    }
}