use std::collections::BTreeMap;

use async_trait::async_trait;
use futures::{FutureExt, StreamExt};

use serde::{Deserialize, Serialize};
use sui_types::full_checkpoint_content::CheckpointData;
use tokio_util::sync::CancellationToken;

use crate::{
    errors::IndexerError,
    models::{
        display::StoredDisplay,
        epoch::{EndOfEpochUpdate, StartOfEpochUpdate},
        obj_indices::StoredObjectVersion,
    },
    types::{
        EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
        IndexedPackage, IndexedTransaction, IndexerResult, TxIndex,
    },
};

pub mod checkpoint_handler;
pub mod committer;
pub mod objects_snapshot_handler;
pub mod pruner;
pub mod tx_processor;

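/// Default number of checkpoints committed per batch; overridable at runtime
/// via the `CHECKPOINT_COMMIT_BATCH_SIZE` environment variable.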
pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
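/// Maximum number of out-of-order checkpoints buffered before the committer
/// pauses reading from the stream.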
pub(crate) const UNPROCESSED_CHECKPOINT_SIZE_LIMIT: usize = 1000;

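/// Everything extracted from a single checkpoint, staged for one database
/// commit.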
#[derive(Debug)]
pub struct CheckpointDataToCommit {
    pub checkpoint: IndexedCheckpoint,
    pub transactions: Vec<IndexedTransaction>,
    pub events: Vec<IndexedEvent>,
    pub event_indices: Vec<EventIndex>,
    pub tx_indices: Vec<TxIndex>,
    pub display_updates: BTreeMap<String, StoredDisplay>,
    pub object_changes: TransactionObjectChangesToCommit,
    pub object_history_changes: TransactionObjectChangesToCommit,
    pub object_versions: Vec<StoredObjectVersion>,
    pub packages: Vec<IndexedPackage>,
    pub epoch: Option<EpochToCommit>,
}

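/// Object mutations and deletions extracted from a checkpoint's transactions.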
#[derive(Clone, Debug)]
pub struct TransactionObjectChangesToCommit {
    pub changed_objects: Vec<IndexedObject>,
    pub deleted_objects: Vec<IndexedDeletedObject>,
}

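/// Epoch boundary payload: the end-of-epoch update for the previous epoch
/// (`None` when there is no preceding epoch) and the start-of-epoch update
/// for the new one.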
#[derive(Clone, Debug)]
pub struct EpochToCommit {
    pub last_epoch: Option<EndOfEpochUpdate>,
    pub new_epoch: StartOfEpochUpdate,
}

impl EpochToCommit {
    pub fn new_epoch_id(&self) -> u64 {
        self.new_epoch.epoch as u64
    }

    pub fn new_epoch_first_checkpoint_id(&self) -> u64 {
        self.new_epoch.first_checkpoint_id as u64
    }

    pub fn last_epoch_total_transactions(&self) -> Option<u64> {
        self.last_epoch
            .as_ref()
            .map(|e| e.epoch_total_transactions as u64)
    }

    pub fn new_epoch_first_tx_sequence_number(&self) -> u64 {
        self.new_epoch.first_tx_sequence_number as u64
    }
}

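/// Generic driver that pulls checkpoint-derived data off a channel and feeds
/// it to the wrapped [`Handler`] in checkpoint order.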
pub struct CommonHandler<T> {
    handler: Box<dyn Handler<T>>,
}

impl<T> CommonHandler<T> {
    pub fn new(handler: Box<dyn Handler<T>>) -> Self {
        Self { handler }
    }

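    /// Streams (watermark, data) tuples from `cp_receiver`, reorders them by
    /// checkpoint sequence number, and loads contiguous batches through the
    /// wrapped handler until `end_checkpoint_opt` (if set) is passed or
    /// `cancel` fires.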
    async fn start_transform_and_load(
        &self,
        cp_receiver: mysten_metrics::metered_channel::Receiver<(CommitterWatermark, T)>,
        cancel: CancellationToken,
        start_checkpoint: u64,
        end_checkpoint_opt: Option<u64>,
    ) -> IndexerResult<()> {
        let checkpoint_commit_batch_size = std::env::var("CHECKPOINT_COMMIT_BATCH_SIZE")
            .unwrap_or_else(|_| CHECKPOINT_COMMIT_BATCH_SIZE.to_string())
            .parse::<usize>()
            .expect("Invalid CHECKPOINT_COMMIT_BATCH_SIZE");
        let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(cp_receiver)
            .ready_chunks(checkpoint_commit_batch_size);

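        // Checkpoints may arrive out of order; buffer them keyed by checkpoint
        // sequence number until they can be processed contiguously.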
        let mut unprocessed: BTreeMap<u64, (CommitterWatermark, _)> = BTreeMap::new();
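        // Contiguous run of checkpoints staged for the next commit.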
        let mut tuple_batch = vec![];
        let mut next_cp_to_process = start_checkpoint;

        loop {
            if cancel.is_cancelled() {
                return Ok(());
            }

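            // Stop pulling from the stream while too many checkpoints are
            // buffered out of order; otherwise drain whatever is ready.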
            if unprocessed.len() >= UNPROCESSED_CHECKPOINT_SIZE_LIMIT {
                tracing::info!(
                    "Unprocessed checkpoint count reached limit {}, skipping reads from the stream...",
                    UNPROCESSED_CHECKPOINT_SIZE_LIMIT
                );
            } else {
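                // Pull as many ready chunks as possible without blocking.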
                match stream.next().now_or_never() {
                    Some(Some(tuple_chunk)) => {
                        if cancel.is_cancelled() {
                            return Ok(());
                        }
                        for tuple in tuple_chunk {
                            unprocessed.insert(tuple.0.checkpoint_hi_inclusive, tuple);
                        }
                    }
                    // The sender side was dropped: no more checkpoints will arrive.
                    Some(None) => break,
                    // Nothing ready right now; fall through and drain the buffer.
                    None => {}
                }
            }

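            // The handler may cap how far it is willing to commit (e.g. to bound
            // the objects snapshot lag); also respect the optional end checkpoint.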
            let checkpoint_lag_limiter = self.handler.get_max_committable_checkpoint().await?;
            let max_committable_cp = std::cmp::min(
                checkpoint_lag_limiter,
                end_checkpoint_opt.unwrap_or(u64::MAX),
            );
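            // Move contiguous checkpoints, in order, from the buffer into the batch.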
            while next_cp_to_process <= max_committable_cp {
                if let Some(data_tuple) = unprocessed.remove(&next_cp_to_process) {
                    tuple_batch.push(data_tuple);
                    next_cp_to_process += 1;
                } else {
                    break;
                }
            }

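            // Commit the batch, then advance the watermark to its last tuple.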
            if !tuple_batch.is_empty() {
                let committer_watermark = tuple_batch.last().unwrap().0;
                let batch = tuple_batch.into_iter().map(|t| t.1).collect();
                self.handler.load(batch).await.map_err(|e| {
                    IndexerError::PostgresWriteError(format!(
                        "Failed to load transformed data into DB for handler {}: {}",
                        self.handler.name(),
                        e
                    ))
                })?;
                self.handler.set_watermark_hi(committer_watermark).await?;
                tuple_batch = vec![];
            }

            if let Some(end_checkpoint) = end_checkpoint_opt
                && next_cp_to_process > end_checkpoint
            {
                tracing::info!(
                    "Reached end checkpoint, stopping handler {}...",
                    self.handler.name()
                );
                return Ok(());
            }
        }
        Err(IndexerError::ChannelClosed(format!(
            "Checkpoint channel is closed unexpectedly for handler {}",
            self.handler.name()
        )))
    }
}

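/// Transform-and-load interface driven by [`CommonHandler`]: implementors
/// persist batches of `T` and maintain the committed watermark for their
/// table(s).
///
/// A minimal sketch of an implementor; `NoopHandler` is hypothetical and shown
/// only to illustrate the contract:
///
/// ```ignore
/// use std::sync::Mutex;
///
/// struct NoopHandler {
///     // Last committed watermark, kept in memory; a real handler would
///     // persist this in its store.
///     watermark: Mutex<Option<CommitterWatermark>>,
/// }
///
/// #[async_trait]
/// impl Handler<u64> for NoopHandler {
///     fn name(&self) -> String {
///         "noop".to_string()
///     }
///
///     async fn load(&self, _batch: Vec<u64>) -> IndexerResult<()> {
///         // A real handler would write the batch to its table(s) here.
///         Ok(())
///     }
///
///     async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>> {
///         let watermark = *self.watermark.lock().unwrap();
///         Ok(watermark.map(|w| w.checkpoint_hi_inclusive))
///     }
///
///     async fn set_watermark_hi(&self, watermark: CommitterWatermark) -> IndexerResult<()> {
///         *self.watermark.lock().unwrap() = Some(watermark);
///         Ok(())
///     }
/// }
/// ```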
#[async_trait]
pub trait Handler<T>: Send + Sync {
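    /// Name of the handler, used in logs and error messages.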
    fn name(&self) -> String;

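    /// Persist one batch of transformed data to the store.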
    async fn load(&self, batch: Vec<T>) -> IndexerResult<()>;

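    /// Read the committed high watermark (checkpoint sequence number) from the
    /// store, if one has been set.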
    async fn get_watermark_hi(&self) -> IndexerResult<Option<u64>>;

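    /// Record the epoch, checkpoint, and transaction highs of the committed
    /// batch.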
    async fn set_watermark_hi(&self, watermark: CommitterWatermark) -> IndexerResult<()>;

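    /// Highest checkpoint the handler is currently willing to commit. The
    /// default of `u64::MAX` imposes no limit; handlers that must wait on some
    /// condition (e.g. keeping the objects snapshot within a bounded lag of
    /// the latest checkpoint) override this.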
    async fn get_max_committable_checkpoint(&self) -> IndexerResult<u64> {
        Ok(u64::MAX)
    }
}

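/// Watermark of committed data: the inclusive epoch and checkpoint highs and
/// the transaction high of the latest committed batch.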
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub struct CommitterWatermark {
    pub epoch_hi_inclusive: u64,
    pub checkpoint_hi_inclusive: u64,
    pub tx_hi: u64,
}

impl From<&IndexedCheckpoint> for CommitterWatermark {
    fn from(checkpoint: &IndexedCheckpoint) -> Self {
        Self {
            epoch_hi_inclusive: checkpoint.epoch,
            checkpoint_hi_inclusive: checkpoint.sequence_number,
            tx_hi: checkpoint.network_total_transactions,
        }
    }
}

impl From<&CheckpointData> for CommitterWatermark {
    fn from(checkpoint: &CheckpointData) -> Self {
        Self {
            epoch_hi_inclusive: checkpoint.checkpoint_summary.epoch,
            checkpoint_hi_inclusive: checkpoint.checkpoint_summary.sequence_number,
            tx_hi: checkpoint.checkpoint_summary.network_total_transactions,
        }
    }
}

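/// Tables written by the checkpoint committer; variant names serialize to
/// their snake_case table names.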
#[derive(
    Debug,
    Eq,
    PartialEq,
    strum_macros::Display,
    strum_macros::EnumString,
    strum_macros::EnumIter,
    strum_macros::AsRefStr,
    Hash,
    Serialize,
    Deserialize,
    Clone,
)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum CommitterTables {
    ChainIdentifier,
    Display,
    Epochs,
    FeatureFlags,
    FullObjectsHistory,
    Objects,
    ObjectsVersion,
    Packages,
    ProtocolConfigs,
    RawCheckpoints,

    ObjectsHistory,
    Transactions,
    Events,

    EventEmitPackage,
    EventEmitModule,
    EventSenders,
    EventStructInstantiation,
    EventStructModule,
    EventStructName,
    EventStructPackage,

    TxAffectedAddresses,
    TxAffectedObjects,
    TxCallsPkg,
    TxCallsMod,
    TxCallsFun,
    TxChangedObjects,
    TxDigests,
    TxInputObjects,
    TxKinds,

    Checkpoints,
    PrunerCpWatermark,
}

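/// Table written by the objects snapshot handler.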
#[derive(
    Debug,
    Eq,
    PartialEq,
    strum_macros::Display,
    strum_macros::EnumString,
    strum_macros::EnumIter,
    strum_macros::AsRefStr,
    Hash,
    Serialize,
    Deserialize,
    Clone,
)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum ObjectsSnapshotHandlerTables {
    ObjectsSnapshot,
}