sui_indexer_alt/handlers/
kv_checkpoints.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use diesel::{ExpressionMethods, QueryDsl};
9use diesel_async::RunQueryDsl;
10use sui_indexer_alt_framework::{
11    pipeline::Processor,
12    postgres::{Connection, handler::Handler},
13    types::full_checkpoint_content::Checkpoint,
14};
15use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
16
17pub(crate) struct KvCheckpoints;
18
19#[async_trait]
20impl Processor for KvCheckpoints {
21    const NAME: &'static str = "kv_checkpoints";
22
23    type Value = StoredCheckpoint;
24
25    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
26        let sequence_number = checkpoint.summary.sequence_number as i64;
27        let summary = checkpoint.summary.data();
28        let signatures = checkpoint.summary.auth_sig();
29        Ok(vec![StoredCheckpoint {
30            sequence_number,
31            checkpoint_contents: bcs::to_bytes(&checkpoint.contents)
32                .with_context(|| format!("Serializing checkpoint {sequence_number} contents"))?,
33            checkpoint_summary: bcs::to_bytes(summary)
34                .with_context(|| format!("Serializing checkpoint {sequence_number} summary"))?,
35            validator_signatures: bcs::to_bytes(signatures)
36                .with_context(|| format!("Serializing checkpoint {sequence_number} signatures"))?,
37        }])
38    }
39}
40
41#[async_trait]
42impl Handler for KvCheckpoints {
43    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
44        Ok(diesel::insert_into(kv_checkpoints::table)
45            .values(values)
46            .on_conflict_do_nothing()
47            .execute(conn)
48            .await?)
49    }
50
51    async fn prune<'a>(
52        &self,
53        from: u64,
54        to_exclusive: u64,
55        conn: &mut Connection<'a>,
56    ) -> Result<usize> {
57        let filter = kv_checkpoints::table
58            .filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1));
59
60        Ok(diesel::delete(filter).execute(conn).await?)
61    }
62}
63
64#[cfg(test)]
65mod tests {
66    use super::*;
67    use diesel_async::RunQueryDsl;
68    use sui_indexer_alt_framework::{
69        Indexer, types::test_checkpoint_data_builder::TestCheckpointBuilder,
70    };
71    use sui_indexer_alt_schema::MIGRATIONS;
72
73    async fn get_all_kv_checkpoints(conn: &mut Connection<'_>) -> Result<Vec<StoredCheckpoint>> {
74        let query = kv_checkpoints::table.load(conn).await?;
75        Ok(query)
76    }
77
78    /// The kv_checkpoints pruner does not require cp_sequence_numbers, it can prune directly with the
79    /// checkpoint sequence number range.
80    #[tokio::test]
81    async fn test_kv_checkpoints_pruning() {
82        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
83        let mut conn = indexer.store().connect().await.unwrap();
84
85        // Create 3 checkpoints
86        let mut builder = TestCheckpointBuilder::new(0);
87        builder = builder.start_transaction(0).finish_transaction();
88        let checkpoint = Arc::new(builder.build_checkpoint());
89        let values = KvCheckpoints.process(&checkpoint).await.unwrap();
90        KvCheckpoints::commit(&values, &mut conn).await.unwrap();
91
92        builder = builder.start_transaction(0).finish_transaction();
93        let checkpoint = Arc::new(builder.build_checkpoint());
94        let values = KvCheckpoints.process(&checkpoint).await.unwrap();
95        KvCheckpoints::commit(&values, &mut conn).await.unwrap();
96
97        builder = builder.start_transaction(0).finish_transaction();
98        let checkpoint = Arc::new(builder.build_checkpoint());
99        let values = KvCheckpoints.process(&checkpoint).await.unwrap();
100        KvCheckpoints::commit(&values, &mut conn).await.unwrap();
101
102        // Prune checkpoints from `[0, 2)`
103        let rows_pruned = KvCheckpoints.prune(0, 2, &mut conn).await.unwrap();
104        assert_eq!(rows_pruned, 2);
105
106        // Checkpoint 2 remains
107        let remaining_checkpoints = get_all_kv_checkpoints(&mut conn).await.unwrap();
108        assert_eq!(remaining_checkpoints.len(), 1);
109    }
110}