sui_indexer_alt/handlers/
kv_checkpoints.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Context;
7use anyhow::Result;
8use async_trait::async_trait;
9use diesel::ExpressionMethods;
10use diesel::QueryDsl;
11use diesel_async::RunQueryDsl;
12use sui_indexer_alt_framework::pipeline::Processor;
13use sui_indexer_alt_framework::postgres::Connection;
14use sui_indexer_alt_framework::postgres::handler::Handler;
15use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
16use sui_indexer_alt_schema::checkpoints::StoredCheckpoint;
17use sui_indexer_alt_schema::schema::kv_checkpoints;
18
19pub(crate) struct KvCheckpoints;
20
21#[async_trait]
22impl Processor for KvCheckpoints {
23    const NAME: &'static str = "kv_checkpoints";
24
25    type Value = StoredCheckpoint;
26
27    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
28        let sequence_number = checkpoint.summary.sequence_number as i64;
29        let summary = checkpoint.summary.data();
30        let signatures = checkpoint.summary.auth_sig();
31        Ok(vec![StoredCheckpoint {
32            sequence_number,
33            checkpoint_contents: bcs::to_bytes(&checkpoint.contents)
34                .with_context(|| format!("Serializing checkpoint {sequence_number} contents"))?,
35            checkpoint_summary: bcs::to_bytes(summary)
36                .with_context(|| format!("Serializing checkpoint {sequence_number} summary"))?,
37            validator_signatures: bcs::to_bytes(signatures)
38                .with_context(|| format!("Serializing checkpoint {sequence_number} signatures"))?,
39        }])
40    }
41}
42
43#[async_trait]
44impl Handler for KvCheckpoints {
45    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
46        Ok(diesel::insert_into(kv_checkpoints::table)
47            .values(values)
48            .on_conflict_do_nothing()
49            .execute(conn)
50            .await?)
51    }
52
53    async fn prune<'a>(
54        &self,
55        from: u64,
56        to_exclusive: u64,
57        conn: &mut Connection<'a>,
58    ) -> Result<usize> {
59        let filter = kv_checkpoints::table
60            .filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1));
61
62        Ok(diesel::delete(filter).execute(conn).await?)
63    }
64}
65
#[cfg(test)]
mod tests {
    use diesel_async::RunQueryDsl;
    use sui_indexer_alt_framework::Indexer;
    use sui_indexer_alt_framework::types::test_checkpoint_data_builder::TestCheckpointBuilder;
    use sui_indexer_alt_schema::MIGRATIONS;

    use super::*;

    /// Loads every row currently stored in `kv_checkpoints`.
    async fn get_all_kv_checkpoints(conn: &mut Connection<'_>) -> Result<Vec<StoredCheckpoint>> {
        Ok(kv_checkpoints::table.load(conn).await?)
    }

    /// The kv_checkpoints pruner does not require cp_sequence_numbers, it can prune directly with
    /// the checkpoint sequence number range.
    #[tokio::test]
    async fn test_kv_checkpoints_pruning() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.store().connect().await.unwrap();

        // Create and commit 3 checkpoints, each containing a single transaction.
        let mut builder = TestCheckpointBuilder::new(0);
        for _ in 0..3 {
            builder = builder.start_transaction(0).finish_transaction();
            let checkpoint = Arc::new(builder.build_checkpoint());
            let values = KvCheckpoints.process(&checkpoint).await.unwrap();
            KvCheckpoints::commit(&values, &mut conn).await.unwrap();
        }

        // Prune checkpoints from `[0, 2)`
        let rows_pruned = KvCheckpoints.prune(0, 2, &mut conn).await.unwrap();
        assert_eq!(rows_pruned, 2);

        // Checkpoint 2 remains. Check which row survived, not just how many, so that pruning the
        // wrong rows cannot go unnoticed.
        let remaining_checkpoints = get_all_kv_checkpoints(&mut conn).await.unwrap();
        assert_eq!(remaining_checkpoints.len(), 1);
        assert_eq!(remaining_checkpoints[0].sequence_number, 2);
    }
}