sui_rpc_store/indexer/
event_bitmap.rs1use std::collections::HashMap;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use roaring::RoaringBitmap;
27use sui_indexer_alt_framework::pipeline::Processor;
28use sui_indexer_alt_framework::pipeline::sequential;
29use sui_inverted_index::encode_dimension_key;
30use sui_inverted_index::for_each_event_dimension;
31use sui_types::full_checkpoint_content::Checkpoint;
32use sui_types::transaction::TransactionDataAPI;
33
34use crate::indexer::Schema;
35use crate::indexer::Store;
36use crate::indexer::tx_seq_at;
37use crate::schema::event_bitmap;
38use crate::schema::event_bitmap::EVENT_BITS;
39use crate::schema::event_bitmap::bit_of;
40use crate::schema::event_bitmap::bucket_of;
41use crate::schema::event_bitmap::pack;
42
43const MAX_EVENTS_PER_TX: u32 = 1 << EVENT_BITS;
47
48const MAX_TX_SEQ: u64 = u64::MAX >> EVENT_BITS;
52
53pub struct EventBitmap;
55
56pub struct Row {
59 pub dimension_key: Vec<u8>,
60 pub bucket: u64,
61 pub bitmap: RoaringBitmap,
62}
63
64#[async_trait]
65impl Processor for EventBitmap {
66 const NAME: &'static str = "event_bitmap";
67 type Value = Row;
68
69 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
70 let mut groups: HashMap<(Vec<u8>, u64), RoaringBitmap> = HashMap::new();
71
72 for (i, tx) in checkpoint.transactions.iter().enumerate() {
73 let tx_seq = tx_seq_at(checkpoint, i);
74 if tx_seq > MAX_TX_SEQ {
75 anyhow::bail!("tx_seq {tx_seq} exceeds packed event-seq limit {MAX_TX_SEQ}",);
76 }
77 let sender = tx.transaction.sender();
78
79 let mut packing_error: Option<anyhow::Error> = None;
83 for_each_event_dimension(
84 sender,
85 &tx.effects,
86 tx.events.as_ref(),
87 |event_idx, dim, value| {
88 if packing_error.is_some() {
89 return;
90 }
91 if event_idx >= MAX_EVENTS_PER_TX {
92 packing_error = Some(anyhow::anyhow!(
93 "event_idx {event_idx} exceeds packed event-seq limit {}",
94 MAX_EVENTS_PER_TX - 1,
95 ));
96 return;
97 }
98 let packed = pack(tx_seq, event_idx);
99 let bucket = bucket_of(packed);
100 let bit = bit_of(packed);
101 groups
102 .entry((encode_dimension_key(dim, value), bucket))
103 .or_default()
104 .insert(bit);
105 },
106 );
107 if let Some(e) = packing_error {
108 return Err(e);
109 }
110 }
111
112 Ok(groups
113 .into_iter()
114 .map(|((dim_key, bucket), bitmap)| Row {
115 dimension_key: dim_key,
116 bucket,
117 bitmap,
118 })
119 .collect())
120 }
121}
122
123#[async_trait]
124impl sequential::Handler for EventBitmap {
125 type Store = Store;
126 type Batch = HashMap<(Vec<u8>, u64), RoaringBitmap>;
130
131 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
132 for row in values {
133 let entry = batch.entry((row.dimension_key, row.bucket)).or_default();
134 *entry |= row.bitmap;
135 }
136 }
137
138 async fn commit<'a>(
139 &self,
140 batch: &Self::Batch,
141 conn: &mut sui_consistent_store::Connection<'a, Schema>,
142 ) -> anyhow::Result<usize> {
143 let cf = &conn.store.schema().event_bitmap;
144 for ((dim_key, bucket), bitmap) in batch {
145 let (k, v) = event_bitmap::store_bitmap(dim_key.clone(), *bucket, bitmap.clone());
146 conn.batch.merge(cf, &k, &v)?;
147 }
148 Ok(batch.len())
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use std::sync::Arc;
155
156 use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;
157
158 use super::*;
159
160 #[tokio::test]
161 async fn process_runs_against_synthetic_checkpoint() {
162 let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
163 let _ = EventBitmap.process(&checkpoint).await.unwrap();
164 }
165}