1use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22use std::sync::RwLock;
23use std::time::Duration;
24use std::time::Instant;
25
26use anyhow::Result;
27use anyhow::bail;
28use async_trait::async_trait;
29use object_store::PutPayload;
30use object_store::path::Path as ObjectPath;
31use scoped_futures::ScopedBoxFuture;
32use sui_indexer_alt_framework::pipeline::Processor;
33use sui_indexer_alt_framework::store::Connection;
34use sui_indexer_alt_framework::store::Store;
35use sui_indexer_alt_framework::store::TransactionalStore;
36use sui_indexer_alt_framework_store_traits::CommitterWatermark;
37use sui_indexer_alt_framework_store_traits::InitWatermark;
38use sui_indexer_alt_framework_store_traits::PrunerWatermark;
39use sui_indexer_alt_framework_store_traits::ReaderWatermark;
40use sui_indexer_alt_framework_store_traits::init_with_committer_watermark;
41use sui_types::base_types::EpochId;
42use tokio::sync::mpsc;
43use tracing::debug;
44use tracing::info;
45use tracing::warn;
46
47use crate::config::FileFormat;
48use crate::config::IndexerConfig;
49use crate::handlers::CheckpointRows;
50use crate::metrics::Metrics;
51use crate::schema::RowSchema;
52
/// A set of rows accumulated for a single pipeline, destined to be written
/// out as one file in the object store.
#[derive(Clone)]
pub struct Batch {
    /// Rows grouped by the checkpoint they came from, in insertion order.
    pub(crate) checkpoints_rows: Vec<CheckpointRows>,
    /// Running total of rows across all entries in `checkpoints_rows`,
    /// maintained by `add`.
    row_count: usize,
    /// When this batch was created; consulted via `elapsed()`.
    created_at: Instant,
    /// When set, overrides the checkpoint range derived from
    /// `checkpoints_rows` (see `checkpoint_range`).
    pub(crate) explicit_range: Option<Range<u64>>,
}
64
65impl Default for Batch {
66 fn default() -> Self {
67 Self {
68 checkpoints_rows: Vec::new(),
69 row_count: 0,
70 created_at: Instant::now(),
71 explicit_range: None,
72 }
73 }
74}
75
76impl Batch {
77 pub(crate) fn first_checkpoint(&self) -> Option<u64> {
78 self.checkpoints_rows.first().map(|c| c.checkpoint)
79 }
80
81 pub(crate) fn last_checkpoint(&self) -> Option<u64> {
82 self.checkpoints_rows.last().map(|c| c.checkpoint)
83 }
84
85 pub(crate) fn epoch(&self) -> Option<EpochId> {
86 self.checkpoints_rows.last().map(|c| c.epoch)
87 }
88
89 pub(crate) fn row_count(&self) -> usize {
90 self.row_count
91 }
92
93 pub(crate) fn checkpoint_count(&self) -> usize {
94 self.checkpoints_rows.len()
95 }
96
97 pub(crate) fn checkpoint_range(&self) -> Option<Range<u64>> {
100 self.explicit_range.clone().or_else(|| {
101 match (self.first_checkpoint(), self.last_checkpoint()) {
102 (Some(first), Some(last)) => Some(first..last + 1),
103 _ => None,
104 }
105 })
106 }
107
108 pub(crate) fn add(&mut self, checkpoint_rows: CheckpointRows) {
109 self.row_count += checkpoint_rows.len();
110 self.checkpoints_rows.push(checkpoint_rows);
111 }
112
113 pub(crate) fn elapsed(&self) -> Duration {
115 self.created_at.elapsed()
116 }
117}
118
119mod live;
120mod migration;
121mod uploader;
122
123pub use live::LiveStore;
124pub use migration::FileRangeEntry;
125pub use migration::FileRangeIndex;
126pub use migration::MigrationStore;
127pub use migration::WatermarkUpdateError;
128use uploader::PendingFileUpload;
129
/// Selects how files and watermarks are persisted: live indexing writes
/// directly to the object store, while migration mode routes through
/// `MigrationStore`, which additionally updates per-pipeline watermarks
/// after uploads.
#[derive(Clone)]
pub enum StoreMode {
    /// Normal forward indexing, backed by `LiveStore`.
    Live(LiveStore),
    /// Migration/backfill run, backed by `MigrationStore`.
    Migration(MigrationStore),
}
136
137use crate::config::PipelineConfig;
138
/// Object-store-backed `Store` implementation for the analytics indexer.
///
/// Cheap to clone: all mutable state lives behind `Arc`s, so clones share
/// the same pending batches, uploader channels, and worker handles.
#[derive(Clone)]
pub struct AnalyticsStore {
    /// Live vs. migration behavior for uploads and watermarks.
    mode: StoreMode,
    /// Per-pipeline partially filled batch, waiting to grow into a file.
    pending_by_pipeline: Arc<RwLock<HashMap<String, Batch>>>,
    metrics: Metrics,
    /// One upload channel per pipeline, created lazily by
    /// `get_or_create_uploader`.
    uploader_senders: Arc<RwLock<HashMap<String, mpsc::Sender<PendingFileUpload>>>>,
    /// Join handles of spawned uploader tasks, awaited during `shutdown`.
    worker_handles: Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>,
    config: IndexerConfig,
    /// Column schema per pipeline name, registered via `register_schema`.
    schemas_by_pipeline: Arc<RwLock<HashMap<String, &'static [&'static str]>>>,
}
156
/// A lightweight "connection" over `AnalyticsStore`. There is no session
/// state beyond a borrow of the store and the committer watermark the
/// framework hands over via `set_committer_watermark`, which `commit_batch`
/// later reads.
pub struct AnalyticsConnection<'a> {
    store: &'a AnalyticsStore,
    /// Set by `set_committer_watermark`; required before `commit_batch`.
    watermark: Option<CommitterWatermark>,
}
166
impl StoreMode {
    /// Folds the framework's newly processed checkpoints into
    /// `pending_batch` and splits out batches that are complete enough to
    /// become files. Returns the remaining pending batch plus the completed
    /// ones. Migration mode additionally consults the committer watermark;
    /// live mode ignores it.
    pub(crate) fn split_framework_batch_into_files(
        &self,
        pipeline_config: &PipelineConfig,
        batch_from_framework: &[CheckpointRows],
        pending_batch: Batch,
        watermark: &CommitterWatermark,
    ) -> (Batch, Vec<Batch>) {
        match self {
            StoreMode::Live(store) => store.split_framework_batch_into_files(
                pipeline_config,
                batch_from_framework,
                pending_batch,
            ),
            StoreMode::Migration(store) => store.split_framework_batch_into_files(
                pipeline_config,
                batch_from_framework,
                pending_batch,
                watermark,
            ),
        }
    }

