sui_rpc_store/indexer/
transaction_bitmap.rs1use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use roaring::RoaringBitmap;
36use sui_indexer_alt_framework::pipeline::Processor;
37use sui_indexer_alt_framework::pipeline::sequential;
38use sui_inverted_index::encode_dimension_key;
39use sui_inverted_index::for_each_transaction_dimension;
40use sui_types::full_checkpoint_content::Checkpoint;
41
42use crate::indexer::Schema;
43use crate::indexer::Store;
44use crate::indexer::tx_seq_at;
45use crate::schema::transaction_bitmap;
46use crate::schema::transaction_bitmap::TX_BUCKET_SIZE;
47use crate::schema::transaction_bitmap::bit_of;
48use crate::schema::transaction_bitmap::bucket_of;
49
/// Marker type for the `"transaction_bitmap"` indexing pipeline.
///
/// It carries no state. Its [`Processor`] impl turns a checkpoint into per-bucket bitmap
/// [`Row`]s, and its [`sequential::Handler`] impl batches those rows and merges them into the
/// store's `transaction_bitmap` column family.
pub struct TransactionBitmap;
52
/// One bitmap fragment produced by [`TransactionBitmap`] for a single checkpoint.
///
/// Each row is identified by the pair `(dimension_key, bucket)`; rows sharing that pair are
/// unioned together when they are batched.
pub struct Row {
    /// Encoded dimension key, as returned by `encode_dimension_key(dim, value)`.
    pub dimension_key: Vec<u8>,
    /// Bucket the transactions fall into, as computed by `bucket_of(tx_seq)`.
    pub bucket: u64,
    /// Bit positions (`bit_of(tx_seq)`) of the transactions in this bucket that emitted
    /// `dimension_key`.
    pub bitmap: RoaringBitmap,
}
60
61#[async_trait]
62impl Processor for TransactionBitmap {
63 const NAME: &'static str = "transaction_bitmap";
64 type Value = Row;
65
66 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
67 let mut groups: HashMap<(Vec<u8>, u64), RoaringBitmap> = HashMap::new();
68 let mut dim_keys: HashSet<Vec<u8>> = HashSet::new();
69
70 for (i, tx) in checkpoint.transactions.iter().enumerate() {
71 let tx_seq = tx_seq_at(checkpoint, i);
72 let bucket = bucket_of(tx_seq);
73 let bit = bit_of(tx_seq);
74
75 dim_keys.clear();
81 for_each_transaction_dimension(
82 &tx.transaction,
83 &tx.effects,
84 tx.events.as_ref(),
85 &checkpoint.object_set,
86 |dim, value| {
87 dim_keys.insert(encode_dimension_key(dim, value));
88 },
89 );
90
91 for dim_key in dim_keys.drain() {
92 groups.entry((dim_key, bucket)).or_default().insert(bit);
93 }
94 }
95
96 Ok(groups
97 .into_iter()
98 .map(|((dim_key, bucket), bitmap)| Row {
99 dimension_key: dim_key,
100 bucket,
101 bitmap,
102 })
103 .collect())
104 }
105}
106
107#[async_trait]
108impl sequential::Handler for TransactionBitmap {
109 type Store = Store;
110 type Batch = HashMap<(Vec<u8>, u64), RoaringBitmap>;
115
116 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
117 for row in values {
118 let entry = batch.entry((row.dimension_key, row.bucket)).or_default();
119 *entry |= row.bitmap;
120 }
121 }
122
123 async fn commit<'a>(
124 &self,
125 batch: &Self::Batch,
126 conn: &mut sui_consistent_store::Connection<'a, Schema>,
127 ) -> anyhow::Result<usize> {
128 let cf = &conn.store.schema().transaction_bitmap;
129 for ((dim_key, bucket), bitmap) in batch {
130 let (k, v) = transaction_bitmap::store_bitmap(dim_key.clone(), *bucket, bitmap.clone());
131 conn.batch.merge(cf, &k, &v)?;
132 }
133 Ok(batch.len())
134 }
135}
136
// Mirrors `TX_BUCKET_SIZE` under a local name. Nothing in the visible code reads this
// constant, hence the `dead_code` allowance; it is also the only visible use of the
// `TX_BUCKET_SIZE` import.
// NOTE(review): the name suggests it exists purely as a documentation anchor — confirm.
#[allow(dead_code)]
const _BUCKET_SIZE_DOC: u64 = TX_BUCKET_SIZE;
141
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Smoke test: `process` must return `Ok` for a checkpoint produced by the test
    /// builder. The contents of the returned rows are not inspected.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let synthetic = TestCheckpointBuilder::new(1).build_checkpoint();
        let checkpoint = Arc::new(synthetic);

        let outcome = TransactionBitmap.process(&checkpoint).await;
        let _rows = outcome.unwrap();
    }
}