sui_kvstore/bigtable/
worker.rs1use crate::{BigTableClient, KeyValueStoreReader, KeyValueStoreWriter, TransactionData};
5use async_trait::async_trait;
6use sui_data_ingestion_core::Worker;
7use sui_types::full_checkpoint_content::CheckpointData;
8
9pub struct KvWorker {
10 pub client: BigTableClient,
11}
12
13#[async_trait]
14impl Worker for KvWorker {
15 type Result = ();
16
17 async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
18 let mut client = self.client.clone();
19 let mut objects = vec![];
20 let mut transactions = vec![];
21 for transaction in &checkpoint.transactions {
22 let full_transaction = TransactionData {
23 transaction: transaction.transaction.clone(),
24 effects: transaction.effects.clone(),
25 events: transaction.events.clone(),
26 checkpoint_number: checkpoint.checkpoint_summary.sequence_number,
27 timestamp: checkpoint.checkpoint_summary.timestamp_ms,
28 };
29 for object in &transaction.output_objects {
30 objects.push(object);
31 }
32 transactions.push(full_transaction);
33 }
34 client
35 .save_objects(&objects, checkpoint.checkpoint_summary.timestamp_ms)
36 .await?;
37 client.save_transactions(&transactions).await?;
38 client.save_checkpoint(checkpoint).await?;
39 if let Some(epoch_info) = checkpoint.epoch_info()? {
40 if epoch_info.epoch > 0
41 && let Some(mut prev) = client.get_epoch(epoch_info.epoch - 1).await?
42 {
43 prev.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
44 prev.end_timestamp_ms = epoch_info.start_timestamp_ms;
45 client.save_epoch(prev).await?;
46 }
47 client.save_epoch(epoch_info).await?;
48 }
49 Ok(())
50 }
51}