// sui_data_ingestion_core/progress_store/file.rs
use crate::progress_store::ProgressStore;
5use anyhow::Result;
6use async_trait::async_trait;
7use serde_json::{Number, Value};
8use std::path::PathBuf;
9use sui_types::messages_checkpoint::CheckpointSequenceNumber;
10
/// Progress store that keeps task checkpoints in a single JSON file on disk.
///
/// The file is expected to hold a JSON object whose keys are task names and
/// whose values are checkpoint sequence numbers.
#[derive(Debug)]
pub struct FileProgressStore {
    /// Location of the JSON file that is read and rewritten by this store.
    path: PathBuf,
}
14
15impl FileProgressStore {
16 pub fn new(path: PathBuf) -> Self {
17 Self { path }
18 }
19}
20
21#[async_trait]
22impl ProgressStore for FileProgressStore {
23 async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
24 let content: Value = serde_json::from_slice(&std::fs::read(self.path.clone())?)?;
25 Ok(content
26 .get(&task_name)
27 .and_then(|v| v.as_u64())
28 .unwrap_or_default())
29 }
30 async fn save(
31 &mut self,
32 task_name: String,
33 checkpoint_number: CheckpointSequenceNumber,
34 ) -> Result<()> {
35 let raw_content = std::fs::read(self.path.clone())?;
36 let mut content: Value = if raw_content.is_empty() {
37 Value::Object(serde_json::Map::new())
38 } else {
39 serde_json::from_slice(&std::fs::read(self.path.clone())?)?
40 };
41 content[task_name] = Value::Number(Number::from(checkpoint_number));
42 std::fs::write(self.path.clone(), serde_json::to_string_pretty(&content)?)?;
43 Ok(())
44 }
45}