sui_indexer_alt_framework/ingestion/
local_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use axum::body::Bytes;
6use std::path::PathBuf;
7
8use crate::ingestion::ingestion_client::{
9    FetchData, FetchError, FetchResult, IngestionClientTrait,
10};
11
12// FIXME: To productionize this, we need to add garbage collection to remove old checkpoint files.
13
14pub struct LocalIngestionClient {
15    path: PathBuf,
16}
17
18impl LocalIngestionClient {
19    pub fn new(path: PathBuf) -> Self {
20        LocalIngestionClient { path }
21    }
22}
23
24#[async_trait]
25impl IngestionClientTrait for LocalIngestionClient {
26    async fn fetch(&self, checkpoint: u64) -> FetchResult {
27        let path = self.path.join(format!("{}.chk", checkpoint));
28        let bytes = tokio::fs::read(path).await.map_err(|e| {
29            if e.kind() == std::io::ErrorKind::NotFound {
30                FetchError::NotFound
31            } else {
32                FetchError::Transient {
33                    reason: "io_error",
34                    error: e.into(),
35                }
36            }
37        })?;
38        Ok(FetchData::Raw(Bytes::from(bytes)))
39    }
40}
41
42#[cfg(test)]
43pub(crate) mod tests {
44    use crate::ingestion::ingestion_client::IngestionClient;
45    use crate::ingestion::test_utils::test_checkpoint_data;
46    use crate::metrics::tests::test_ingestion_metrics;
47    use sui_storage::blob::{Blob, BlobEncoding};
48
49    #[tokio::test]
50    async fn local_test_fetch() {
51        let tempdir = tempfile::tempdir().unwrap().keep();
52        let path = tempdir.join("1.chk");
53        let test_checkpoint_data = test_checkpoint_data(1);
54        tokio::fs::write(&path, &test_checkpoint_data)
55            .await
56            .unwrap();
57
58        let local_client = IngestionClient::new_local(tempdir, test_ingestion_metrics());
59        let checkpoint = local_client.fetch(1).await.unwrap();
60
61        // Convert checkpoint back to CheckpointData for serialization comparison
62        let checkpoint: crate::types::full_checkpoint_content::CheckpointData =
63            (*checkpoint).clone().into();
64        assert_eq!(
65            Blob::encode(&checkpoint, BlobEncoding::Bcs)
66                .unwrap()
67                .to_bytes(),
68            test_checkpoint_data
69        );
70    }
71}