sui_rpc_store/indexer/
transaction_bitmap.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::transaction_bitmap`](crate::schema::transaction_bitmap)
6//! CF.
7//!
8//! Mirrors the tx-space half of
9//! `write_ledger_history_rows_for_checkpoint` in
10//! `sui-core::rpc_index`. For every transaction in the checkpoint
11//! the pipeline:
12//!
13//! 1. Visits every dimension candidate via
14//!    [`sui_inverted_index::for_each_transaction_dimension`].
15//! 2. Encodes each `(dimension, value)` into a `dimension_key` via
16//!    [`sui_inverted_index::encode_dimension_key`] and dedupes
17//!    per-tx, so a transaction matching the same dimension
18//!    multiple times contributes a single bit per
19//!    `(dim_key, bucket)`.
20//! 3. Groups `tx_seq` bits by `(dim_key, tx_seq / TX_BUCKET_SIZE)`,
21//!    folding any number of transactions in the checkpoint into
22//!    one `RoaringBitmap` per group.
23//!
24//! Multiple checkpoints landing in the same commit batch are
25//! folded into the same `RoaringBitmap` per group via the
26//! handler's `batch` callback, so the commit path emits one
27//! merge operand per `(dim_key, bucket)` regardless of how many
28//! checkpoints contributed.
29
30use std::collections::HashMap;
31use std::collections::HashSet;
32use std::sync::Arc;
33
34use async_trait::async_trait;
35use roaring::RoaringBitmap;
36use sui_indexer_alt_framework::pipeline::Processor;
37use sui_indexer_alt_framework::pipeline::sequential;
38use sui_inverted_index::encode_dimension_key;
39use sui_inverted_index::for_each_transaction_dimension;
40use sui_types::full_checkpoint_content::Checkpoint;
41
42use crate::indexer::Schema;
43use crate::indexer::Store;
44use crate::indexer::tx_seq_at;
45use crate::schema::transaction_bitmap;
46use crate::schema::transaction_bitmap::TX_BUCKET_SIZE;
47use crate::schema::transaction_bitmap::bit_of;
48use crate::schema::transaction_bitmap::bucket_of;
49
/// Pipeline marker for `transaction_bitmap`.
///
/// This zero-sized type carries no state. It is the type on which this
/// file implements [`Processor`] (turning a checkpoint into [`Row`]s)
/// and [`sequential::Handler`] (folding rows into a batch and staging
/// them as merge operands).
pub struct TransactionBitmap;
52
53/// One pre-built bitmap for a single `(dimension_key, bucket)`
54/// pair, ready to be staged as a merge operand against the CF.
55pub struct Row {
56    pub dimension_key: Vec<u8>,
57    pub bucket: u64,
58    pub bitmap: RoaringBitmap,
59}
60
61#[async_trait]
62impl Processor for TransactionBitmap {
63    const NAME: &'static str = "transaction_bitmap";
64    type Value = Row;
65
66    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
67        let mut groups: HashMap<(Vec<u8>, u64), RoaringBitmap> = HashMap::new();
68        let mut dim_keys: HashSet<Vec<u8>> = HashSet::new();
69
70        for (i, tx) in checkpoint.transactions.iter().enumerate() {
71            let tx_seq = tx_seq_at(checkpoint, i);
72            let bucket = bucket_of(tx_seq);
73            let bit = bit_of(tx_seq);
74
75            // Dedupe `(dim, value)` pairs that occur multiple
76            // times in the same tx (e.g. AffectedAddress when an
77            // address shows up in several object changes).
78            // Without this we'd add the same bit to the bitmap
79            // repeatedly — not incorrect, but redundant work.
80            dim_keys.clear();
81            for_each_transaction_dimension(
82                &tx.transaction,
83                &tx.effects,
84                tx.events.as_ref(),
85                &checkpoint.object_set,
86                |dim, value| {
87                    dim_keys.insert(encode_dimension_key(dim, value));
88                },
89            );
90
91            for dim_key in dim_keys.drain() {
92                groups.entry((dim_key, bucket)).or_default().insert(bit);
93            }
94        }
95
96        Ok(groups
97            .into_iter()
98            .map(|((dim_key, bucket), bitmap)| Row {
99                dimension_key: dim_key,
100                bucket,
101                bitmap,
102            })
103            .collect())
104    }
105}
106
107#[async_trait]
108impl sequential::Handler for TransactionBitmap {
109    type Store = Store;
110    /// Fold operands from multiple checkpoints together so we
111    /// emit at most one merge operand per `(dim_key, bucket)`
112    /// per commit — even if many checkpoints land in the same
113    /// batch.
114    type Batch = HashMap<(Vec<u8>, u64), RoaringBitmap>;
115
116    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
117        for row in values {
118            let entry = batch.entry((row.dimension_key, row.bucket)).or_default();
119            *entry |= row.bitmap;
120        }
121    }
122
123    async fn commit<'a>(
124        &self,
125        batch: &Self::Batch,
126        conn: &mut sui_consistent_store::Connection<'a, Schema>,
127    ) -> anyhow::Result<usize> {
128        let cf = &conn.store.schema().transaction_bitmap;
129        for ((dim_key, bucket), bitmap) in batch {
130            let (k, v) = transaction_bitmap::store_bitmap(dim_key.clone(), *bucket, bitmap.clone());
131            conn.batch.merge(cf, &k, &v)?;
132        }
133        Ok(batch.len())
134    }
135}
136
137// Re-export for documentation cross-referencing — silence the
138// "unused import" lint without an `#[allow]`.
139#[allow(dead_code)]
140const _BUCKET_SIZE_DOC: u64 = TX_BUCKET_SIZE;
141
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Smoke test: `process` must return `Ok` for a checkpoint built
    /// by `TestCheckpointBuilder`. The produced rows are not inspected.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let checkpoint = Arc::new(TestCheckpointBuilder::new(1).build_checkpoint());
        let outcome = TransactionBitmap.process(&checkpoint).await;
        let _rows = outcome.unwrap();
    }
}
155}