sui_indexer/handlers/
objects_snapshot_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use mysten_metrics::get_metrics;
6use mysten_metrics::metered_channel::Sender;
7use mysten_metrics::spawn_monitored_task;
8use sui_data_ingestion_core::Worker;
9use sui_types::full_checkpoint_content::CheckpointData;
10use tokio_util::sync::CancellationToken;
11use tracing::info;
12
13use crate::config::SnapshotLagConfig;
14use crate::store::PgIndexerStore;
15use crate::types::IndexerResult;
16use crate::{metrics::IndexerMetrics, store::IndexerStore};
17
18use super::checkpoint_handler::CheckpointHandler;
19use super::{CommitterWatermark, ObjectsSnapshotHandlerTables, TransactionObjectChangesToCommit};
20use super::{CommonHandler, Handler};
21
22#[derive(Clone)]
23pub struct ObjectsSnapshotHandler {
24    pub store: PgIndexerStore,
25    pub sender: Sender<(CommitterWatermark, TransactionObjectChangesToCommit)>,
26    snapshot_config: SnapshotLagConfig,
27    metrics: IndexerMetrics,
28}
29
30pub struct CheckpointObjectChanges {
31    pub checkpoint_sequence_number: u64,
32    pub object_changes: TransactionObjectChangesToCommit,
33}
34
35#[async_trait]
36impl Worker for ObjectsSnapshotHandler {
37    type Result = ();
38    async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
39        let transformed_data = CheckpointHandler::index_objects(checkpoint, &self.metrics).await?;
40        self.sender
41            .send((CommitterWatermark::from(checkpoint), transformed_data))
42            .await?;
43        Ok(())
44    }
45}
46
47#[async_trait]
48impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {
49    fn name(&self) -> String {
50        "objects_snapshot_handler".to_string()
51    }
52
53    async fn load(
54        &self,
55        transformed_data: Vec<TransactionObjectChangesToCommit>,
56    ) -> IndexerResult<()> {
57        self.store
58            .persist_objects_snapshot(transformed_data)
59            .await?;
60        Ok(())
61    }
62
63    async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>> {
64        self.store
65            .get_latest_object_snapshot_checkpoint_sequence_number()
66            .await
67    }
68
69    async fn set_watermark_hi(&self, watermark: CommitterWatermark) -> IndexerResult<()> {
70        self.store
71            .update_watermarks_upper_bound::<ObjectsSnapshotHandlerTables>(watermark)
72            .await?;
73
74        self.metrics
75            .latest_object_snapshot_sequence_number
76            .set(watermark.checkpoint_hi_inclusive as i64);
77        Ok(())
78    }
79
80    async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
81        let latest_checkpoint = self.store.get_latest_checkpoint_sequence_number().await?;
82        Ok(latest_checkpoint
83            .map(|seq| seq.saturating_sub(self.snapshot_config.snapshot_min_lag as u64))
84            .unwrap_or_default()) // hold snapshot handler until at least one checkpoint is in DB
85    }
86}
87
88pub async fn start_objects_snapshot_handler(
89    store: PgIndexerStore,
90    metrics: IndexerMetrics,
91    snapshot_config: SnapshotLagConfig,
92    cancel: CancellationToken,
93    start_checkpoint_opt: Option<u64>,
94    end_checkpoint_opt: Option<u64>,
95) -> IndexerResult<(ObjectsSnapshotHandler, u64)> {
96    info!("Starting object snapshot handler...");
97
98    let global_metrics = get_metrics().unwrap();
99    let (sender, receiver) = mysten_metrics::metered_channel::channel(
100        600,
101        &global_metrics
102            .channel_inflight
103            .with_label_values(&["objects_snapshot_handler_checkpoint_data"]),
104    );
105
106    let objects_snapshot_handler =
107        ObjectsSnapshotHandler::new(store.clone(), sender, metrics.clone(), snapshot_config);
108
109    let next_cp_from_db = objects_snapshot_handler
110        .get_watermark_hi()
111        .await?
112        .map(|cp| cp.saturating_add(1))
113        .unwrap_or_default();
114    let start_checkpoint = start_checkpoint_opt.unwrap_or(next_cp_from_db);
115    let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
116    spawn_monitored_task!(common_handler.start_transform_and_load(
117        receiver,
118        cancel,
119        start_checkpoint,
120        end_checkpoint_opt,
121    ));
122    Ok((objects_snapshot_handler, start_checkpoint))
123}
124
125impl ObjectsSnapshotHandler {
126    pub fn new(
127        store: PgIndexerStore,
128        sender: Sender<(CommitterWatermark, TransactionObjectChangesToCommit)>,
129        metrics: IndexerMetrics,
130        snapshot_config: SnapshotLagConfig,
131    ) -> ObjectsSnapshotHandler {
132        Self {
133            store,
134            sender,
135            metrics,
136            snapshot_config,
137        }
138    }
139}