sui_analytics_indexer/
pipeline.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Pipeline definitions for the analytics indexer.
5
6use std::sync::Arc;
7
8use anyhow::Result;
9use num_enum::IntoPrimitive;
10use num_enum::TryFromPrimitive;
11use serde::Deserialize;
12use serde::Serialize;
13use strum_macros::EnumIter;
14use sui_indexer_alt_framework::Indexer;
15use sui_indexer_alt_framework::pipeline::Processor;
16use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
17
18use crate::config::PipelineConfig;
19use crate::handlers::AnalyticsHandler;
20use crate::handlers::Row;
21use crate::handlers::tables::CheckpointProcessor;
22use crate::handlers::tables::DynamicFieldProcessor;
23use crate::handlers::tables::EventProcessor;
24use crate::handlers::tables::MoveCallProcessor;
25use crate::handlers::tables::ObjectProcessor;
26use crate::handlers::tables::PackageBCSProcessor;
27use crate::handlers::tables::PackageProcessor;
28use crate::handlers::tables::TransactionBCSProcessor;
29use crate::handlers::tables::TransactionObjectsProcessor;
30use crate::handlers::tables::TransactionProcessor;
31use crate::handlers::tables::WrappedObjectProcessor;
32use crate::metrics::Metrics;
33use crate::package_store::PackageCache;
34use crate::store::AnalyticsStore;
35
36/// Register a sequential pipeline with the analytics handler.
37async fn register_sequential_pipeline<P, T>(
38    indexer: &mut Indexer<AnalyticsStore>,
39    processor: P,
40    sequential_config: SequentialConfig,
41) -> Result<()>
42where
43    P: Processor<Value = T> + Send + Sync,
44    T: Row + 'static,
45{
46    indexer.store().register_schema::<P, T>();
47
48    let handler = AnalyticsHandler::new(processor);
49    indexer
50        .sequential_pipeline(handler, sequential_config)
51        .await?;
52    Ok(())
53}
54
55/// Available analytics pipelines.
56#[derive(
57    Copy,
58    Clone,
59    Debug,
60    Eq,
61    PartialEq,
62    strum_macros::Display,
63    Serialize,
64    Deserialize,
65    TryFromPrimitive,
66    IntoPrimitive,
67    EnumIter,
68)]
69#[repr(u8)]
70pub enum Pipeline {
71    Checkpoint = 0,
72    Object,
73    Transaction,
74    TransactionBCS,
75    TransactionObjects,
76    Event,
77    MoveCall,
78    MovePackage,
79    MovePackageBCS,
80    DynamicField,
81    WrappedObject,
82}
83
84impl Pipeline {
85    /// Returns the pipeline name used for watermarks and metrics.
86    /// This must match the corresponding `Processor::NAME` constant.
87    /// Names match the enum variant names (PascalCase).
88    pub const fn name(&self) -> &'static str {
89        match self {
90            Pipeline::Checkpoint => "Checkpoint",
91            Pipeline::Transaction => "Transaction",
92            Pipeline::TransactionBCS => "TransactionBCS",
93            Pipeline::TransactionObjects => "TransactionObjects",
94            Pipeline::Object => "Object",
95            Pipeline::Event => "Event",
96            Pipeline::MoveCall => "MoveCall",
97            Pipeline::MovePackage => "MovePackage",
98            Pipeline::MovePackageBCS => "MovePackageBCS",
99            Pipeline::DynamicField => "DynamicField",
100            Pipeline::WrappedObject => "WrappedObject",
101        }
102    }
103
104    /// Returns the default output path for this pipeline in the object store.
105    /// Used when `output_prefix` is not configured. Uses snake_case for
106    /// backwards compatibility with existing data.
107    pub const fn default_path(&self) -> &'static str {
108        match self {
109            Pipeline::Checkpoint => "checkpoints",
110            Pipeline::Transaction => "transactions",
111            Pipeline::TransactionBCS => "transaction_bcs",
112            Pipeline::TransactionObjects => "transaction_objects",
113            Pipeline::Object => "objects",
114            Pipeline::Event => "events",
115            Pipeline::MoveCall => "move_call",
116            Pipeline::MovePackage => "move_package",
117            Pipeline::MovePackageBCS => "move_package_bcs",
118            Pipeline::DynamicField => "dynamic_field",
119            Pipeline::WrappedObject => "wrapped_object",
120        }
121    }
122
123    /// Registers this pipeline with the indexer.
124    pub async fn register(
125        &self,
126        indexer: &mut Indexer<AnalyticsStore>,
127        pipeline_config: &PipelineConfig,
128        package_cache: Arc<PackageCache>,
129        metrics: Metrics,
130        sequential_config: SequentialConfig,
131    ) -> Result<()> {
132        match self {
133            Pipeline::Checkpoint => {
134                register_sequential_pipeline(indexer, CheckpointProcessor, sequential_config).await
135            }
136            Pipeline::Transaction => {
137                register_sequential_pipeline(indexer, TransactionProcessor, sequential_config).await
138            }
139            Pipeline::TransactionBCS => {
140                register_sequential_pipeline(indexer, TransactionBCSProcessor, sequential_config)
141                    .await
142            }
143            Pipeline::Event => {
144                register_sequential_pipeline(
145                    indexer,
146                    EventProcessor::new(package_cache.clone()),
147                    sequential_config,
148                )
149                .await
150            }
151            Pipeline::MoveCall => {
152                register_sequential_pipeline(indexer, MoveCallProcessor, sequential_config).await
153            }
154            Pipeline::Object => {
155                register_sequential_pipeline(
156                    indexer,
157                    ObjectProcessor::new(
158                        package_cache.clone(),
159                        &pipeline_config.package_id_filter,
160                        metrics,
161                    ),
162                    sequential_config,
163                )
164                .await
165            }
166            Pipeline::DynamicField => {
167                register_sequential_pipeline(
168                    indexer,
169                    DynamicFieldProcessor::new(package_cache.clone()),
170                    sequential_config,
171                )
172                .await
173            }
174            Pipeline::TransactionObjects => {
175                register_sequential_pipeline(
176                    indexer,
177                    TransactionObjectsProcessor,
178                    sequential_config,
179                )
180                .await
181            }
182            Pipeline::MovePackage => {
183                register_sequential_pipeline(indexer, PackageProcessor, sequential_config).await
184            }
185            Pipeline::MovePackageBCS => {
186                register_sequential_pipeline(indexer, PackageBCSProcessor, sequential_config).await
187            }
188            Pipeline::WrappedObject => {
189                register_sequential_pipeline(
190                    indexer,
191                    WrappedObjectProcessor::new(package_cache.clone()),
192                    sequential_config,
193                )
194                .await
195            }
196        }
197    }
198}