// sui_analytics_indexer/store/live.rs
use std::sync::Arc;
7use std::time::Duration;
8
9use object_store::ObjectStore;
10use object_store::ObjectStoreExt as _;
11use object_store::path::Path as ObjectPath;
12use sui_indexer_alt_framework_store_traits::CommitterWatermark;
13use sui_storage::object_store::util::find_all_dirs_with_epoch_prefix;
14use sui_storage::object_store::util::find_all_files_with_epoch_prefix;
15use tracing::info;
16
17use crate::config::BatchSizeConfig;
18use crate::config::PipelineConfig;
19use crate::handlers::CheckpointRows;
20use crate::store::Batch;
21
/// Object-store-backed sink for the live analytics path.
///
/// Cheap to clone: the only state is a shared handle to the destination
/// bucket. Watermarks are not persisted locally — they are recovered by
/// listing the bucket (see `committer_watermark`).
#[derive(Clone)]
pub struct LiveStore {
    // Shared handle to the destination bucket.
    object_store: Arc<dyn ObjectStore>,
}
30
31impl LiveStore {
32 pub fn new(object_store: Arc<dyn ObjectStore>) -> Self {
34 Self { object_store }
35 }
36
37 pub(crate) async fn committer_watermark(
44 &self,
45 pipeline: &str,
46 ) -> anyhow::Result<Option<CommitterWatermark>> {
47 let prefix = ObjectPath::from(pipeline);
48 let epoch_dirs = find_all_dirs_with_epoch_prefix(&self.object_store, Some(&prefix)).await?;
49
50 let Some((&epoch, epoch_path)) = epoch_dirs.last_key_value() else {
52 return Ok(None); };
54
55 let checkpoint_ranges =
57 find_all_files_with_epoch_prefix(&self.object_store, Some(epoch_path)).await?;
58
59 let checkpoint_hi = checkpoint_ranges.iter().map(|r| r.end).max().unwrap_or(0);
61
62 if checkpoint_hi == 0 {
64 return Ok(None);
65 }
66
67 info!(
68 pipeline,
69 epoch,
70 checkpoint = checkpoint_hi - 1,
71 "Determined watermark from bucket iteration"
72 );
73
74 Ok(Some(CommitterWatermark {
75 epoch_hi_inclusive: epoch,
76 checkpoint_hi_inclusive: checkpoint_hi - 1, tx_hi: 0, timestamp_ms_hi_inclusive: 0, }))
80 }
81
82 pub(crate) async fn write_to_object_store(
84 &self,
85 path: &ObjectPath,
86 payload: object_store::PutPayload,
87 ) -> anyhow::Result<()> {
88 self.object_store.put(path, payload).await?;
89 Ok(())
90 }
91
92 pub(crate) fn split_framework_batch_into_files(
99 &self,
100 pipeline_config: &PipelineConfig,
101 rows_by_checkpoint: &[CheckpointRows],
102 mut pending_batch: Batch,
103 ) -> (Batch, Vec<Batch>) {
104 let batch_size = pipeline_config
105 .batch_size
106 .as_ref()
107 .expect("batch_size not configured for pipeline");
108
109 let max_duration = Duration::from_secs(pipeline_config.force_batch_cut_after_secs);
110
111 let mut complete_batches: Vec<Batch> = Vec::new();
112
113 for checkpoint_rows in rows_by_checkpoint {
114 if pending_batch
116 .epoch()
117 .is_some_and(|e| e != checkpoint_rows.epoch)
118 {
119 assert!(
120 !pending_batch.checkpoints_rows.is_empty(),
121 "invalid state: epoch set but rows empty"
122 );
123 complete_batches.push(pending_batch);
124 pending_batch = Batch::default();
125 }
126
127 match *batch_size {
128 BatchSizeConfig::Rows(n) => {
129 if pending_batch.row_count() >= n {
131 complete_batches.push(pending_batch);
132 pending_batch = Batch::default();
133 }
134 pending_batch.add(checkpoint_rows.clone());
135 }
136 BatchSizeConfig::Checkpoints(n) => {
137 pending_batch.add(checkpoint_rows.clone());
138 if pending_batch.checkpoint_count() == n {
140 complete_batches.push(pending_batch);
141 pending_batch = Batch::default();
142 }
143 }
144 }
145
146 if pending_batch.checkpoint_count() > 0 && pending_batch.elapsed() >= max_duration {
148 complete_batches.push(pending_batch);
149 pending_batch = Batch::default();
150 }
151 }
152
153 (pending_batch, complete_batches)
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use object_store::PutPayload;
    use object_store::memory::InMemory;

    use crate::config::IndexerConfig;
    use crate::config::OutputStoreConfig;
    use crate::config::PipelineConfig;
    use crate::metrics::Metrics;
    use crate::pipeline::Pipeline;
    use crate::store::AnalyticsStore;
    use sui_indexer_alt_framework::store::Connection;
    use sui_indexer_alt_framework::store::Store;

    /// Metrics backed by a fresh, throwaway Prometheus registry.
    fn test_metrics() -> Metrics {
        Metrics::new(&prometheus::Registry::new())
    }

    /// Minimal indexer configuration with a single `Checkpoint` pipeline
    /// writing under the `test_pipeline` prefix.
    fn test_config(object_store: Arc<dyn object_store::ObjectStore>) -> IndexerConfig {
        let checkpoint_pipeline = PipelineConfig {
            pipeline: Pipeline::Checkpoint,
            file_format: crate::config::FileFormat::Parquet,
            package_id_filter: None,
            sf_table_id: None,
            sf_checkpoint_col_id: None,
            report_sf_max_table_checkpoint: false,
            batch_size: None,
            output_prefix: Some("test_pipeline".to_string()),
            force_batch_cut_after_secs: 600,
            sequential: Default::default(),
        };

        IndexerConfig {
            output_store: OutputStoreConfig::Custom(object_store),
            work_dir: None,
            sf_account_identifier: None,
            sf_warehouse: None,
            sf_database: None,
            sf_schema: None,
            sf_username: None,
            sf_role: None,
            sf_password_file: None,
            migration_id: None,
            file_format: crate::config::FileFormat::Parquet,
            pipeline_configs: vec![checkpoint_pipeline],
            ingestion: Default::default(),
            committer: Default::default(),
            max_pending_uploads: 10,
            max_concurrent_serialization: 3,
            watermark_update_interval_secs: 60,
        }
    }

    /// Writes a dummy object at `<pipeline>/epoch_<epoch>/<start>_<end>.parquet`.
    async fn create_test_file(
        store: &Arc<dyn object_store::ObjectStore>,
        pipeline: &str,
        epoch: u64,
        start: u64,
        end: u64,
    ) {
        let key = format!("{}/epoch_{}/{}_{}.parquet", pipeline, epoch, start, end);
        let body: PutPayload = Bytes::from("test data").into();
        store.put(&ObjectPath::from(key), body).await.unwrap();
    }

    #[tokio::test]
    async fn test_committer_watermark_multiple_epochs() {
        let bucket: Arc<dyn object_store::ObjectStore> = Arc::new(InMemory::new());

        // Two epochs with two checkpoint-range files each; the watermark
        // must come from the highest range in the highest epoch.
        for (epoch, start, end) in [(0, 0, 100), (0, 100, 200), (1, 200, 300), (1, 300, 400)] {
            create_test_file(&bucket, "test_pipeline", epoch, start, end).await;
        }

        let config = test_config(bucket.clone());
        let store = AnalyticsStore::new(bucket, config, test_metrics());
        let mut conn = store.connect().await.unwrap();

        let watermark = conn
            .committer_watermark("Checkpoint")
            .await
            .unwrap()
            .expect("watermark should be present");
        assert_eq!(watermark.epoch_hi_inclusive, 1);
        // Ranges are end-exclusive, so 300..400 yields inclusive high 399.
        assert_eq!(watermark.checkpoint_hi_inclusive, 399);
    }
}