sui_indexer_alt/handlers/
kv_checkpoints.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
    db::{Connection, Db},
    pipeline::{concurrent::Handler, Processor},
    types::full_checkpoint_content::CheckpointData,
};
use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};

pub(crate) struct KvCheckpoints;

impl Processor for KvCheckpoints {
    const NAME: &'static str = "kv_checkpoints";

    type Value = StoredCheckpoint;

    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let sequence_number = checkpoint.checkpoint_summary.sequence_number as i64;
        let checkpoint_summary = checkpoint.checkpoint_summary.data();
        let signatures = checkpoint.checkpoint_summary.auth_sig();
        Ok(vec![StoredCheckpoint {
            sequence_number,
            checkpoint_contents: bcs::to_bytes(&checkpoint.checkpoint_contents)
                .with_context(|| format!("Serializing checkpoint {sequence_number} contents"))?,
            checkpoint_summary: bcs::to_bytes(checkpoint_summary)
                .with_context(|| format!("Serializing checkpoint {sequence_number} summary"))?,
            validator_signatures: bcs::to_bytes(signatures)
                .with_context(|| format!("Serializing checkpoint {sequence_number} signatures"))?,
        }])
    }
}

#[async_trait::async_trait]
impl Handler for KvCheckpoints {
    type Store = Db;

    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
        Ok(diesel::insert_into(kv_checkpoints::table)
            .values(values)
            .on_conflict_do_nothing()
            .execute(conn)
            .await?)
    }

    async fn prune<'a>(
        &self,
        from: u64,
        to_exclusive: u64,
        conn: &mut Connection<'a>,
    ) -> Result<usize> {
        let filter = kv_checkpoints::table
            .filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1));

        Ok(diesel::delete(filter).execute(conn).await?)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use diesel_async::RunQueryDsl;
    use sui_indexer_alt_framework::{
        types::test_checkpoint_data_builder::TestCheckpointDataBuilder, Indexer,
    };
    use sui_indexer_alt_schema::MIGRATIONS;

    async fn get_all_kv_checkpoints(conn: &mut Connection<'_>) -> Result<Vec<StoredCheckpoint>> {
        let query = kv_checkpoints::table.load(conn).await?;
        Ok(query)
    }

    /// The kv_checkpoints pruner does not require cp_sequence_numbers, it can prune directly with the
    /// checkpoint sequence number range.
    #[tokio::test]
    async fn test_kv_checkpoints_pruning() {
        let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
        let mut conn = indexer.db().connect().await.unwrap();

        // Create 3 checkpoints
        let mut builder = TestCheckpointDataBuilder::new(0);
        builder = builder.start_transaction(0).finish_transaction();
        let checkpoint = Arc::new(builder.build_checkpoint());
        let values = KvCheckpoints.process(&checkpoint).unwrap();
        KvCheckpoints::commit(&values, &mut conn).await.unwrap();

        builder = builder.start_transaction(0).finish_transaction();
        let checkpoint = Arc::new(builder.build_checkpoint());
        let values = KvCheckpoints.process(&checkpoint).unwrap();
        KvCheckpoints::commit(&values, &mut conn).await.unwrap();

        builder = builder.start_transaction(0).finish_transaction();
        let checkpoint = Arc::new(builder.build_checkpoint());
        let values = KvCheckpoints.process(&checkpoint).unwrap();
        KvCheckpoints::commit(&values, &mut conn).await.unwrap();

        // Prune checkpoints from `[0, 2)`
        let rows_pruned = KvCheckpoints.prune(0, 2, &mut conn).await.unwrap();
        assert_eq!(rows_pruned, 2);

        // Checkpoint 2 remains
        let remaining_checkpoints = get_all_kv_checkpoints(&mut conn).await.unwrap();
        assert_eq!(remaining_checkpoints.len(), 1);
    }
}