1use std::sync::Arc;
7
8use anyhow::Result;
9use num_enum::IntoPrimitive;
10use num_enum::TryFromPrimitive;
11use serde::Deserialize;
12use serde::Serialize;
13use strum_macros::EnumIter;
14use sui_indexer_alt_framework::Indexer;
15use sui_indexer_alt_framework::pipeline::Processor;
16use sui_indexer_alt_framework::pipeline::sequential::SequentialConfig;
17
18use crate::config::PipelineConfig;
19use crate::handlers::AnalyticsHandler;
20use crate::handlers::Row;
21use crate::handlers::tables::CheckpointProcessor;
22use crate::handlers::tables::DynamicFieldProcessor;
23use crate::handlers::tables::EventProcessor;
24use crate::handlers::tables::MoveCallProcessor;
25use crate::handlers::tables::ObjectProcessor;
26use crate::handlers::tables::PackageBCSProcessor;
27use crate::handlers::tables::PackageProcessor;
28use crate::handlers::tables::TransactionBCSProcessor;
29use crate::handlers::tables::TransactionObjectsProcessor;
30use crate::handlers::tables::TransactionProcessor;
31use crate::handlers::tables::WrappedObjectProcessor;
32use crate::metrics::Metrics;
33use crate::package_store::PackageCache;
34use crate::store::AnalyticsStore;
35
36async fn register_sequential_pipeline<P, T>(
38 indexer: &mut Indexer<AnalyticsStore>,
39 processor: P,
40 sequential_config: SequentialConfig,
41) -> Result<()>
42where
43 P: Processor<Value = T> + Send + Sync,
44 T: Row + 'static,
45{
46 indexer.store().register_schema::<P, T>();
47
48 let handler = AnalyticsHandler::new(processor);
49 indexer
50 .sequential_pipeline(handler, sequential_config)
51 .await?;
52 Ok(())
53}
54
55#[derive(
57 Copy,
58 Clone,
59 Debug,
60 Eq,
61 PartialEq,
62 strum_macros::Display,
63 Serialize,
64 Deserialize,
65 TryFromPrimitive,
66 IntoPrimitive,
67 EnumIter,
68)]
69#[repr(u8)]
70pub enum Pipeline {
71 Checkpoint = 0,
72 Object,
73 Transaction,
74 TransactionBCS,
75 TransactionObjects,
76 Event,
77 MoveCall,
78 MovePackage,
79 MovePackageBCS,
80 DynamicField,
81 WrappedObject,
82}
83
84impl Pipeline {
85 pub const fn name(&self) -> &'static str {
89 match self {
90 Pipeline::Checkpoint => "Checkpoint",
91 Pipeline::Transaction => "Transaction",
92 Pipeline::TransactionBCS => "TransactionBCS",
93 Pipeline::TransactionObjects => "TransactionObjects",
94 Pipeline::Object => "Object",
95 Pipeline::Event => "Event",
96 Pipeline::MoveCall => "MoveCall",
97 Pipeline::MovePackage => "MovePackage",
98 Pipeline::MovePackageBCS => "MovePackageBCS",
99 Pipeline::DynamicField => "DynamicField",
100 Pipeline::WrappedObject => "WrappedObject",
101 }
102 }
103
104 pub const fn default_path(&self) -> &'static str {
108 match self {
109 Pipeline::Checkpoint => "checkpoints",
110 Pipeline::Transaction => "transactions",
111 Pipeline::TransactionBCS => "transaction_bcs",
112 Pipeline::TransactionObjects => "transaction_objects",
113 Pipeline::Object => "objects",
114 Pipeline::Event => "events",
115 Pipeline::MoveCall => "move_call",
116 Pipeline::MovePackage => "move_package",
117 Pipeline::MovePackageBCS => "move_package_bcs",
118 Pipeline::DynamicField => "dynamic_field",
119 Pipeline::WrappedObject => "wrapped_object",
120 }
121 }
122
123 pub async fn register(
125 &self,
126 indexer: &mut Indexer<AnalyticsStore>,
127 pipeline_config: &PipelineConfig,
128 package_cache: Arc<PackageCache>,
129 metrics: Metrics,
130 sequential_config: SequentialConfig,
131 ) -> Result<()> {
132 match self {
133 Pipeline::Checkpoint => {
134 register_sequential_pipeline(indexer, CheckpointProcessor, sequential_config).await
135 }
136 Pipeline::Transaction => {
137 register_sequential_pipeline(indexer, TransactionProcessor, sequential_config).await
138 }
139 Pipeline::TransactionBCS => {
140 register_sequential_pipeline(indexer, TransactionBCSProcessor, sequential_config)
141 .await
142 }
143 Pipeline::Event => {
144 register_sequential_pipeline(
145 indexer,
146 EventProcessor::new(package_cache.clone()),
147 sequential_config,
148 )
149 .await
150 }
151 Pipeline::MoveCall => {
152 register_sequential_pipeline(indexer, MoveCallProcessor, sequential_config).await
153 }
154 Pipeline::Object => {
155 register_sequential_pipeline(
156 indexer,
157 ObjectProcessor::new(
158 package_cache.clone(),
159 &pipeline_config.package_id_filter,
160 metrics,
161 ),
162 sequential_config,
163 )
164 .await
165 }
166 Pipeline::DynamicField => {
167 register_sequential_pipeline(
168 indexer,
169 DynamicFieldProcessor::new(package_cache.clone()),
170 sequential_config,
171 )
172 .await
173 }
174 Pipeline::TransactionObjects => {
175 register_sequential_pipeline(
176 indexer,
177 TransactionObjectsProcessor,
178 sequential_config,
179 )
180 .await
181 }
182 Pipeline::MovePackage => {
183 register_sequential_pipeline(indexer, PackageProcessor, sequential_config).await
184 }
185 Pipeline::MovePackageBCS => {
186 register_sequential_pipeline(indexer, PackageBCSProcessor, sequential_config).await
187 }
188 Pipeline::WrappedObject => {
189 register_sequential_pipeline(
190 indexer,
191 WrappedObjectProcessor::new(package_cache.clone()),
192 sequential_config,
193 )
194 .await
195 }
196 }
197 }
198}