sui_rpc_store/indexer/
checkpoint_contents.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Sequential pipeline that populates the
5//! [`schema::checkpoint_contents`](crate::schema::checkpoint_contents)
6//! CF: one row per checkpoint carrying the BCS-encoded
7//! `CheckpointContents`.
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::schema::checkpoint_contents;
19use crate::schema::primitives::U64Be;
20
/// Marker type naming the `checkpoint_contents` pipeline.
///
/// The type is a unit struct and holds no state of its own; the pipeline's
/// behaviour is supplied by the [`Processor`] and [`sequential::Handler`]
/// implementations attached to it in this module.
pub struct CheckpointContents;
23
24/// One stored row, ready to be put into the CF.
25pub struct Row {
26    pub seq: u64,
27    pub value: checkpoint_contents::Value,
28}
29
30#[async_trait]
31impl Processor for CheckpointContents {
32    const NAME: &'static str = "checkpoint_contents";
33    type Value = Row;
34
35    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
36        Ok(vec![Row {
37            seq: checkpoint.summary.data().sequence_number,
38            value: checkpoint_contents::store(&checkpoint.contents),
39        }])
40    }
41}
42
43#[async_trait]
44impl sequential::Handler for CheckpointContents {
45    type Store = Store;
46    type Batch = Vec<Row>;
47
48    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
49        batch.extend(values);
50    }
51
52    async fn commit<'a>(
53        &self,
54        batch: &Self::Batch,
55        conn: &mut sui_consistent_store::Connection<'a, Schema>,
56    ) -> anyhow::Result<usize> {
57        let cf = &conn.store.schema().checkpoint_contents;
58        for row in batch {
59            conn.batch.put(cf, &U64Be(row.seq), &row.value)?;
60        }
61        Ok(batch.len())
62    }
63}
64
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Processing a checkpoint built with sequence number 7 must yield
    /// exactly one row, and that row must carry sequence number 7.
    #[tokio::test]
    async fn process_emits_one_row_per_checkpoint() {
        let builder = TestCheckpointBuilder::new(7);
        let checkpoint = Arc::new(builder.build_checkpoint());

        let rows = CheckpointContents.process(&checkpoint).await.unwrap();

        // Comparing the full list of sequence numbers checks both the row
        // count and the single row's key in one assertion.
        let seqs: Vec<u64> = rows.iter().map(|row| row.seq).collect();
        assert_eq!(seqs, vec![7]);
    }
}