sui_core/
rpc_store_ingestion_client.rs1use async_trait::async_trait;
17use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointError;
18use sui_indexer_alt_framework::ingestion::ingestion_client::CheckpointResult;
19use sui_indexer_alt_framework::ingestion::ingestion_client::IngestionClientTrait;
20use sui_types::digests::ChainIdentifier;
21use sui_types::storage::ReadStore;
22
23pub struct PerpetualStoreIngestionClient<R> {
33 store: R,
34 chain_id: ChainIdentifier,
35}
36
37impl<R> PerpetualStoreIngestionClient<R> {
38 pub fn new(store: R, chain_id: ChainIdentifier) -> Self {
39 Self { store, chain_id }
40 }
41}
42
43#[async_trait]
44impl<R> IngestionClientTrait for PerpetualStoreIngestionClient<R>
45where
46 R: ReadStore + Send + Sync + 'static,
47{
48 async fn chain_id(&self) -> anyhow::Result<ChainIdentifier> {
49 Ok(self.chain_id)
50 }
51
52 async fn checkpoint(&self, checkpoint: u64) -> CheckpointResult {
53 let lowest = self
58 .store
59 .get_lowest_available_checkpoint()
60 .map_err(|e| CheckpointError::Fetch(e.into()))?;
61 if checkpoint < lowest {
62 tracing::error!(
63 "Indexer requested checkpont below the perpetual store's pruning watermark"
64 );
65 }
66
67 let summary = self
68 .store
69 .get_checkpoint_by_sequence_number(checkpoint)
70 .ok_or(CheckpointError::NotFound)?;
71 let contents = self
72 .store
73 .get_checkpoint_contents_by_digest(&summary.content_digest)
74 .ok_or(CheckpointError::NotFound)?;
75 self.store
76 .get_checkpoint_data(summary, contents)
77 .map_err(|e| CheckpointError::Fetch(anyhow::Error::from(e)))
78 }
79
80 async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
81 Ok(self.store.get_latest_checkpoint_sequence_number()?)
82 }
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rpc_store_test_utils::store_with;
    use crate::rpc_store_test_utils::test_chain_id;

    /// `chain_id()` hands back the identifier passed to the constructor.
    #[tokio::test]
    async fn chain_id_returns_configured_value() {
        let client = PerpetualStoreIngestionClient::new(store_with([]), test_chain_id());
        let reported = client.chain_id().await.unwrap();
        assert_eq!(reported, test_chain_id());
    }

    /// Requesting a sequence number the store was not seeded with is `NotFound`.
    #[tokio::test]
    async fn checkpoint_missing_is_not_found() {
        let client = PerpetualStoreIngestionClient::new(store_with([1, 2]), test_chain_id());
        let outcome = client.checkpoint(5).await;
        assert!(matches!(outcome, Err(CheckpointError::NotFound)));
    }

    /// A seeded checkpoint comes back carrying the requested sequence number.
    #[tokio::test]
    async fn checkpoint_present_round_trips() {
        let client = PerpetualStoreIngestionClient::new(store_with([0, 1, 2]), test_chain_id());
        let fetched = client.checkpoint(1).await.unwrap();
        let sequence_number = *fetched.summary.sequence_number();
        assert_eq!(sequence_number, 1);
    }

    /// A summary whose contents cannot be found is reported as `NotFound`.
    #[tokio::test]
    async fn checkpoint_summary_without_contents_is_not_found() {
        let mut store = store_with([3]);
        // Presumably makes the test store withhold the contents of checkpoint 3;
        // see `rpc_store_test_utils` to confirm.
        store.drop_contents_for = Some(3);
        let client = PerpetualStoreIngestionClient::new(store, test_chain_id());
        let outcome = client.checkpoint(3).await;
        assert!(matches!(outcome, Err(CheckpointError::NotFound)));
    }

    /// The latest checkpoint number is the highest seeded sequence number.
    #[tokio::test]
    async fn latest_checkpoint_number_is_highest_executed() {
        let client = PerpetualStoreIngestionClient::new(store_with([0, 4, 9]), test_chain_id());
        let latest = client.latest_checkpoint_number().await.unwrap();
        assert_eq!(latest, 9);
    }
}