    /// Writes one file's payload at `path`. Migration mode also takes the
    /// pipeline name and checkpoint range — presumably to maintain its
    /// file-range bookkeeping (see `MigrationStore`); live mode needs only
    /// the path and payload.
    pub(crate) async fn write_to_object_store(
        &self,
        pipeline: &str,
        path: &ObjectPath,
        checkpoint_range: &Range<u64>,
        payload: PutPayload,
    ) -> anyhow::Result<()> {
        match self {
            StoreMode::Live(store) => store.write_to_object_store(path, payload).await,
            StoreMode::Migration(store) => {
                store
                    .write_to_object_store(pipeline, path, checkpoint_range, payload)
                    .await
            }
        }
    }

    /// Advances the migration watermark after a successful upload. A no-op
    /// (always `Ok`) in live mode.
    pub(crate) async fn update_watermark_after_upload(
        &self,
        pipeline: &str,
        epoch: u64,
        checkpoint_hi_inclusive: u64,
    ) -> Result<(), WatermarkUpdateError> {
        match self {
            StoreMode::Live(_) => Ok(()),
            StoreMode::Migration(store) => {
                store
                    .update_watermark(pipeline, epoch, checkpoint_hi_inclusive)
                    .await
            }
        }
    }

    /// Spawns the background uploader task for one pipeline, returning the
    /// channel used to queue `PendingFileUpload`s and the task's join
    /// handle.
    pub(crate) fn spawn_uploader(
        &self,
        pipeline_name: String,
        output_prefix: String,
        metrics: Metrics,
        config: &IndexerConfig,
    ) -> (mpsc::Sender<PendingFileUpload>, tokio::task::JoinHandle<()>) {
        uploader::spawn_uploader(pipeline_name, output_prefix, self.clone(), metrics, config)
    }
}
250
impl AnalyticsStore {
    /// Creates a store over `object_store`. Runs in migration mode when
    /// `config.migration_id` is set, otherwise in live mode.
    pub fn new(
        object_store: Arc<dyn object_store::ObjectStore>,
        config: IndexerConfig,
        metrics: Metrics,
    ) -> Self {
        let mode = if let Some(ref migration_id) = config.migration_id {
            info!(migration_id, "Enabling migration mode");
            StoreMode::Migration(MigrationStore::new(object_store, migration_id.clone()))
        } else {
            StoreMode::Live(LiveStore::new(object_store))
        };
        Self {
            mode,
            pending_by_pipeline: Arc::new(RwLock::new(HashMap::new())),
            metrics,
            uploader_senders: Arc::new(RwLock::new(HashMap::new())),
            worker_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
            config,
            schemas_by_pipeline: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Resolves the checkpoint range to index. Live mode passes the caller's
    /// bounds through untouched; migration mode lets `MigrationStore` derive
    /// the range across all configured pipelines.
    pub async fn find_checkpoint_range(
        &self,
        first_checkpoint: Option<u64>,
        last_checkpoint: Option<u64>,
    ) -> Result<(Option<u64>, Option<u64>)> {
        match &self.mode {
            StoreMode::Live(_) => Ok((first_checkpoint, last_checkpoint)),
            StoreMode::Migration(store) => {
                // Collect the (pipeline name, output prefix) pairs up front so
                // no borrow of `self.config` spans the await below.
                let pipelines: Vec<_> = self
                    .config
                    .pipeline_configs()
                    .iter()
                    .map(|p| (p.pipeline.name(), p.output_prefix()))
                    .collect();
                store
                    .find_checkpoint_range(
                        pipelines.iter().map(|(name, prefix)| (*name, *prefix)),
                        first_checkpoint,
                        last_checkpoint,
                    )
                    .await
            }
        }
    }

    /// Registers the column schema for pipeline `P`, keyed by its name.
    /// Must happen before the pipeline commits batches — `commit_batch` and
    /// `flush_pending_batches` panic on a missing schema.
    pub fn register_schema<P: Processor, T: RowSchema>(&self) {
        self.schemas_by_pipeline
            .write()
            .unwrap()
            .insert(P::NAME.to_string(), T::schema());
    }

    /// Looks up the schema previously registered for `pipeline`, if any.
    fn get_schema(&self, pipeline: &str) -> Option<&'static [&'static str]> {
        self.schemas_by_pipeline
            .read()
            .unwrap()
            .get(pipeline)
            .copied()
    }

    /// Returns the upload channel for `pipeline`, spawning its uploader task
    /// on first use.
    ///
    /// Panics if the pipeline has no entry in the indexer config.
    fn get_or_create_uploader(&self, pipeline: &str) -> mpsc::Sender<PendingFileUpload> {
        // Fast path: the uploader already exists (read lock only).
        {
            let uploaders = self.uploader_senders.read().unwrap();
            if let Some(tx) = uploaders.get(pipeline) {
                return tx.clone();
            }
        }

        let mut uploaders = self.uploader_senders.write().unwrap();
        // Re-check under the write lock: another thread may have created the
        // uploader between dropping the read lock and acquiring this one.
        if let Some(tx) = uploaders.get(pipeline) {
            return tx.clone();
        }

        let output_prefix = self
            .config
            .get_pipeline_config(pipeline)
            .expect("Pipeline not configured")
            .output_prefix();

        let (tx, handle) = self.mode.spawn_uploader(
            pipeline.to_string(),
            output_prefix.to_string(),
            self.metrics.clone(),
            &self.config,
        );
        uploaders.insert(pipeline.to_string(), tx.clone());

        // This method is sync, so the handle is stashed from a spawned task
        // (the async mutex cannot be awaited here).
        // NOTE(review): if `shutdown` drains `worker_handles` before this
        // task runs, the handle is pushed afterwards and never awaited —
        // confirm this race is acceptable.
        let handles = self.worker_handles.clone();
        tokio::spawn(async move {
            handles.lock().await.push(handle);
        });

        tx
    }

    /// Drains every pipeline's pending batch and, in live mode, queues each
    /// non-empty one for upload so buffered rows are not lost on shutdown.
    async fn flush_pending_batches(&self) {
        // Take the whole map in one scope so the std lock is not held across
        // any await point below.
        let pending: HashMap<String, Batch> = {
            let mut pending_map = self.pending_by_pipeline.write().unwrap();
            std::mem::take(&mut *pending_map)
        };

        match &self.mode {
            StoreMode::Live(_) => {
                for (pipeline_name, batch) in pending {
                    if batch.checkpoint_count() == 0 {
                        continue;
                    }

                    let pipeline_config = self
                        .config
                        .get_pipeline_config(&pipeline_name)
                        .expect("Pipeline config must exist for pending batch");

                    let schema = self
                        .get_schema(&pipeline_name)
                        .expect("Schema must be registered for pending batch");

                    info!(
                        pipeline = %pipeline_name,
                        checkpoints = batch.checkpoint_count(),
                        rows = batch.row_count(),
                        "Flushing pending batch on shutdown"
                    );

                    // Both unwraps are safe: the batch is non-empty (checked
                    // above), so it has an epoch and a derivable range.
                    let pending_upload = PendingFileUpload {
                        epoch: batch.epoch().unwrap(),
                        checkpoint_range: batch.checkpoint_range().unwrap(),
                        file_format: pipeline_config.file_format,
                        checkpoints_rows: batch.checkpoints_rows,
                        schema,
                    };

                    let tx = self.get_or_create_uploader(&pipeline_name);
                    if tx.send(pending_upload).await.is_err() {
                        warn!(pipeline = %pipeline_name, "Failed to send final batch to uploader");
                    }
                }
            }
            StoreMode::Migration(_) => {
                // Intentionally empty: migration mode does not flush partial
                // batches on shutdown. NOTE(review): presumably those
                // checkpoints are re-processed on the next run — confirm
                // against the migration restart logic.
            }
        }
    }

    /// Graceful shutdown: flush pending batches, drop the stored upload
    /// senders (assuming no other sender clones are still live, this closes
    /// the channels so uploader tasks can drain and exit), then wait for all
    /// uploader tasks to finish.
    pub async fn shutdown(&self) {
        self.flush_pending_batches().await;

        self.uploader_senders.write().unwrap().clear();

        let mut handles = self.worker_handles.lock().await;
        for handle in handles.drain(..) {
            let _ = handle.await;
        }
    }
}
447
impl<'a> AnalyticsConnection<'a> {
    /// The store's live/migration mode.
    pub fn mode(&self) -> &StoreMode {
        &self.store.mode
    }

