sui_rpc_store/indexer/
checkpoint_summary.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::checkpoint_summary`](crate::schema::checkpoint_summary)
6//! CF: one row per checkpoint carrying the BCS-encoded
7//! `CheckpointSummary` and its quorum signature.
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::schema::checkpoint_summary;
19use crate::schema::primitives::U64Be;
20
/// Pipeline marker for `checkpoint_summary`.
///
/// A field-less unit struct: it holds no state and serves as the type the
/// `Processor` and `sequential::Handler` implementations in this file are
/// attached to. The common traits are derived so the marker can be
/// debug-printed, copied freely, and built via `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct CheckpointSummary;
23
24/// One stored row, ready to be put into the CF. The processor
25/// pre-builds the typed [`checkpoint_summary::Value`] so the
26/// commit path is just a `Batch::put` per entry.
27pub struct Row {
28    pub seq: u64,
29    pub value: checkpoint_summary::Value,
30}
31
32#[async_trait]
33impl Processor for CheckpointSummary {
34    const NAME: &'static str = "checkpoint_summary";
35    type Value = Row;
36
37    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
38        Ok(vec![Row {
39            seq: checkpoint.summary.data().sequence_number,
40            value: checkpoint_summary::store(
41                checkpoint.summary.data(),
42                checkpoint.summary.auth_sig(),
43            ),
44        }])
45    }
46}
47
48#[async_trait]
49impl sequential::Handler for CheckpointSummary {
50    type Store = Store;
51    type Batch = Vec<Row>;
52
53    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
54        batch.extend(values);
55    }
56
57    async fn commit<'a>(
58        &self,
59        batch: &Self::Batch,
60        conn: &mut sui_consistent_store::Connection<'a, Schema>,
61    ) -> anyhow::Result<usize> {
62        let cf = &conn.store.schema().checkpoint_summary;
63        for row in batch {
64            conn.batch.put(cf, &U64Be(row.seq), &row.value)?;
65        }
66        Ok(batch.len())
67    }
68}
69
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Processing a single checkpoint must yield exactly one row, and that
    /// row's `seq` must be the sequence number the checkpoint was built with.
    #[tokio::test]
    async fn process_emits_one_row_per_checkpoint() {
        const SEQ: u64 = 42;

        let checkpoint = Arc::new(TestCheckpointBuilder::new(SEQ).build_checkpoint());
        let rows = CheckpointSummary.process(&checkpoint).await.unwrap();

        // Comparing the full list of sequence numbers checks both the row
        // count and the value in one assertion.
        let seqs: Vec<u64> = rows.iter().map(|row| row.seq).collect();
        assert_eq!(seqs, vec![SEQ]);
    }
}