sui_core/
rpc_store_ingestion_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! [`IngestionClientTrait`] backed by a fullnode's local checkpoint and
5//! perpetual stores.
6//!
7//! The embedded `sui-rpc-store` indexer fetches historical checkpoints by
8//! sequence number -- to backfill the history cohort and to fill gaps the live
9//! broadcast stream drops. On a fullnode every executed checkpoint's data
10//! already lives in the local stores, so this client assembles a [`Checkpoint`]
11//! from them via [`ReadStore::get_checkpoint_data`] rather than fetching from a
12//! remote object store or gRPC endpoint.
13//!
14//! [`Checkpoint`]: sui_types::full_checkpoint_content::Checkpoint
15
16use async_trait::async_trait;
17use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointError;
18use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointResult;
19use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientTrait;
20use sui_types::digests::ChainIdentifier;
21use sui_types::storage::ReadStore;
22
23/// An [`IngestionClientTrait`] over any [`ReadStore`]. In production `R` is the
24/// fullnode's [`RocksDbStore`], which reads the local checkpoint and perpetual
25/// stores; the generic bound keeps it unit-testable against an in-memory store.
26///
27/// `chain_id` is captured at construction rather than derived from the genesis
28/// checkpoint, because genesis may have been pruned away on a long-running
29/// fullnode.
30///
31/// [`RocksDbStore`]: crate::storage::RocksDbStore
32pub struct PerpetualStoreIngestionClient<R> {
33    store: R,
34    chain_id: ChainIdentifier,
35}
36
37impl<R> PerpetualStoreIngestionClient<R> {
38    pub fn new(store: R, chain_id: ChainIdentifier) -> Self {
39        Self { store, chain_id }
40    }
41}
42
43#[async_trait]
44impl<R> IngestionClientTrait for PerpetualStoreIngestionClient<R>
45where
46    R: ReadStore + Send + Sync + 'static,
47{
48    async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
49        Ok(self.chain_id)
50    }
51
52    async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
53        // A checkpoint below the perpetual store's pruning watermark, or above
54        // the highest executed one, is simply absent. Report it as NotFound so
55        // the framework's `wait_for` can retry checkpoints that have not been
56        // executed yet, while pruned checkpoints stay permanently unavailable.
57        let lowest = self
58            .store
59            .get_lowest_available_checkpoint()
60            .map_err(|e| CheckpointError::Fetch(e.into()))?;
61        if checkpoint < lowest {
62            tracing::error!(
63                "Indexer requested checkpont below the perpetual store's pruning watermark"
64            );
65        }
66
67        let summary = self
68            .store
69            .get_checkpoint_by_sequence_number(checkpoint)
70            .ok_or(CheckpointError::NotFound)?;
71        let contents = self
72            .store
73            .get_checkpoint_contents_by_digest(&summary.content_digest)
74            .ok_or(CheckpointError::NotFound)?;
75        self.store
76            .get_checkpoint_data(summary, contents)
77            .map_err(|e| CheckpointError::Fetch(anyhow::Error::from(e)))
78    }
79
80    async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
81        Ok(self.store.get_latest_checkpoint_sequence_number()?)
82    }
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use crate::rpc_store_test_utils::store_with;
89    use crate::rpc_store_test_utils::test_chain_id;
90
91    #[tokio::test]
92    async fn chain_id_returns_configured_value() {
93        let client = PerpetualStoreIngestionClient::new(store_with([]), test_chain_id());
94        assert_eq!(client.chain_id().await.unwrap(), test_chain_id());
95    }
96
97    #[tokio::test]
98    async fn checkpoint_missing_is_not_found() {
99        let client = PerpetualStoreIngestionClient::new(store_with([1, 2]), test_chain_id());
100        assert!(matches!(
101            client.checkpoint(5).await,
102            Err(CheckpointError::NotFound)
103        ));
104    }
105
106    #[tokio::test]
107    async fn checkpoint_present_round_trips() {
108        let client = PerpetualStoreIngestionClient::new(store_with([0, 1, 2]), test_chain_id());
109        let cp = client.checkpoint(1).await.unwrap();
110        assert_eq!(*cp.summary.sequence_number(), 1);
111    }
112
113    #[tokio::test]
114    async fn checkpoint_summary_without_contents_is_not_found() {
115        let mut store = store_with([3]);
116        store.drop_contents_for = Some(3);
117        let client = PerpetualStoreIngestionClient::new(store, test_chain_id());
118        assert!(matches!(
119            client.checkpoint(3).await,
120            Err(CheckpointError::NotFound)
121        ));
122    }
123
124    #[tokio::test]
125    async fn latest_checkpoint_number_is_highest_executed() {
126        let client = PerpetualStoreIngestionClient::new(store_with([0, 4, 9]), test_chain_id());
127        assert_eq!(client.latest_checkpoint_number().await.unwrap(), 9);
128    }
129}