sui_indexer_alt_framework/ingestion/
local_client.rs1use async_trait::async_trait;
5use axum::body::Bytes;
6use std::path::PathBuf;
7
8use crate::ingestion::ingestion_client::{
9 FetchData, FetchError, FetchResult, IngestionClientTrait,
10};
11
/// Ingestion client that serves checkpoints from files in a local directory.
///
/// A checkpoint with sequence number `N` is looked up as the file `N.chk`
/// directly inside `path`.
pub struct LocalIngestionClient {
    /// Directory that holds the `*.chk` checkpoint files.
    path: PathBuf,
}
17
18impl LocalIngestionClient {
19 pub fn new(path: PathBuf) -> Self {
20 LocalIngestionClient { path }
21 }
22}
23
24#[async_trait]
25impl IngestionClientTrait for LocalIngestionClient {
26 async fn fetch(&self, checkpoint: u64) -> FetchResult {
27 let path = self.path.join(format!("{}.chk", checkpoint));
28 let bytes = tokio::fs::read(path).await.map_err(|e| {
29 if e.kind() == std::io::ErrorKind::NotFound {
30 FetchError::NotFound
31 } else {
32 FetchError::Transient {
33 reason: "io_error",
34 error: e.into(),
35 }
36 }
37 })?;
38 Ok(FetchData::Raw(Bytes::from(bytes)))
39 }
40}
41
#[cfg(test)]
pub(crate) mod tests {
    use crate::ingestion::ingestion_client::IngestionClient;
    use crate::ingestion::test_utils::test_checkpoint_data;
    use crate::metrics::tests::test_ingestion_metrics;
    use sui_storage::blob::{Blob, BlobEncoding};

    /// Writes an encoded checkpoint to `1.chk` in a temporary directory,
    /// fetches it through a local `IngestionClient`, and checks that
    /// re-encoding the fetched checkpoint reproduces the bytes on disk.
    #[tokio::test]
    async fn local_test_fetch() {
        // Hold on to the `TempDir` guard instead of calling `keep()`: the
        // directory is then removed when the guard is dropped at the end of
        // the test, rather than being left behind on disk after every run.
        let tempdir = tempfile::tempdir().unwrap();
        let path = tempdir.path().join("1.chk");
        let test_checkpoint_data = test_checkpoint_data(1);
        tokio::fs::write(&path, &test_checkpoint_data)
            .await
            .unwrap();

        let local_client =
            IngestionClient::new_local(tempdir.path().to_path_buf(), test_ingestion_metrics());
        let checkpoint = local_client.fetch(1).await.unwrap();

        // Convert the fetched checkpoint into `CheckpointData` so it can be
        // re-encoded and compared with what was written to disk.
        let checkpoint: crate::types::full_checkpoint_content::CheckpointData =
            (*checkpoint).clone().into();
        assert_eq!(
            Blob::encode(&checkpoint, BlobEncoding::Bcs)
                .unwrap()
                .to_bytes(),
            test_checkpoint_data
        );
    }
}