sui_rpc_store/indexer/
event_bitmap.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::event_bitmap`](crate::schema::event_bitmap) CF.
6//!
7//! Mirrors the event-space half of
8//! `write_ledger_history_rows_for_checkpoint` in
9//! `sui-core::rpc_index`. For every event in every transaction
10//! the pipeline:
11//!
12//! 1. Visits dimension candidates via
13//!    [`sui_inverted_index::for_each_event_dimension`].
14//! 2. Encodes the dimension key via
15//!    [`sui_inverted_index::encode_dimension_key`].
16//! 3. Packs `(tx_seq, event_idx)` into the event-seq space the
17//!    schema uses (`pack(tx_seq, event_idx)`), checks the packing
18//!    doesn't overflow the per-tx event limit (`1 << EVENT_BITS`)
19//!    or the `tx_seq` ceiling (`u64::MAX >> EVENT_BITS`), and
20//!    groups the bit into `(dim_key, packed / EVENT_BUCKET_SIZE)`.
21
22use std::collections::HashMap;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use roaring::RoaringBitmap;
27use sui_indexer_alt_framework::pipeline::Processor;
28use sui_indexer_alt_framework::pipeline::sequential;
29use sui_inverted_index::encode_dimension_key;
30use sui_inverted_index::for_each_event_dimension;
31use sui_types::full_checkpoint_content::Checkpoint;
32use sui_types::transaction::TransactionDataAPI;
33
34use crate::indexer::Schema;
35use crate::indexer::Store;
36use crate::indexer::tx_seq_at;
37use crate::schema::event_bitmap;
38use crate::schema::event_bitmap::EVENT_BITS;
39use crate::schema::event_bitmap::bit_of;
40use crate::schema::event_bitmap::bucket_of;
41use crate::schema::event_bitmap::pack;
42
43/// Maximum events a single transaction can contribute before the
44/// packed event-seq space would collide with the next
45/// transaction's range.
46const MAX_EVENTS_PER_TX: u32 = 1 << EVENT_BITS;
47
48/// Maximum `tx_seq` value whose `<< EVENT_BITS` still fits in a
49/// `u64`. Anything beyond this would lose its high bits during
50/// packing.
51const MAX_TX_SEQ: u64 = u64::MAX >> EVENT_BITS;
52
/// Unit marker type for the `event_bitmap` pipeline. It holds no
/// state; the pipeline's behaviour lives in the `Processor` and
/// `sequential::Handler` implementations attached to it.
pub struct EventBitmap;
55
56/// One pre-built bitmap for a single `(dimension_key, bucket)`
57/// pair, ready to be staged as a merge operand against the CF.
58pub struct Row {
59    pub dimension_key: Vec<u8>,
60    pub bucket: u64,
61    pub bitmap: RoaringBitmap,
62}
63
64#[async_trait]
65impl Processor for EventBitmap {
66    const NAME: &'static str = "event_bitmap";
67    type Value = Row;
68
69    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
70        let mut groups: HashMap<(Vec<u8>, u64), RoaringBitmap> = HashMap::new();
71
72        for (i, tx) in checkpoint.transactions.iter().enumerate() {
73            let tx_seq = tx_seq_at(checkpoint, i);
74            if tx_seq > MAX_TX_SEQ {
75                anyhow::bail!("tx_seq {tx_seq} exceeds packed event-seq limit {MAX_TX_SEQ}",);
76            }
77            let sender = tx.transaction.sender();
78
79            // `for_each_event_dimension` can't propagate errors,
80            // so a packing failure has to be captured and
81            // surfaced afterwards.
82            let mut packing_error: Option<anyhow::Error> = None;
83            for_each_event_dimension(
84                sender,
85                &tx.effects,
86                tx.events.as_ref(),
87                |event_idx, dim, value| {
88                    if packing_error.is_some() {
89                        return;
90                    }
91                    if event_idx >= MAX_EVENTS_PER_TX {
92                        packing_error = Some(anyhow::anyhow!(
93                            "event_idx {event_idx} exceeds packed event-seq limit {}",
94                            MAX_EVENTS_PER_TX - 1,
95                        ));
96                        return;
97                    }
98                    let packed = pack(tx_seq, event_idx);
99                    let bucket = bucket_of(packed);
100                    let bit = bit_of(packed);
101                    groups
102                        .entry((encode_dimension_key(dim, value), bucket))
103                        .or_default()
104                        .insert(bit);
105                },
106            );
107            if let Some(e) = packing_error {
108                return Err(e);
109            }
110        }
111
112        Ok(groups
113            .into_iter()
114            .map(|((dim_key, bucket), bitmap)| Row {
115                dimension_key: dim_key,
116                bucket,
117                bitmap,
118            })
119            .collect())
120    }
121}
122
123#[async_trait]
124impl sequential::Handler for EventBitmap {
125    type Store = Store;
126    /// Fold operands from multiple checkpoints together so the
127    /// commit path stages at most one merge operand per
128    /// `(dim_key, bucket)` per commit.
129    type Batch = HashMap<(Vec<u8>, u64), RoaringBitmap>;
130
131    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
132        for row in values {
133            let entry = batch.entry((row.dimension_key, row.bucket)).or_default();
134            *entry |= row.bitmap;
135        }
136    }
137
138    async fn commit<'a>(
139        &self,
140        batch: &Self::Batch,
141        conn: &mut sui_consistent_store::Connection<'a, Schema>,
142    ) -> anyhow::Result<usize> {
143        let cf = &conn.store.schema().event_bitmap;
144        for ((dim_key, bucket), bitmap) in batch {
145            let (k, v) = event_bitmap::store_bitmap(dim_key.clone(), *bucket, bitmap.clone());
146            conn.batch.merge(cf, &k, &v)?;
147        }
148        Ok(batch.len())
149    }
150}
151
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Smoke test: `process` must return `Ok` for a checkpoint
    /// produced by `TestCheckpointBuilder`. The returned rows are
    /// not inspected.
    #[tokio::test]
    async fn process_runs_against_synthetic_checkpoint() {
        let synthetic = TestCheckpointBuilder::new(1).build_checkpoint();
        let checkpoint = Arc::new(synthetic);
        let outcome = EventBitmap.process(&checkpoint).await;
        let _rows = outcome.unwrap();
    }
}