sui_analytics_indexer/handlers/
handler.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use sui_indexer_alt_framework::pipeline::Processor;
9use sui_indexer_alt_framework::pipeline::sequential;
10use sui_indexer_alt_framework::store::Store;
11use sui_types::base_types::EpochId;
12use sui_types::full_checkpoint_content::Checkpoint;
13
14use crate::schema::RowSchema;
15use crate::store::AnalyticsStore;
16
17pub trait Row: RowSchema + Send + Sync {
23 fn get_epoch(&self) -> EpochId;
24 fn get_checkpoint(&self) -> u64;
25}
26
27trait TypeErasedRows: Send + Sync {
32 fn len(&self) -> usize;
33 fn iter(&self) -> Box<dyn Iterator<Item = &dyn Row> + '_>;
34}
35
36impl<T: Row + 'static> TypeErasedRows for Vec<T> {
37 fn len(&self) -> usize {
38 Vec::len(self)
39 }
40
41 fn iter(&self) -> Box<dyn Iterator<Item = &dyn Row> + '_> {
42 Box::new(self.as_slice().iter().map(|item| item as &dyn Row))
43 }
44}
45
46#[derive(Clone)]
52pub struct CheckpointRows {
53 pub checkpoint: u64,
54 pub epoch: EpochId,
55 rows: Arc<dyn TypeErasedRows>,
56}
57
58impl CheckpointRows {
59 pub fn len(&self) -> usize {
60 self.rows.len()
61 }
62
63 pub fn is_empty(&self) -> bool {
64 self.rows.len() == 0
65 }
66
67 pub fn iter(&self) -> impl Iterator<Item = &dyn Row> + '_ {
68 self.rows.iter()
69 }
70}
71
72#[derive(Default)]
75pub struct Batch {
76 checkpoints: Vec<CheckpointRows>,
77}
78
79impl Batch {
80 fn push(&mut self, checkpoint_rows: CheckpointRows) {
81 self.checkpoints.push(checkpoint_rows);
82 }
83
84 fn as_slice(&self) -> &[CheckpointRows] {
85 &self.checkpoints
86 }
87}
88
89pub struct AnalyticsHandler<P>
98where
99 P: Processor,
100{
101 processor: P,
102}
103
104impl<P: Processor> AnalyticsHandler<P>
105where
106 P::Value: Row,
107{
108 pub fn new(processor: P) -> Self {
110 Self { processor }
111 }
112}
113
114#[async_trait]
115impl<P> Processor for AnalyticsHandler<P>
116where
117 P: Processor + Send + Sync,
118 P::Value: Send + Sync,
119{
120 const NAME: &'static str = P::NAME;
121 type Value = P::Value;
122
123 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
124 self.processor.process(checkpoint).await
125 }
126}
127
128#[async_trait]
129impl<P, T> sequential::Handler for AnalyticsHandler<P>
130where
131 P: Processor<Value = T> + Send + Sync,
132 T: Row + 'static,
133{
134 type Store = AnalyticsStore;
135 type Batch = Batch;
136
137 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
138 let Some(first) = values.as_slice().first() else {
139 return;
140 };
141 let checkpoint = first.get_checkpoint();
142 let epoch = first.get_epoch();
143
144 let rows: Vec<T> = values.collect();
145
146 let checkpoint_rows = CheckpointRows {
147 checkpoint,
148 epoch,
149 rows: Arc::new(rows) as Arc<dyn TypeErasedRows>,
150 };
151
152 batch.push(checkpoint_rows);
153 }
154
155 async fn commit<'a>(
156 &self,
157 batch: &Self::Batch,
158 conn: &mut <Self::Store as Store>::Connection<'a>,
159 ) -> Result<usize> {
160 conn.commit_batch::<P>(batch.as_slice()).await
161 }
162}