sui_indexer_alt/handlers/
kv_checkpoints.rs1use std::sync::Arc;
5
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use diesel::{ExpressionMethods, QueryDsl};
9use diesel_async::RunQueryDsl;
10use sui_indexer_alt_framework::{
11 pipeline::{Processor, concurrent::Handler},
12 postgres::{Connection, Db},
13 types::full_checkpoint_content::CheckpointData,
14};
15use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
16
/// Stateless marker type for the `kv_checkpoints` pipeline.
///
/// It has no fields; it exists so the `Processor` and `Handler` traits can be implemented on it.
pub(crate) struct KvCheckpoints;
18
19#[async_trait]
20impl Processor for KvCheckpoints {
21 const NAME: &'static str = "kv_checkpoints";
22
23 type Value = StoredCheckpoint;
24
25 async fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
26 let sequence_number = checkpoint.checkpoint_summary.sequence_number as i64;
27 let checkpoint_summary = checkpoint.checkpoint_summary.data();
28 let signatures = checkpoint.checkpoint_summary.auth_sig();
29 Ok(vec![StoredCheckpoint {
30 sequence_number,
31 checkpoint_contents: bcs::to_bytes(&checkpoint.checkpoint_contents)
32 .with_context(|| format!("Serializing checkpoint {sequence_number} contents"))?,
33 checkpoint_summary: bcs::to_bytes(checkpoint_summary)
34 .with_context(|| format!("Serializing checkpoint {sequence_number} summary"))?,
35 validator_signatures: bcs::to_bytes(signatures)
36 .with_context(|| format!("Serializing checkpoint {sequence_number} signatures"))?,
37 }])
38 }
39}
40
41#[async_trait]
42impl Handler for KvCheckpoints {
43 type Store = Db;
44
45 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
46 Ok(diesel::insert_into(kv_checkpoints::table)
47 .values(values)
48 .on_conflict_do_nothing()
49 .execute(conn)
50 .await?)
51 }
52
53 async fn prune<'a>(
54 &self,
55 from: u64,
56 to_exclusive: u64,
57 conn: &mut Connection<'a>,
58 ) -> Result<usize> {
59 let filter = kv_checkpoints::table
60 .filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1));
61
62 Ok(diesel::delete(filter).execute(conn).await?)
63 }
64}
65
#[cfg(test)]
mod tests {
    use super::*;
    use diesel_async::RunQueryDsl;
    use sui_indexer_alt_framework::{
        Indexer, types::test_checkpoint_data_builder::TestCheckpointDataBuilder,
    };
    use sui_indexer_alt_schema::MIGRATIONS;

    /// Loads every row currently present in the `kv_checkpoints` table.
    async fn get_all_kv_checkpoints(conn: &mut Connection<'_>) -> Result<Vec<StoredCheckpoint>> {
        Ok(kv_checkpoints::table.load(conn).await?)
    }

    /// Commits three checkpoints, prunes the range `0..2`, and checks that two rows were
    /// removed and one is left.
    #[tokio::test]
    async fn test_kv_checkpoints_pruning() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        // Build, process and commit three checkpoints in a row, each with a single transaction.
        // The assertions below rely on the builder numbering them 0, 1 and 2.
        let mut builder = TestCheckpointDataBuilder::new(0);
        for _ in 0..3 {
            builder = builder.start_transaction(0).finish_transaction();
            let checkpoint = Arc::new(builder.build_checkpoint());
            let rows = KvCheckpoints.process(&checkpoint).await.unwrap();
            KvCheckpoints::commit(&rows, &mut conn).await.unwrap();
        }

        // The upper bound is exclusive, so only the first two checkpoints are removed.
        let rows_pruned = KvCheckpoints.prune(0, 2, &mut conn).await.unwrap();
        assert_eq!(rows_pruned, 2);

        // The third checkpoint must survive the prune.
        let remaining_checkpoints = get_all_kv_checkpoints(&mut conn).await.unwrap();
        assert_eq!(remaining_checkpoints.len(), 1);
    }
}