sui_indexer_alt_framework/ingestion/
local_client.rs1use async_trait::async_trait;
5use axum::body::Bytes;
6use std::path::PathBuf;
7
8use crate::ingestion::ingestion_client::FetchData;
9use crate::ingestion::ingestion_client::FetchError;
10use crate::ingestion::ingestion_client::FetchResult;
11use crate::ingestion::ingestion_client::IngestionClientTrait;
12
13pub struct LocalIngestionClient {
16 path: PathBuf,
17}
18
19impl LocalIngestionClient {
20 pub fn new(path: PathBuf) -> Self {
21 LocalIngestionClient { path }
22 }
23}
24
25#[async_trait]
26impl IngestionClientTrait for LocalIngestionClient {
27 async fn fetch(&self, checkpoint: u64) -> FetchResult {
28 let path = self.path.join(format!("{}.chk", checkpoint));
29 let bytes = tokio::fs::read(path).await.map_err(|e| {
30 if e.kind() == std::io::ErrorKind::NotFound {
31 FetchError::NotFound
32 } else {
33 FetchError::Transient {
34 reason: "io_error",
35 error: e.into(),
36 }
37 }
38 })?;
39 Ok(FetchData::Raw(Bytes::from(bytes)))
40 }
41}
42
43#[cfg(test)]
44pub(crate) mod tests {
45 use crate::ingestion::ingestion_client::IngestionClient;
46 use crate::ingestion::test_utils::test_checkpoint_data;
47 use crate::metrics::tests::test_ingestion_metrics;
48 use sui_storage::blob::Blob;
49 use sui_storage::blob::BlobEncoding;
50
51 #[tokio::test]
52 async fn local_test_fetch() {
53 let tempdir = tempfile::tempdir().unwrap().keep();
54 let path = tempdir.join("1.chk");
55 let test_checkpoint_data = test_checkpoint_data(1);
56 tokio::fs::write(&path, &test_checkpoint_data)
57 .await
58 .unwrap();
59
60 let local_client = IngestionClient::new_local(tempdir, test_ingestion_metrics());
61 let checkpoint = local_client.fetch(1).await.unwrap();
62
63 let checkpoint: crate::types::full_checkpoint_content::CheckpointData =
65 (*checkpoint).clone().into();
66 assert_eq!(
67 Blob::encode(&checkpoint, BlobEncoding::Bcs)
68 .unwrap()
69 .to_bytes(),
70 test_checkpoint_data
71 );
72 }
73}