sui_data_ingestion/
progress_store.rs1use anyhow::Result;
5use async_trait::async_trait;
6use aws_config::timeout::TimeoutConfig;
7use aws_sdk_dynamodb::Client;
8use aws_sdk_dynamodb::config::{Credentials, Region};
9use aws_sdk_dynamodb::error::SdkError;
10use aws_sdk_dynamodb::types::AttributeValue;
11use std::str::FromStr;
12use std::time::Duration;
13use sui_data_ingestion_core::ProgressStore;
14use sui_kvstore::BigTableProgressStore;
15use sui_types::messages_checkpoint::CheckpointSequenceNumber;
16
17pub struct DynamoDBProgressStore {
18 client: Client,
19 table_name: String,
20 is_backfill: bool,
21 bigtable_store: Option<BigTableProgressStore>,
22}
23
24impl DynamoDBProgressStore {
25 pub async fn new(
26 aws_access_key_id: &str,
27 aws_secret_access_key: &str,
28 aws_region: String,
29 table_name: String,
30 is_backfill: bool,
31 bigtable_store: Option<BigTableProgressStore>,
32 ) -> Self {
33 let credentials = Credentials::new(
34 aws_access_key_id,
35 aws_secret_access_key,
36 None,
37 None,
38 "dynamodb",
39 );
40 let timeout_config = TimeoutConfig::builder()
41 .operation_timeout(Duration::from_secs(3))
42 .operation_attempt_timeout(Duration::from_secs(10))
43 .connect_timeout(Duration::from_secs(3))
44 .build();
45 let aws_config = aws_config::from_env()
46 .credentials_provider(credentials)
47 .region(Region::new(aws_region))
48 .timeout_config(timeout_config)
49 .load()
50 .await;
51 let client = Client::new(&aws_config);
52 Self {
53 client,
54 table_name,
55 is_backfill,
56 bigtable_store,
57 }
58 }
59}
60
61#[async_trait]
62impl ProgressStore for DynamoDBProgressStore {
63 async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
64 let item = self
65 .client
66 .get_item()
67 .table_name(self.table_name.clone())
68 .key("task_name", AttributeValue::S(task_name))
69 .send()
70 .await?;
71 if let Some(output) = item.item()
72 && let AttributeValue::N(checkpoint_number) = &output["nstate"]
73 {
74 return Ok(CheckpointSequenceNumber::from_str(checkpoint_number)?);
75 }
76 Ok(0)
77 }
78 async fn save(
79 &mut self,
80 task_name: String,
81 checkpoint_number: CheckpointSequenceNumber,
82 ) -> Result<()> {
83 if self.is_backfill && !checkpoint_number.is_multiple_of(1000) {
84 return Ok(());
85 }
86 if task_name.starts_with("bigtable")
87 && let Some(ref mut bigtable_store) = self.bigtable_store
88 {
89 bigtable_store
90 .save(task_name.clone(), checkpoint_number)
91 .await?;
92 }
93 let backoff = backoff::ExponentialBackoff::default();
94 backoff::future::retry(backoff, || async {
95 let result = self
96 .client
97 .update_item()
98 .table_name(self.table_name.clone())
99 .key("task_name", AttributeValue::S(task_name.clone()))
100 .update_expression("SET #nstate = :newState")
101 .condition_expression("#nstate < :newState")
102 .expression_attribute_names("#nstate", "nstate")
103 .expression_attribute_values(
104 ":newState",
105 AttributeValue::N(checkpoint_number.to_string()),
106 )
107 .send()
108 .await;
109 match result {
110 Ok(_) => Ok(()),
111 Err(SdkError::ServiceError(err))
112 if err.err().is_conditional_check_failed_exception() =>
113 {
114 Ok(())
115 }
116 Err(err) => Err(backoff::Error::transient(err)),
117 }
118 })
119 .await?;
120 Ok(())
121 }
122}