    /// Clone of the pipeline's pending (not yet file-sized) batch, or an
    /// empty batch if none exists.
    pub fn get_pending_batch(&self, pipeline: &str) -> Batch {
        self.store
            .pending_by_pipeline
            .read()
            .unwrap()
            .get(pipeline)
            .cloned()
            .unwrap_or_default()
    }

    /// Replaces the pipeline's pending batch.
    pub fn set_pending_batch(&self, pipeline: &str, rows: Batch) {
        self.store
            .pending_by_pipeline
            .write()
            .unwrap()
            .insert(pipeline.to_string(), rows);
    }

    /// Config entry for `pipeline`; panics if the pipeline is not configured.
    fn pipeline_config(&self, pipeline: &str) -> &PipelineConfig {
        self.store
            .config
            .get_pipeline_config(pipeline)
            .unwrap_or_else(|| panic!("Pipeline '{}' not configured", pipeline))
    }

    /// Writes one file's payload at the canonical path for this pipeline,
    /// epoch, and checkpoint range (see `construct_object_store_path`),
    /// delegating the actual write to the current store mode.
    pub async fn write_to_object_store(
        &self,
        pipeline: &str,
        epoch: EpochId,
        checkpoint_range: Range<u64>,
        file_format: FileFormat,
        payload: PutPayload,
    ) -> anyhow::Result<()> {
        let path = construct_object_store_path(pipeline, epoch, &checkpoint_range, file_format);
        self.store
            .mode
            .write_to_object_store(pipeline, &path, &checkpoint_range, payload)
            .await
    }

