sui_analytics_indexer/handlers/handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use sui_indexer_alt_framework::pipeline::Processor;
9use sui_indexer_alt_framework::pipeline::sequential;
10use sui_indexer_alt_framework::store::Store;
11use sui_types::base_types::EpochId;
12use sui_types::full_checkpoint_content::Checkpoint;
13
14use crate::schema::RowSchema;
15use crate::store::AnalyticsStore;
16
17/// Row types implement this to provide epoch and checkpoint information for batching.
18/// Batches are committed at epoch boundaries to ensure files don't span epochs.
19///
20/// This trait requires `RowSchema + Send + Sync`, making `dyn Row` object-safe
21/// and usable for dynamic dispatch in the analytics store.
22pub trait Row: RowSchema + Send + Sync {
23    fn get_epoch(&self) -> EpochId;
24    fn get_checkpoint(&self) -> u64;
25}
26
27/// Private trait for type-erased row storage.
28///
29/// Allows storing `Vec<T>` without knowing `T` at compile time.
30/// The conversion to `&dyn Row` happens lazily during iteration.
31trait TypeErasedRows: Send + Sync {
32    fn len(&self) -> usize;
33    fn iter(&self) -> Box<dyn Iterator<Item = &dyn Row> + '_>;
34}
35
36impl<T: Row + 'static> TypeErasedRows for Vec<T> {
37    fn len(&self) -> usize {
38        Vec::len(self)
39    }
40
41    fn iter(&self) -> Box<dyn Iterator<Item = &dyn Row> + '_> {
42        Box::new(self.as_slice().iter().map(|item| item as &dyn Row))
43    }
44}
45
46/// Rows from a single checkpoint with metadata.
47///
48/// Stores rows in a type-erased container, allowing storage of `Vec<T>`
49/// for any concrete row type T. The conversion to `&dyn Row` happens lazily
50/// during iteration.
51#[derive(Clone)]
52pub struct CheckpointRows {
53    pub checkpoint: u64,
54    pub epoch: EpochId,
55    rows: Arc<dyn TypeErasedRows>,
56}
57
58impl CheckpointRows {
59    pub fn len(&self) -> usize {
60        self.rows.len()
61    }
62
63    pub fn is_empty(&self) -> bool {
64        self.rows.len() == 0
65    }
66
67    pub fn iter(&self) -> impl Iterator<Item = &dyn Row> + '_ {
68        self.rows.iter()
69    }
70}
71
72/// Batch of rows grouped by checkpoint, ready for commit.
73/// Non-generic: conversion to trait objects happens in batch(), not commit().
74#[derive(Default)]
75pub struct Batch {
76    checkpoints: Vec<CheckpointRows>,
77}
78
79impl Batch {
80    fn push(&mut self, checkpoint_rows: CheckpointRows) {
81        self.checkpoints.push(checkpoint_rows);
82    }
83
84    fn as_slice(&self) -> &[CheckpointRows] {
85        &self.checkpoints
86    }
87}
88
89/// Generic wrapper that implements Handler for any Processor with analytics batching.
90///
91/// This adapter wraps a `Processor` and provides the common batching and commit
92/// logic for writing analytics data to object stores. Uses a copy-on-write pattern
93/// for atomic commits: accumulated rows are only swapped after successful upload.
94///
95/// Note: Accumulated rows and pipeline config are stored on the `AnalyticsStore`,
96/// not on this handler. This makes handlers stateless and allows state to be shared.
97pub struct AnalyticsHandler<P>
98where
99    P: Processor,
100{
101    processor: P,
102}
103
104impl<P: Processor> AnalyticsHandler<P>
105where
106    P::Value: Row,
107{
108    /// Create a new analytics handler and register the pipeline with the store.
109    pub fn new(processor: P) -> Self {
110        Self { processor }
111    }
112}
113
114#[async_trait]
115impl<P> Processor for AnalyticsHandler<P>
116where
117    P: Processor + Send + Sync,
118    P::Value: Send + Sync,
119{
120    const NAME: &'static str = P::NAME;
121    type Value = P::Value;
122
123    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
124        self.processor.process(checkpoint).await
125    }
126}
127
128#[async_trait]
129impl<P, T> sequential::Handler for AnalyticsHandler<P>
130where
131    P: Processor<Value = T> + Send + Sync,
132    T: Row + 'static,
133{
134    type Store = AnalyticsStore;
135    type Batch = Batch;
136
137    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
138        let Some(first) = values.as_slice().first() else {
139            return;
140        };
141        let checkpoint = first.get_checkpoint();
142        let epoch = first.get_epoch();
143
144        let rows: Vec<T> = values.collect();
145
146        let checkpoint_rows = CheckpointRows {
147            checkpoint,
148            epoch,
149            rows: Arc::new(rows) as Arc<dyn TypeErasedRows>,
150        };
151
152        batch.push(checkpoint_rows);
153    }
154
155    async fn commit<'a>(
156        &self,
157        batch: &Self::Batch,
158        conn: &mut <Self::Store as Store>::Connection<'a>,
159    ) -> Result<usize> {
160        conn.commit_batch::<P>(batch.as_slice()).await
161    }
162}