// sui_indexer/handlers/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5
6use async_trait::async_trait;
7use futures::{FutureExt, StreamExt};
8
9use serde::{Deserialize, Serialize};
10use sui_types::full_checkpoint_content::CheckpointData;
11use tokio_util::sync::CancellationToken;
12
13use crate::{
14    errors::IndexerError,
15    models::{
16        display::StoredDisplay,
17        epoch::{EndOfEpochUpdate, StartOfEpochUpdate},
18        obj_indices::StoredObjectVersion,
19    },
20    types::{
21        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
22        IndexedPackage, IndexedTransaction, IndexerResult, TxIndex,
23    },
24};
25
26pub mod checkpoint_handler;
27pub mod committer;
28pub mod objects_snapshot_handler;
29pub mod pruner;
30pub mod tx_processor;
31
/// Default number of checkpoints grouped into one commit batch. The value actually used can be
/// overridden at runtime through the `CHECKPOINT_COMMIT_BATCH_SIZE` environment variable.
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
/// Once this many received-but-uncommitted checkpoints are buffered, a handler stops reading
/// from its input channel until some of them have been committed.
pub(crate) const UNPROCESSED_CHECKPOINT_SIZE_LIMIT: usize = 1_000;
34
35#[derive(Debug)]
36pub struct CheckpointDataToCommit {
37    pub checkpoint: IndexedCheckpoint,
38    pub transactions: Vec<IndexedTransaction>,
39    pub events: Vec<IndexedEvent>,
40    pub event_indices: Vec<EventIndex>,
41    pub tx_indices: Vec<TxIndex>,
42    pub display_updates: BTreeMap<String, StoredDisplay>,
43    pub object_changes: TransactionObjectChangesToCommit,
44    pub object_history_changes: TransactionObjectChangesToCommit,
45    pub object_versions: Vec<StoredObjectVersion>,
46    pub packages: Vec<IndexedPackage>,
47    pub epoch: Option<EpochToCommit>,
48}
49
50#[derive(Clone, Debug)]
51pub struct TransactionObjectChangesToCommit {
52    pub changed_objects: Vec<IndexedObject>,
53    pub deleted_objects: Vec<IndexedDeletedObject>,
54}
55
56#[derive(Clone, Debug)]
57pub struct EpochToCommit {
58    pub last_epoch: Option<EndOfEpochUpdate>,
59    pub new_epoch: StartOfEpochUpdate,
60}
61
62impl EpochToCommit {
63    pub fn new_epoch_id(&self) -> u64 {
64        self.new_epoch.epoch as u64
65    }
66
67    pub fn new_epoch_first_checkpoint_id(&self) -> u64 {
68        self.new_epoch.first_checkpoint_id as u64
69    }
70
71    pub fn last_epoch_total_transactions(&self) -> Option<u64> {
72        self.last_epoch
73            .as_ref()
74            .map(|e| e.epoch_total_transactions as u64)
75    }
76
77    pub fn new_epoch_first_tx_sequence_number(&self) -> u64 {
78        self.new_epoch.first_tx_sequence_number as u64
79    }
80}
81
82pub struct CommonHandler<T> {
83    handler: Box<dyn Handler<T>>,
84}
85
86impl<T> CommonHandler<T> {
87    pub fn new(handler: Box<dyn Handler<T>>) -> Self {
88        Self { handler }
89    }
90
91    async fn start_transform_and_load(
92        &self,
93        cp_receiver: mysten_metrics::metered_channel::Receiver<(CommitterWatermark, T)>,
94        cancel: CancellationToken,
95        start_checkpoint: u64,
96        end_checkpoint_opt: Option<u64>,
97    ) -> IndexerResult<()> {
98        let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
99            .unwrap_or(CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
100            .parse::<usize>()
101            .unwrap();
102        let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(cp_receiver)
103            .ready_chunks(checkpoint_commit_batch_size);
104
105        // Mapping of ordered checkpoint data to ensure that we process them in order. The key is
106        // just the checkpoint sequence number, and the tuple is (CommitterWatermark, T).
107        let mut unprocessed: BTreeMap<u64, (CommitterWatermark, _)> = BTreeMap::new();
108        let mut tuple_batch = vec![];
109        let mut next_cp_to_process = start_checkpoint;
110
111        loop {
112            if cancel.is_cancelled() {
113                return Ok(());
114            }
115
116            // Try to fetch new data tuple from the stream
117            if unprocessed.len() >= UNPROCESSED_CHECKPOINT_SIZE_LIMIT {
118                tracing::info!(
119                    "Unprocessed checkpoint size reached limit {}, skip reading from stream...",
120                    UNPROCESSED_CHECKPOINT_SIZE_LIMIT
121                );
122            } else {
123                // Try to fetch new data tuple from the stream
124                match stream.next().now_or_never() {
125                    Some(Some(tuple_chunk)) => {
126                        if cancel.is_cancelled() {
127                            return Ok(());
128                        }
129                        for tuple in tuple_chunk {
130                            unprocessed.insert(tuple.0.checkpoint_hi_inclusive, tuple);
131                        }
132                    }
133                    Some(None) => break, // Stream has ended
134                    None => {}           // No new data tuple available right now
135                }
136            }
137
138            // Process unprocessed checkpoints, even no new checkpoints from stream
139            let checkpoint_lag_limiter = self.handler.get_max_committable_checkpoint().await?;
140            let max_commitable_cp = std::cmp::min(
141                checkpoint_lag_limiter,
142                end_checkpoint_opt.unwrap_or(u64::MAX),
143            );
144            // Stop pushing to tuple_batch if we've reached the end checkpoint.
145            while next_cp_to_process <= max_commitable_cp {
146                if let Some(data_tuple) = unprocessed.remove(&next_cp_to_process) {
147                    tuple_batch.push(data_tuple);
148                    next_cp_to_process += 1;
149                } else {
150                    break;
151                }
152            }
153
154            if !tuple_batch.is_empty() {
155                let committer_watermark = tuple_batch.last().unwrap().0;
156                let batch = tuple_batch.into_iter().map(|t| t.1).collect();
157                self.handler.load(batch).await.map_err(|e| {
158                    IndexerError::PostgresWriteError(format!(
159                        "Failed to load transformed data into DB for handler {}: {}",
160                        self.handler.name(),
161                        e
162                    ))
163                })?;
164                self.handler.set_watermark_hi(committer_watermark).await?;
165                tuple_batch = vec![];
166            }
167
168            if let Some(end_checkpoint) = end_checkpoint_opt
169                && next_cp_to_process > end_checkpoint
170            {
171                tracing::info!(
172                    "Reached end checkpoint, stopping handler {}...",
173                    self.handler.name()
174                );
175                return Ok(());
176            }
177        }
178        Err(IndexerError::ChannelClosed(format!(
179            "Checkpoint channel is closed unexpectedly for handler {}",
180            self.handler.name()
181        )))
182    }
183}
184
185#[async_trait]
186pub trait Handler<T>: Send + Sync {
187    /// return handler name
188    fn name(&self) -> String;
189
190    /// commit batch of transformed data to DB
191    async fn load(&self, batch: Vec<T>) -> IndexerResult<()>;
192
193    /// read high watermark of the table DB
194    async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>>;
195
196    /// Updates the relevant entries on the `watermarks` table with the full `CommitterWatermark`,
197    /// which tracks the latest epoch, cp, and tx sequence number of the committed batch.
198    async fn set_watermark_hi(&self, watermark: CommitterWatermark) -> IndexerResult<()>;
199
200    /// By default, return u64::MAX, which means no extra waiting is needed before commiting;
201    /// get max committable checkpoint, for handlers that want to wait for some condition before commiting,
202    /// one use-case is the objects snapshot handler,
203    /// which waits for the lag between snapshot and latest checkpoint to reach a certain threshold.
204    async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
205        Ok(u64::MAX)
206    }
207}
208
/// The indexer writer operates on checkpoint data, which contains information on the current epoch,
/// checkpoint, and transaction. These three numbers form the watermark upper bound for each
/// committed table. The reader and pruner are responsible for determining which of the three units
/// will be used for a particular table.
///
/// The derived ordering compares fields in declaration order: `epoch_hi_inclusive`, then
/// `checkpoint_hi_inclusive`, then `tx_hi`.
#[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub struct CommitterWatermark {
    /// Epoch of the highest committed checkpoint (inclusive).
    pub epoch_hi_inclusive: u64,
    /// Sequence number of the highest committed checkpoint (inclusive).
    pub checkpoint_hi_inclusive: u64,
    /// Taken from the checkpoint's `network_total_transactions`. Unlike the other fields it is
    /// not suffixed `_inclusive` — presumably an exclusive upper bound on tx sequence numbers;
    /// NOTE(review): confirm with the reader/pruner.
    pub tx_hi: u64,
}
219
220impl From<&IndexedCheckpoint> for CommitterWatermark {
221    fn from(checkpoint: &IndexedCheckpoint) -> Self {
222        Self {
223            epoch_hi_inclusive: checkpoint.epoch,
224            checkpoint_hi_inclusive: checkpoint.sequence_number,
225            tx_hi: checkpoint.network_total_transactions,
226        }
227    }
228}
229
230impl From<&CheckpointData> for CommitterWatermark {
231    fn from(checkpoint: &CheckpointData) -> Self {
232        Self {
233            epoch_hi_inclusive: checkpoint.checkpoint_summary.epoch,
234            checkpoint_hi_inclusive: checkpoint.checkpoint_summary.sequence_number,
235            tx_hi: checkpoint.checkpoint_summary.network_total_transactions,
236        }
237    }
238}
239
240/// Enum representing tables that the committer handler writes to.
241#[derive(
242    Debug,
243    Eq,
244    PartialEq,
245    strum_macros::Display,
246    strum_macros::EnumString,
247    strum_macros::EnumIter,
248    strum_macros::AsRefStr,
249    Hash,
250    Serialize,
251    Deserialize,
252    Clone,
253)]
254#[strum(serialize_all = "snake_case")]
255#[serde(rename_all = "snake_case")]
256pub enum CommitterTables {
257    // Unpruned tables
258    ChainIdentifier,
259    Display,
260    Epochs,
261    FeatureFlags,
262    FullObjectsHistory,
263    Objects,
264    ObjectsVersion,
265    Packages,
266    ProtocolConfigs,
267    RawCheckpoints,
268
269    // Prunable tables
270    ObjectsHistory,
271    Transactions,
272    Events,
273
274    EventEmitPackage,
275    EventEmitModule,
276    EventSenders,
277    EventStructInstantiation,
278    EventStructModule,
279    EventStructName,
280    EventStructPackage,
281
282    TxAffectedAddresses,
283    TxAffectedObjects,
284    TxCallsPkg,
285    TxCallsMod,
286    TxCallsFun,
287    TxChangedObjects,
288    TxDigests,
289    TxInputObjects,
290    TxKinds,
291
292    Checkpoints,
293    PrunerCpWatermark,
294}
295
296/// Enum representing tables that the objects snapshot handler writes to.
297#[derive(
298    Debug,
299    Eq,
300    PartialEq,
301    strum_macros::Display,
302    strum_macros::EnumString,
303    strum_macros::EnumIter,
304    strum_macros::AsRefStr,
305    Hash,
306    Serialize,
307    Deserialize,
308    Clone,
309)]
310#[strum(serialize_all = "snake_case")]
311#[serde(rename_all = "snake_case")]
312pub enum ObjectsSnapshotHandlerTables {
313    ObjectsSnapshot,
314}