sui_data_ingestion_core/progress_store/
file.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::progress_store::ProgressStore;
5use anyhow::Result;
6use async_trait::async_trait;
7use serde_json::{Number, Value};
8use std::path::PathBuf;
9use sui_types::messages_checkpoint::CheckpointSequenceNumber;
10
/// A [`ProgressStore`] implementation that keeps its state in a single file on disk.
///
/// The file holds a JSON object mapping each task name to the checkpoint
/// sequence number recorded for that task.
#[derive(Debug)]
pub struct FileProgressStore {
    /// Location of the JSON progress file.
    path: PathBuf,
}
14
15impl FileProgressStore {
16    pub fn new(path: PathBuf) -> Self {
17        Self { path }
18    }
19}
20
21#[async_trait]
22impl ProgressStore for FileProgressStore {
23    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
24        let content: Value = serde_json::from_slice(&std::fs::read(self.path.clone())?)?;
25        Ok(content
26            .get(&task_name)
27            .and_then(|v| v.as_u64())
28            .unwrap_or_default())
29    }
30    async fn save(
31        &mut self,
32        task_name: String,
33        checkpoint_number: CheckpointSequenceNumber,
34    ) -> Result<()> {
35        let raw_content = std::fs::read(self.path.clone())?;
36        let mut content: Value = if raw_content.is_empty() {
37            Value::Object(serde_json::Map::new())
38        } else {
39            serde_json::from_slice(&std::fs::read(self.path.clone())?)?
40        };
41        content[task_name] = Value::Number(Number::from(checkpoint_number));
42        std::fs::write(self.path.clone(), serde_json::to_string_pretty(&content)?)?;
43        Ok(())
44    }
45}