sui_data_ingestion/
progress_store.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use async_trait::async_trait;
use aws_config::timeout::TimeoutConfig;
use aws_sdk_dynamodb::config::{Credentials, Region};
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::Client;
use std::str::FromStr;
use std::time::Duration;
use sui_data_ingestion_core::ProgressStore;
use sui_kvstore::BigTableProgressStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

/// Progress store that keeps per-task checkpoint watermarks in a DynamoDB table.
pub struct DynamoDBProgressStore {
    /// DynamoDB client used to issue the `get_item` / `update_item` requests.
    client: Client,
    /// Name of the table every request is sent against.
    table_name: String,
    /// When `true`, `save` skips checkpoint numbers that are not multiples of 1000.
    is_backfill: bool,
    /// Optional secondary store; `save` forwards to it when the task name is `"bigtable"`.
    bigtable_store: Option<BigTableProgressStore>,
}

impl DynamoDBProgressStore {
    /// Creates a progress store that talks to the DynamoDB table `table_name`
    /// in `aws_region`.
    ///
    /// The client is built from the `aws_config::from_env()` loader, overridden
    /// with static credentials made from `aws_access_key_id` /
    /// `aws_secret_access_key`, the given region, and explicit timeouts.
    /// `is_backfill` and `bigtable_store` are stored unchanged for later use
    /// by `save`.
    pub async fn new(
        aws_access_key_id: &str,
        aws_secret_access_key: &str,
        aws_region: String,
        table_name: String,
        is_backfill: bool,
        bigtable_store: Option<BigTableProgressStore>,
    ) -> Self {
        // NOTE(review): the overall operation timeout (3s) is shorter than the
        // per-attempt timeout (10s), so the latter presumably never fires —
        // confirm whether the two values were meant to be the other way round.
        let timeouts = TimeoutConfig::builder()
            .operation_timeout(Duration::from_secs(3))
            .operation_attempt_timeout(Duration::from_secs(10))
            .connect_timeout(Duration::from_secs(3))
            .build();

        // No session token and no expiry; "dynamodb" is the provider name tag.
        let static_credentials = Credentials::new(
            aws_access_key_id,
            aws_secret_access_key,
            None,
            None,
            "dynamodb",
        );

        let loader = aws_config::from_env()
            .credentials_provider(static_credentials)
            .region(Region::new(aws_region))
            .timeout_config(timeouts);
        let sdk_config = loader.load().await;

        Self {
            client: Client::new(&sdk_config),
            table_name,
            is_backfill,
            bigtable_store,
        }
    }
}

#[async_trait]
impl ProgressStore for DynamoDBProgressStore {
    /// Loads the last saved checkpoint number for `task_name`.
    ///
    /// Returns `0` when the table has no row for the task, or when the row has
    /// no numeric `nstate` attribute.
    ///
    /// # Errors
    ///
    /// Fails if the `get_item` request fails or if the stored `nstate` value
    /// cannot be parsed as a `CheckpointSequenceNumber`.
    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
        let item = self
            .client
            .get_item()
            .table_name(self.table_name.clone())
            .key("task_name", AttributeValue::S(task_name))
            .send()
            .await?;
        // Use a checked lookup: indexing the attribute map with `["nstate"]`
        // would panic on a row that exists but lacks the attribute.
        if let Some(AttributeValue::N(checkpoint_number)) =
            item.item().and_then(|attributes| attributes.get("nstate"))
        {
            return Ok(CheckpointSequenceNumber::from_str(checkpoint_number)?);
        }
        Ok(0)
    }

    /// Persists `checkpoint_number` as the progress of `task_name`.
    ///
    /// In backfill mode only checkpoint numbers that are multiples of 1000 are
    /// written; all others return `Ok(())` immediately. For the `"bigtable"`
    /// task the value is first forwarded to the optional BigTable store, then
    /// written to DynamoDB with a conditional update that only moves `nstate`
    /// forward.
    ///
    /// # Errors
    ///
    /// Fails if the BigTable save fails, or if the DynamoDB update keeps
    /// failing until the exponential backoff gives up.
    async fn save(
        &mut self,
        task_name: String,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> Result<()> {
        if self.is_backfill && checkpoint_number % 1000 != 0 {
            return Ok(());
        }
        if task_name == "bigtable" {
            if let Some(ref mut bigtable_store) = self.bigtable_store {
                // Not retried: a failure here aborts before DynamoDB is touched.
                bigtable_store
                    .save(task_name.clone(), checkpoint_number)
                    .await?;
            }
        }
        let backoff = backoff::ExponentialBackoff::default();
        backoff::future::retry(backoff, || async {
            // NOTE(review): the condition compares against the existing
            // `nstate`; presumably a row without that attribute fails the
            // check and is never initialised here — verify rows are
            // pre-created for every task.
            let result = self
                .client
                .update_item()
                .table_name(self.table_name.clone())
                .key("task_name", AttributeValue::S(task_name.clone()))
                .update_expression("SET #nstate = :newState")
                .condition_expression("#nstate < :newState")
                .expression_attribute_names("#nstate", "nstate")
                .expression_attribute_values(
                    ":newState",
                    AttributeValue::N(checkpoint_number.to_string()),
                )
                .send()
                .await;
            match result {
                Ok(_) => Ok(()),
                // A failed condition means the stored value is already at or
                // past `checkpoint_number`; treat it as success, do not retry.
                Err(SdkError::ServiceError(err))
                    if err.err().is_conditional_check_failed_exception() =>
                {
                    Ok(())
                }
                // Every other error is considered transient and retried.
                Err(err) => Err(backoff::Error::transient(err)),
            }
        })
        .await?;
        Ok(())
    }
}