sui_rpc_store/indexer/
checkpoint_summary.rs1use std::sync::Arc;
10
11use async_trait::async_trait;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::pipeline::sequential;
14use sui_types::full_checkpoint_content::Checkpoint;
15
16use crate::indexer::Schema;
17use crate::indexer::Store;
18use crate::schema::checkpoint_summary;
19use crate::schema::primitives::U64Be;
20
/// Marker type for the `"checkpoint_summary"` pipeline.
///
/// Carries no state; it exists so that the `Processor` and
/// `sequential::Handler` implementations in this file have a type to hang on.
pub struct CheckpointSummary;
23
24pub struct Row {
28 pub seq: u64,
29 pub value: checkpoint_summary::Value,
30}
31
32#[async_trait]
33impl Processor for CheckpointSummary {
34 const NAME: &'static str = "checkpoint_summary";
35 type Value = Row;
36
37 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Row>> {
38 Ok(vec![Row {
39 seq: checkpoint.summary.data().sequence_number,
40 value: checkpoint_summary::store(
41 checkpoint.summary.data(),
42 checkpoint.summary.auth_sig(),
43 ),
44 }])
45 }
46}
47
48#[async_trait]
49impl sequential::Handler for CheckpointSummary {
50 type Store = Store;
51 type Batch = Vec<Row>;
52
53 fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Row>) {
54 batch.extend(values);
55 }
56
57 async fn commit<'a>(
58 &self,
59 batch: &Self::Batch,
60 conn: &mut sui_consistent_store::Connection<'a, Schema>,
61 ) -> anyhow::Result<usize> {
62 let cf = &conn.store.schema().checkpoint_summary;
63 for row in batch {
64 conn.batch.put(cf, &U64Be(row.seq), &row.value)?;
65 }
66 Ok(batch.len())
67 }
68}
69
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use sui_types::test_checkpoint_data_builder::TestCheckpointBuilder;

    use super::*;

    /// Processing a single checkpoint must yield exactly one row whose `seq`
    /// equals the sequence number the checkpoint was built with.
    #[tokio::test]
    async fn process_emits_one_row_per_checkpoint() {
        const SEQUENCE_NUMBER: u64 = 42;

        let built = TestCheckpointBuilder::new(SEQUENCE_NUMBER).build_checkpoint();
        let checkpoint = Arc::new(built);

        let rows = CheckpointSummary.process(&checkpoint).await.unwrap();

        // Comparing the collected sequence numbers checks both the row count
        // and the value of the single row in one assertion.
        let seqs: Vec<u64> = rows.iter().map(|row| row.seq).collect();
        assert_eq!(seqs, vec![SEQUENCE_NUMBER]);
    }
}