    /// Commits the framework's processed checkpoints for pipeline `P`:
    /// merges them with the pipeline's pending batch, queues every
    /// now-complete batch on the uploader channel, and stores the remainder
    /// back as pending. Returns the number of rows queued for upload.
    ///
    /// Panics if the watermark was never set on this connection, the schema
    /// was never registered, or the upload channel is closed.
    pub async fn commit_batch<P: Processor>(
        &mut self,
        batch_from_framework: &[CheckpointRows],
    ) -> Result<usize> {
        let pipeline = P::NAME;
        let pipeline_config = self.pipeline_config(pipeline);

        let (pending_batch, complete_batches) = {
            let pending_batch = self.get_pending_batch(pipeline);
            self.store.mode.split_framework_batch_into_files(
                pipeline_config,
                batch_from_framework,
                pending_batch,
                self.watermark
                    .as_ref()
                    .expect("watermark should be set on connection."),
            )
        };

        debug!(
            pipeline = pipeline,
            files_to_upload = complete_batches.len(),
            pending_checkpoints = pending_batch.checkpoint_count(),
            "Commit starting"
        );

        let tx = self.store.get_or_create_uploader(pipeline);

        let mut total_rows = 0;
        for batch in complete_batches {
            total_rows += batch.row_count();

            // The unwraps assume every complete batch is non-empty (so it has
            // an epoch and a derivable range). NOTE(review): this relies on
            // `split_framework_batch_into_files` never emitting an empty
            // batch — confirm in the live/migration implementations.
            let pending_upload = PendingFileUpload {
                epoch: batch.epoch().unwrap(),
                checkpoint_range: batch.checkpoint_range().unwrap(),
                file_format: pipeline_config.file_format,
                checkpoints_rows: batch.checkpoints_rows,
                schema: self
                    .store
                    .get_schema(pipeline)
                    .unwrap_or_else(|| panic!("Schema not registered for pipeline: {}", pipeline)),
            };

            tx.send(pending_upload)
                .await
                .unwrap_or_else(|e| panic!("Upload channel closed: {}", e));
        }

        self.set_pending_batch(pipeline, pending_batch);

        debug!(
            pipeline = pipeline,
            total_rows = total_rows,
            "Commit complete, files queued for upload"
        );

        Ok(total_rows)
    }
}
599
#[async_trait]
impl Store for AnalyticsStore {
    type Connection<'c> = AnalyticsConnection<'c>;

