sui_indexer_alt_framework/ingestion/
local_client.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use axum::body::Bytes;
6use std::path::PathBuf;
7
8use crate::ingestion::ingestion_client::FetchData;
9use crate::ingestion::ingestion_client::FetchError;
10use crate::ingestion::ingestion_client::FetchResult;
11use crate::ingestion::ingestion_client::IngestionClientTrait;
12
13// FIXME: To productionize this, we need to add garbage collection to remove old checkpoint files.
14
/// Ingestion client that serves checkpoints from files on the local filesystem.
///
/// Checkpoint `N` is read from the file `N.chk` inside the configured directory.
pub struct LocalIngestionClient {
    /// Directory that checkpoint file names are joined onto.
    path: PathBuf,
}

impl LocalIngestionClient {
    /// Builds a client that looks for checkpoint files under `path`.
    ///
    /// The path is stored as-is; nothing is read from disk at construction time.
    pub fn new(path: PathBuf) -> Self {
        Self { path }
    }
}
24
25#[async_trait]
26impl IngestionClientTrait for LocalIngestionClient {
27    async fn fetch(&self, checkpoint: u64) -> FetchResult {
28        let path = self.path.join(format!("{}.chk", checkpoint));
29        let bytes = tokio::fs::read(path).await.map_err(|e| {
30            if e.kind() == std::io::ErrorKind::NotFound {
31                FetchError::NotFound
32            } else {
33                FetchError::Transient {
34                    reason: "io_error",
35                    error: e.into(),
36                }
37            }
38        })?;
39        Ok(FetchData::Raw(Bytes::from(bytes)))
40    }
41}
42
#[cfg(test)]
pub(crate) mod tests {
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;
    use sui_storage::blob::Blob;
    use sui_storage::blob::BlobEncoding;

    /// Writes an encoded checkpoint to `<tempdir>/1.chk`, fetches checkpoint 1 through a
    /// local `IngestionClient`, and checks that re-encoding the result reproduces the
    /// bytes that were written.
    #[tokio::test]
    async fn local_test_fetch() {
        // Keep the `TempDir` guard alive for the whole test instead of calling `keep()`,
        // so the directory and the checkpoint file in it are removed when the test ends
        // rather than being left behind on disk.
        let tempdir = tempfile::tempdir().unwrap();
        let checkpoint_file = tempdir.path().join("1.chk");
        let expected_bytes = test_checkpoint_data(1);
        tokio::fs::write(&checkpoint_file, &expected_bytes)
            .await
            .unwrap();

        let local_client =
            IngestionClient::new_local(tempdir.path().to_path_buf(), test_ingestion_metrics());
        let checkpoint = local_client.fetch(1).await.unwrap();

        // Convert checkpoint back to CheckpointData for serialization comparison
        let checkpoint: crate::types::full_checkpoint_content::CheckpointData =
            (*checkpoint).clone().into();
        assert_eq!(
            Blob::encode(&checkpoint, BlobEncoding::Bcs)
                .unwrap()
                .to_bytes(),
            expected_bytes
        );
    }
}