sui_kvstore/bigtable/
worker.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{BigTableClient, KeyValueStoreReader, KeyValueStoreWriter, TransactionData};
5use async_trait::async_trait;
6use sui_data_ingestion_core::Worker;
7use sui_types::full_checkpoint_content::CheckpointData;
8
9pub struct KvWorker {
10    pub client: BigTableClient,
11}
12
13#[async_trait]
14impl Worker for KvWorker {
15    type Result = ();
16
17    async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
18        let mut client = self.client.clone();
19        let mut objects = vec![];
20        let mut transactions = vec![];
21        for transaction in &checkpoint.transactions {
22            let full_transaction = TransactionData {
23                transaction: transaction.transaction.clone(),
24                effects: transaction.effects.clone(),
25                events: transaction.events.clone(),
26                checkpoint_number: checkpoint.checkpoint_summary.sequence_number,
27                timestamp: checkpoint.checkpoint_summary.timestamp_ms,
28            };
29            for object in &transaction.output_objects {
30                objects.push(object);
31            }
32            transactions.push(full_transaction);
33        }
34        client
35            .save_objects(&objects, checkpoint.checkpoint_summary.timestamp_ms)
36            .await?;
37        client.save_transactions(&transactions).await?;
38        client.save_checkpoint(checkpoint).await?;
39        if let Some(epoch_info) = checkpoint.epoch_info()? {
40            if epoch_info.epoch > 0
41                && let Some(mut prev) = client.get_epoch(epoch_info.epoch - 1).await?
42            {
43                prev.end_checkpoint = epoch_info.start_checkpoint.map(|sq| sq - 1);
44                prev.end_timestamp_ms = epoch_info.start_timestamp_ms;
45                client.save_epoch(prev).await?;
46            }
47            client.save_epoch(epoch_info).await?;
48        }
49        Ok(())
50    }
51}