sui_indexer/handlers/
objects_snapshot_handler.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use mysten_metrics::get_metrics;
use mysten_metrics::metered_channel::Sender;
use mysten_metrics::spawn_monitored_task;
use sui_data_ingestion_core::Worker;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::config::SnapshotLagConfig;
use crate::store::PgIndexerStore;
use crate::types::IndexerResult;
use crate::{metrics::IndexerMetrics, store::IndexerStore};

use super::checkpoint_handler::CheckpointHandler;
use super::{CommitterWatermark, ObjectsSnapshotHandlerTables, TransactionObjectChangesToCommit};
use super::{CommonHandler, Handler};

/// Handler that feeds per-checkpoint object changes into the objects snapshot.
///
/// As a `Worker` it indexes the objects of each incoming checkpoint and pushes
/// the result onto `sender`; as a `Handler` it persists batches of those
/// changes through `store` and reads/updates the snapshot watermark.
#[derive(Clone)]
pub struct ObjectsSnapshotHandler {
    /// Store used to persist the objects snapshot and to read/update watermarks.
    pub store: PgIndexerStore,
    /// Channel on which `process_checkpoint` publishes each checkpoint's
    /// committer watermark together with its indexed object changes.
    pub sender: Sender<(CommitterWatermark, TransactionObjectChangesToCommit)>,
    /// Supplies `snapshot_min_lag`, which is subtracted from the latest
    /// checkpoint in the store to obtain the max committable checkpoint.
    snapshot_config: SnapshotLagConfig,
    /// Indexer metrics; passed to object indexing and used to report the
    /// latest object snapshot sequence number.
    metrics: IndexerMetrics,
}

/// Pairs a checkpoint sequence number with a set of object changes to commit.
///
/// NOTE(review): not referenced in the visible portion of this file — confirm
/// it still has users elsewhere before relying on it.
pub struct CheckpointObjectChanges {
    /// Checkpoint sequence number — presumably the checkpoint that produced
    /// `object_changes`; verify against the code that constructs this type.
    pub checkpoint_sequence_number: u64,
    /// The object changes to commit.
    pub object_changes: TransactionObjectChangesToCommit,
}

#[async_trait]
impl Worker for ObjectsSnapshotHandler {
    type Result = ();

    /// Indexes the objects of `checkpoint` and forwards them, paired with the
    /// checkpoint's committer watermark, over `self.sender`.
    ///
    /// # Errors
    /// Propagates any failure from object indexing or from the channel send.
    async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
        // Indexing runs first; the watermark is only derived once it succeeded.
        let object_changes = CheckpointHandler::index_objects(checkpoint, &self.metrics).await?;
        let watermark = CommitterWatermark::from(checkpoint);
        let message = (watermark, object_changes);

        self.sender.send(message).await?;
        Ok(())
    }
}

#[async_trait]
impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {
    /// Name identifying this handler.
    fn name(&self) -> String {
        String::from("objects_snapshot_handler")
    }

    /// Persists a batch of object changes into the objects snapshot via the store.
    ///
    /// # Errors
    /// Propagates any error returned by the store.
    async fn load(
        &self,
        transformed_data: Vec<TransactionObjectChangesToCommit>,
    ) -> IndexerResult<()> {
        let persisted = self.store.persist_objects_snapshot(transformed_data).await;
        persisted?;
        Ok(())
    }

    /// Returns the latest object snapshot checkpoint sequence number reported
    /// by the store, if any.
    async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>> {
        let store = &self.store;
        store
            .get_latest_object_snapshot_checkpoint_sequence_number()
            .await
    }

    /// Records `watermark` as the new upper bound for the snapshot tables and,
    /// once the store update succeeded, publishes it to the metrics gauge.
    async fn set_watermark_hi(&self, watermark: CommitterWatermark) -> IndexerResult<()> {
        // Read the sequence number up front; `watermark` is handed to the store below.
        let checkpoint_hi = watermark.checkpoint_hi_inclusive;

        self.store
            .update_watermarks_upper_bound::<ObjectsSnapshotHandlerTables>(watermark)
            .await?;

        let gauge = &self.metrics.latest_object_snapshot_sequence_number;
        gauge.set(checkpoint_hi as i64);
        Ok(())
    }

    /// Returns the highest checkpoint the snapshot may commit: the latest
    /// checkpoint in the store minus `snapshot_min_lag`, saturating at zero.
    async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
        let min_lag = self.snapshot_config.snapshot_min_lag as u64;
        match self.store.get_latest_checkpoint_sequence_number().await? {
            Some(seq) => Ok(seq.saturating_sub(min_lag)),
            // hold snapshot handler until at least one checkpoint is in DB
            None => Ok(0),
        }
    }
}

/// Builds an [`ObjectsSnapshotHandler`] and spawns the background task that
/// drains its channel and loads the object changes into the snapshot.
///
/// The handler owns the sending half of a metered channel (capacity 600); the
/// receiving half is handed to a [`CommonHandler`] whose
/// `start_transform_and_load` runs on a monitored task together with `cancel`
/// and `end_checkpoint_opt`.
///
/// Returns the handler together with the first checkpoint to process:
/// `start_checkpoint_opt` when provided, otherwise one past the snapshot
/// watermark found in the store (or 0 when the store has no watermark yet).
///
/// # Errors
/// Fails if the snapshot watermark cannot be read from the store.
///
/// # Panics
/// Panics if `get_metrics()` does not yield the global metrics.
pub async fn start_objects_snapshot_handler(
    store: PgIndexerStore,
    metrics: IndexerMetrics,
    snapshot_config: SnapshotLagConfig,
    cancel: CancellationToken,
    start_checkpoint_opt: Option<u64>,
    end_checkpoint_opt: Option<u64>,
) -> IndexerResult<(ObjectsSnapshotHandler, u64)> {
    info!("Starting object snapshot handler...");

    let global_metrics = get_metrics()
        .expect("global metrics must be available to start the objects snapshot handler");
    let (sender, receiver) = mysten_metrics::metered_channel::channel(
        600,
        &global_metrics
            .channel_inflight
            .with_label_values(&["objects_snapshot_handler_checkpoint_data"]),
    );

    // `store` and `metrics` are not used again below, so they are moved into
    // the handler instead of being cloned.
    let objects_snapshot_handler =
        ObjectsSnapshotHandler::new(store, sender, metrics, snapshot_config);

    // The watermark is read even when an explicit start checkpoint is given,
    // so a failing store lookup is reported in both cases.
    let next_cp_from_db = objects_snapshot_handler
        .get_watermark_hi()
        .await?
        .map(|cp| cp.saturating_add(1))
        .unwrap_or_default();
    let start_checkpoint = start_checkpoint_opt.unwrap_or(next_cp_from_db);

    // The spawned task gets its own clone; the original handler is returned to the caller.
    let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
    spawn_monitored_task!(common_handler.start_transform_and_load(
        receiver,
        cancel,
        start_checkpoint,
        end_checkpoint_opt,
    ));
    Ok((objects_snapshot_handler, start_checkpoint))
}

impl ObjectsSnapshotHandler {
    pub fn new(
        store: PgIndexerStore,
        sender: Sender<(CommitterWatermark, TransactionObjectChangesToCommit)>,
        metrics: IndexerMetrics,
        snapshot_config: SnapshotLagConfig,
    ) -> ObjectsSnapshotHandler {
        Self {
            store,
            sender,
            metrics,
            snapshot_config,
        }
    }
}