sui_data_ingestion/
progress_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Result;
5use async_trait::async_trait;
6use aws_config::timeout::TimeoutConfig;
7use aws_sdk_dynamodb::Client;
8use aws_sdk_dynamodb::config::{Credentials, Region};
9use aws_sdk_dynamodb::error::SdkError;
10use aws_sdk_dynamodb::types::AttributeValue;
11use std::str::FromStr;
12use std::time::Duration;
13use sui_data_ingestion_core::ProgressStore;
14use sui_kvstore::BigTableProgressStore;
15use sui_types::messages_checkpoint::CheckpointSequenceNumber;
16
17pub struct DynamoDBProgressStore {
18    client: Client,
19    table_name: String,
20    is_backfill: bool,
21    bigtable_store: Option<BigTableProgressStore>,
22}
23
24impl DynamoDBProgressStore {
25    pub async fn new(
26        aws_access_key_id: &str,
27        aws_secret_access_key: &str,
28        aws_region: String,
29        table_name: String,
30        is_backfill: bool,
31        bigtable_store: Option<BigTableProgressStore>,
32    ) -> Self {
33        let credentials = Credentials::new(
34            aws_access_key_id,
35            aws_secret_access_key,
36            None,
37            None,
38            "dynamodb",
39        );
40        let timeout_config = TimeoutConfig::builder()
41            .operation_timeout(Duration::from_secs(3))
42            .operation_attempt_timeout(Duration::from_secs(10))
43            .connect_timeout(Duration::from_secs(3))
44            .build();
45        let aws_config = aws_config::from_env()
46            .credentials_provider(credentials)
47            .region(Region::new(aws_region))
48            .timeout_config(timeout_config)
49            .load()
50            .await;
51        let client = Client::new(&aws_config);
52        Self {
53            client,
54            table_name,
55            is_backfill,
56            bigtable_store,
57        }
58    }
59}
60
61#[async_trait]
62impl ProgressStore for DynamoDBProgressStore {
63    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
64        let item = self
65            .client
66            .get_item()
67            .table_name(self.table_name.clone())
68            .key("task_name", AttributeValue::S(task_name))
69            .send()
70            .await?;
71        if let Some(output) = item.item()
72            && let AttributeValue::N(checkpoint_number) = &output["nstate"]
73        {
74            return Ok(CheckpointSequenceNumber::from_str(checkpoint_number)?);
75        }
76        Ok(0)
77    }
78    async fn save(
79        &mut self,
80        task_name: String,
81        checkpoint_number: CheckpointSequenceNumber,
82    ) -> Result<()> {
83        if self.is_backfill && !checkpoint_number.is_multiple_of(1000) {
84            return Ok(());
85        }
86        if task_name.starts_with("bigtable")
87            && let Some(ref mut bigtable_store) = self.bigtable_store
88        {
89            bigtable_store
90                .save(task_name.clone(), checkpoint_number)
91                .await?;
92        }
93        let backoff = backoff::ExponentialBackoff::default();
94        backoff::future::retry(backoff, || async {
95            let result = self
96                .client
97                .update_item()
98                .table_name(self.table_name.clone())
99                .key("task_name", AttributeValue::S(task_name.clone()))
100                .update_expression("SET #nstate = :newState")
101                .condition_expression("#nstate < :newState")
102                .expression_attribute_names("#nstate", "nstate")
103                .expression_attribute_values(
104                    ":newState",
105                    AttributeValue::N(checkpoint_number.to_string()),
106                )
107                .send()
108                .await;
109            match result {
110                Ok(_) => Ok(()),
111                Err(SdkError::ServiceError(err))
112                    if err.err().is_conditional_check_failed_exception() =>
113                {
114                    Ok(())
115                }
116                Err(err) => Err(backoff::Error::transient(err)),
117            }
118        })
119        .await?;
120        Ok(())
121    }
122}