// sui_indexer/handlers/objects_snapshot_handler.rs
use async_trait::async_trait;
5use mysten_metrics::get_metrics;
6use mysten_metrics::metered_channel::Sender;
7use mysten_metrics::spawn_monitored_task;
8use sui_data_ingestion_core::Worker;
9use sui_types::full_checkpoint_content::CheckpointData;
10use tokio_util::sync::CancellationToken;
11use tracing::info;
12
13use crate::config::SnapshotLagConfig;
14use crate::store::PgIndexerStore;
15use crate::types::IndexerResult;
16use crate::{metrics::IndexerMetrics, store::IndexerStore};
17
18use super::checkpoint_handler::CheckpointHandler;
19use super::{CommitterWatermark, ObjectsSnapshotHandlerTables, TransactionObjectChangesToCommit};
20use super::{CommonHandler, Handler};
21
22#[derive(Clone)]
23pub struct ObjectsSnapshotHandler {
24 pub store: PgIndexerStore,
25 pub sender: Sender<(CommitterWatermark, TransactionObjectChangesToCommit)>,
26 snapshot_config: SnapshotLagConfig,
27 metrics: IndexerMetrics,
28}
29
30pub struct CheckpointObjectChanges {
31 pub checkpoint_sequence_number: u64,
32 pub object_changes: TransactionObjectChangesToCommit,
33}
34
35#[async_trait]
36impl Worker for ObjectsSnapshotHandler {
37 type Result = ();
38 async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
39 let transformed_data = CheckpointHandler::index_objects(checkpoint, &self.metrics).await?;
40 self.sender
41 .send((CommitterWatermark::from(checkpoint), transformed_data))
42 .await?;
43 Ok(())
44 }
45}
46
47#[async_trait]
48impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {
49 fn name(&self) -> String {
50 "objects_snapshot_handler".to_string()
51 }
52
53 async fn load(
54 &self,
55 transformed_data: Vec<TransactionObjectChangesToCommit>,
56 ) -> IndexerResult<()> {
57 self.store
58 .persist_objects_snapshot(transformed_data)
59 .await?;
60 Ok(())
61 }
62
63 async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>> {
64 self.store
65 .get_latest_object_snapshot_checkpoint_sequence_number()
66 .await
67 }
68
69 async fn set_watermark_hi(&self, watermark: CommitterWatermark) -> IndexerResult<()> {
70 self.store
71 .update_watermarks_upper_bound::<ObjectsSnapshotHandlerTables>(watermark)
72 .await?;
73
74 self.metrics
75 .latest_object_snapshot_sequence_number
76 .set(watermark.checkpoint_hi_inclusive as i64);
77 Ok(())
78 }
79
80 async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
81 let latest_checkpoint = self.store.get_latest_checkpoint_sequence_number().await?;
82 Ok(latest_checkpoint
83 .map(|seq| seq.saturating_sub(self.snapshot_config.snapshot_min_lag as u64))
84 .unwrap_or_default()) }
86}
87
88pub async fn start_objects_snapshot_handler(
89 store: PgIndexerStore,
90 metrics: IndexerMetrics,
91 snapshot_config: SnapshotLagConfig,
92 cancel: CancellationToken,
93 start_checkpoint_opt: Option<u64>,
94 end_checkpoint_opt: Option<u64>,
95) -> IndexerResult<(ObjectsSnapshotHandler, u64)> {
96 info!("Starting object snapshot handler...");
97
98 let global_metrics = get_metrics().unwrap();
99 let (sender, receiver) = mysten_metrics::metered_channel::channel(
100 600,
101 &global_metrics
102 .channel_inflight
103 .with_label_values(&["objects_snapshot_handler_checkpoint_data"]),
104 );
105
106 let objects_snapshot_handler =
107 ObjectsSnapshotHandler::new(store.clone(), sender, metrics.clone(), snapshot_config);
108
109 let next_cp_from_db = objects_snapshot_handler
110 .get_watermark_hi()
111 .await?
112 .map(|cp| cp.saturating_add(1))
113 .unwrap_or_default();
114 let start_checkpoint = start_checkpoint_opt.unwrap_or(next_cp_from_db);
115 let common_handler = CommonHandler::new(Box::new(objects_snapshot_handler.clone()));
116 spawn_monitored_task!(common_handler.start_transform_and_load(
117 receiver,
118 cancel,
119 start_checkpoint,
120 end_checkpoint_opt,
121 ));
122 Ok((objects_snapshot_handler, start_checkpoint))
123}
124
125impl ObjectsSnapshotHandler {
126 pub fn new(
127 store: PgIndexerStore,
128 sender: Sender<(CommitterWatermark, TransactionObjectChangesToCommit)>,
129 metrics: IndexerMetrics,
130 snapshot_config: SnapshotLagConfig,
131 ) -> ObjectsSnapshotHandler {
132 Self {
133 store,
134 sender,
135 metrics,
136 snapshot_config,
137 }
138 }
139}