// sui_rpc_store/indexer/checkpoint_contents.rs
use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::schema::checkpoint_contents;
19use crate::schema::primitives::U64Be;
20
/// Stateless marker type for the `checkpoint_contents` pipeline.
///
/// It holds no data; it exists so that the `Processor` and
/// `sequential::Handler` implementations in this module have a type to
/// attach to.
pub struct CheckpointContents;
23
/// A single entry emitted by the `checkpoint_contents` pipeline.
///
/// `process` creates one `Row` per checkpoint and `commit` writes it out,
/// using `seq` (wrapped in `U64Be`) as the key and `value` as the payload.
pub struct Row {
    /// Sequence number taken from the checkpoint summary.
    pub seq: u64,
    /// Value returned by `checkpoint_contents::store` for the checkpoint's contents.
    pub value: checkpoint_contents::Value,
}
29
30#[async_trait]
31impl Processor for CheckpointContents {
32 const NAME: &'static str = "checkpoint_contents";
33 type Value = Row;
34
35 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
36 Ok(vec![Row {
37 seq: checkpoint.summary.data().sequence_number,
38 value: checkpoint_contents::store(&checkpoint.contents),
39 }])
40 }
41}
42
43#[async_trait]
44impl sequential::Handler for CheckpointContents {
45 type Store = Store;
46 type Batch = Vec<Row>;
47
48 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
49 batch.extend(values);
50 }
51
52 async fn commit<'a>(
53 &self,
54 batch: &Self::Batch,
55 conn: &mut sui_consistent_store::Connection<'a, Schema>,
56 ) -> anyhow::Result<usize> {
57 let cf = &conn.store.schema().checkpoint_contents;
58 for row in batch {
59 conn.batch.put(cf, &U64Be(row.seq), &row.value)?;
60 }
61 Ok(batch.len())
62 }
63}
64
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Processing a checkpoint built with sequence number 7 must yield a
    /// single row carrying that same sequence number.
    #[tokio::test]
    async fn process_emits_one_row_per_checkpoint() {
        let built = TestCheckpointBuilder::new(7).build_checkpoint();
        let checkpoint = Arc::new(built);

        let rows = CheckpointContents.process(&checkpoint).await.unwrap();

        // Comparing the full list of sequence numbers checks both the row
        // count and the value in one assertion.
        let seqs: Vec<u64> = rows.iter().map(|row| row.seq).collect();
        assert_eq!(seqs, vec![7]);
    }
}