    /// "Connecting" is just borrowing the store; the watermark starts unset
    /// and is filled in later via `set_committer_watermark`.
    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
        Ok(AnalyticsConnection {
            store: self,
            watermark: None,
        })
    }
}
611
#[async_trait]
impl TransactionalStore for AnalyticsStore {
    /// Runs `f` against a fresh connection. Note there is no real
    /// transaction here: nothing is rolled back if `f` fails, since writes
    /// go to an object store rather than a transactional database.
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
        R: Send + 'a,
        F: Send + 'a,
        F: for<'r> FnOnce(
            &'r mut Self::Connection<'_>,
        ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
    {
        let mut conn = self.connect().await?;
        f(&mut conn).await
    }
}
626
#[async_trait]
impl Connection for AnalyticsConnection<'_> {
    /// Initializes the watermark for a pipeline task by delegating to the
    /// framework helper `init_with_committer_watermark`.
    async fn init_watermark(
        &mut self,
        pipeline_task: &str,
        init_watermark: InitWatermark,
    ) -> anyhow::Result<InitWatermark> {
        init_with_committer_watermark(self, pipeline_task, init_watermark).await
    }

    /// Reads the committer watermark from the underlying store mode, keyed
    /// by the pipeline's output prefix rather than its name.
    async fn committer_watermark(
        &mut self,
        pipeline: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        let output_prefix = self.pipeline_config(pipeline).output_prefix().to_string();
        match &self.store.mode {
            StoreMode::Live(store) => store.committer_watermark(&output_prefix).await,
            StoreMode::Migration(store) => store.committer_watermark(&output_prefix).await,
        }
    }

    /// Reader watermarks are not tracked by this store: always `None`.
    async fn reader_watermark(
        &mut self,
        _pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>> {
        Ok(None)
    }

    /// Pruner watermarks are not tracked by this store: always `None`.
    async fn pruner_watermark(
        &mut self,
        _pipeline: &'static str,
        _delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>> {
        Ok(None)
    }

    /// Stashes the watermark on the connection (it is not persisted here);
    /// `commit_batch` reads it later. Always reports success.
    async fn set_committer_watermark(
        &mut self,
        _pipeline_task: &str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool> {
        self.watermark = Some(watermark);
        Ok(true)
    }

    /// Pruning is unsupported by the analytics store.
    async fn set_reader_watermark(
        &mut self,
        _pipeline: &'static str,
        _reader_lo: u64,
    ) -> anyhow::Result<bool> {
        bail!("Pruning not supported by analytics store");
    }

    /// Pruning is unsupported by the analytics store.
    async fn set_pruner_watermark(
        &mut self,
        _pipeline: &'static str,
        _pruner_hi: u64,
    ) -> anyhow::Result<bool> {
        bail!("Pruning not supported by analytics store");
    }
}
703
704pub(crate) fn construct_object_store_path(
707 pipeline: &str,
708 epoch: EpochId,
709 checkpoint_range: &Range<u64>,
710 file_format: FileFormat,
711) -> ObjectPath {
712 let extension = match file_format {
713 FileFormat::Csv => "csv",
714 FileFormat::Parquet => "parquet",
715 };
716 ObjectPath::from(format!(
717 "{}/epoch_{}/{}_{}.{}",
718 pipeline, epoch, checkpoint_range.start, checkpoint_range.end, extension
719 ))
720}
721
/// Extracts the half-open checkpoint range encoded in a file name of the
/// form `{start}_{end}.{extension}` (the shape produced by
/// `construct_object_store_path`). Returns `None` when the name does not
/// match that shape.
pub(crate) fn parse_checkpoint_range(filename: &str) -> Option<Range<u64>> {
    // Everything before the first '.' is the `{start}_{end}` stem.
    let stem = filename.split('.').next()?;
    // The stem must split into exactly two integers at its first '_'.
    let (start_str, end_str) = stem.split_once('_')?;
    match (start_str.parse::<u64>(), end_str.parse::<u64>()) {
        (Ok(start), Ok(end)) => Some(start..end),
        _ => None,
    }
}