1use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use itertools::Itertools;
9use sui_types::dynamic_field::DynamicFieldInfo;
10use tokio_util::sync::CancellationToken;
11use tracing::{info, warn};
12
13use move_core_types::language_storage::{StructTag, TypeTag};
14use mysten_metrics::{get_metrics, spawn_monitored_task};
15use sui_data_ingestion_core::Worker;
16use sui_types::dynamic_field::DynamicFieldType;
17use sui_types::effects::{ObjectChange, TransactionEffectsAPI};
18use sui_types::event::SystemEpochInfoEvent;
19use sui_types::full_checkpoint_content::{CheckpointData, CheckpointTransaction};
20use sui_types::messages_checkpoint::{
21 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
22};
23use sui_types::object::Object;
24use sui_types::object::Owner;
25use sui_types::sui_system_state::{SuiSystemStateTrait, get_sui_system_state};
26use sui_types::transaction::TransactionDataAPI;
27
28use crate::errors::IndexerError;
29use crate::handlers::committer::start_tx_checkpoint_commit_task;
30use crate::metrics::IndexerMetrics;
31use crate::models::display::StoredDisplay;
32use crate::models::epoch::{EndOfEpochUpdate, EpochEndInfo, EpochStartInfo, StartOfEpochUpdate};
33use crate::models::obj_indices::StoredObjectVersion;
34use crate::store::{IndexerStore, PgIndexerStore};
35use crate::types::{
36 EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
37 IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
38};
39
40use super::CheckpointDataToCommit;
41use super::EpochToCommit;
42use super::TransactionObjectChangesToCommit;
43use super::tx_processor::EpochEndIndexingObjectStore;
44use super::tx_processor::TxChangesProcessor;
45
/// Default capacity of the channel that carries indexed checkpoints to the commit task.
///
/// `new_handlers` reads the `CHECKPOINT_QUEUE_SIZE` environment variable and uses this value
/// when the variable is not set.
const CHECKPOINT_QUEUE_SIZE: usize = 100;
47
48pub async fn new_handlers(
49 state: PgIndexerStore,
50 metrics: IndexerMetrics,
51 cancel: CancellationToken,
52 start_checkpoint_opt: Option<CheckpointSequenceNumber>,
53 end_checkpoint_opt: Option<CheckpointSequenceNumber>,
54) -> Result<(CheckpointHandler, u64), IndexerError> {
55 let start_checkpoint = match start_checkpoint_opt {
56 Some(start_checkpoint) => start_checkpoint,
57 None => state
58 .get_latest_checkpoint_sequence_number()
59 .await?
60 .map(|seq| seq.saturating_add(1))
61 .unwrap_or_default(),
62 };
63
64 let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE")
65 .unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string())
66 .parse::<usize>()
67 .unwrap();
68 let global_metrics = get_metrics().unwrap();
69 let (indexed_checkpoint_sender, indexed_checkpoint_receiver) =
70 mysten_metrics::metered_channel::channel(
71 checkpoint_queue_size,
72 &global_metrics
73 .channel_inflight
74 .with_label_values(&["checkpoint_indexing"]),
75 );
76
77 let state_clone = state.clone();
78 let metrics_clone = metrics.clone();
79 spawn_monitored_task!(start_tx_checkpoint_commit_task(
80 state_clone,
81 metrics_clone,
82 indexed_checkpoint_receiver,
83 cancel.clone(),
84 start_checkpoint,
85 end_checkpoint_opt,
86 ));
87 Ok((
88 CheckpointHandler::new(state, metrics, indexed_checkpoint_sender),
89 start_checkpoint,
90 ))
91}
92
93pub struct CheckpointHandler {
94 state: PgIndexerStore,
95 metrics: IndexerMetrics,
96 indexed_checkpoint_sender: mysten_metrics::metered_channel::Sender<CheckpointDataToCommit>,
97}
98
99#[async_trait]
100impl Worker for CheckpointHandler {
101 type Result = ();
102 async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
103 let time_now_ms = chrono::Utc::now().timestamp_millis();
104 let cp_download_lag = time_now_ms - checkpoint.checkpoint_summary.timestamp_ms as i64;
105 info!(
106 "checkpoint download lag for cp {}: {} ms",
107 checkpoint.checkpoint_summary.sequence_number, cp_download_lag
108 );
109 self.metrics.download_lag_ms.set(cp_download_lag);
110 self.metrics
111 .max_downloaded_checkpoint_sequence_number
112 .set(checkpoint.checkpoint_summary.sequence_number as i64);
113 self.metrics
114 .downloaded_checkpoint_timestamp_ms
115 .set(checkpoint.checkpoint_summary.timestamp_ms as i64);
116 info!(
117 "Indexer lag: downloaded checkpoint {} with time now {} and checkpoint time {}",
118 checkpoint.checkpoint_summary.sequence_number,
119 time_now_ms,
120 checkpoint.checkpoint_summary.timestamp_ms
121 );
122 let checkpoint_data = Self::index_checkpoint(
123 &self.state,
124 checkpoint,
125 Arc::new(self.metrics.clone()),
126 Self::index_packages(std::slice::from_ref(checkpoint), &self.metrics),
127 )
128 .await?;
129 self.indexed_checkpoint_sender.send(checkpoint_data).await?;
130 Ok(())
131 }
132}
133
134impl CheckpointHandler {
135 fn new(
136 state: PgIndexerStore,
137 metrics: IndexerMetrics,
138 indexed_checkpoint_sender: mysten_metrics::metered_channel::Sender<CheckpointDataToCommit>,
139 ) -> Self {
140 Self {
141 state,
142 metrics,
143 indexed_checkpoint_sender,
144 }
145 }
146
147 async fn index_epoch(
148 state: &PgIndexerStore,
149 data: &CheckpointData,
150 ) -> Result<Option<EpochToCommit>, IndexerError> {
151 let checkpoint_object_store = EpochEndIndexingObjectStore::new(data);
152
153 let CheckpointData {
154 transactions,
155 checkpoint_summary,
156 checkpoint_contents: _,
157 } = data;
158
159 if *checkpoint_summary.sequence_number() == 0 {
161 info!("Processing genesis epoch");
162 let system_state_summary =
163 get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();
164 return Ok(Some(EpochToCommit {
165 last_epoch: None,
166 new_epoch: StartOfEpochUpdate::new(system_state_summary, EpochStartInfo::default()),
167 }));
168 }
169
170 if checkpoint_summary.end_of_epoch_data.is_none() {
172 return Ok(None);
173 }
174
175 let system_state_summary =
176 get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();
177
178 let epoch_event_opt = transactions
179 .iter()
180 .find_map(|t| {
181 t.events.as_ref()?.data.iter().find_map(|ev| {
182 if ev.is_system_epoch_info_event() {
183 Some(bcs::from_bytes::<SystemEpochInfoEvent>(&ev.contents))
184 } else {
185 None
186 }
187 })
188 })
189 .transpose()?;
190 if epoch_event_opt.is_none() {
191 warn!(
192 "No SystemEpochInfoEvent found at end of epoch {}, some epoch data will be set to default.",
193 checkpoint_summary.epoch,
194 );
195 assert!(
196 system_state_summary.safe_mode,
197 "Sui is not in safe mode but no SystemEpochInfoEvent found at end of epoch {}",
198 checkpoint_summary.epoch
199 );
200 }
201
202 let first_tx_sequence_number = match system_state_summary.epoch {
206 1 => Ok(0),
208 _ => {
209 let last_epoch = system_state_summary.epoch - 2;
210 state
211 .get_network_total_transactions_by_end_of_epoch(last_epoch)
212 .await?
213 .ok_or_else(|| {
214 IndexerError::PersistentStorageDataCorruptionError(format!(
215 "Network total transactions for epoch {} not found",
216 last_epoch
217 ))
218 })
219 }
220 }?;
221
222 let epoch_end_info = EpochEndInfo::new(epoch_event_opt.as_ref());
223 let epoch_start_info = EpochStartInfo::new(
224 checkpoint_summary.sequence_number.saturating_add(1),
225 checkpoint_summary.network_total_transactions,
226 epoch_event_opt.as_ref(),
227 );
228
229 Ok(Some(EpochToCommit {
230 last_epoch: Some(EndOfEpochUpdate::new(
231 checkpoint_summary,
232 first_tx_sequence_number,
233 epoch_end_info,
234 )),
235 new_epoch: StartOfEpochUpdate::new(system_state_summary, epoch_start_info),
236 }))
237 }
238
239 fn derive_object_versions(
240 object_history_changes: &TransactionObjectChangesToCommit,
241 ) -> Vec<StoredObjectVersion> {
242 let mut object_versions = vec![];
243 for changed_obj in object_history_changes.changed_objects.iter() {
244 object_versions.push(StoredObjectVersion {
245 object_id: changed_obj.object.id().to_vec(),
246 object_version: changed_obj.object.version().value() as i64,
247 cp_sequence_number: changed_obj.checkpoint_sequence_number as i64,
248 });
249 }
250 for deleted_obj in object_history_changes.deleted_objects.iter() {
251 object_versions.push(StoredObjectVersion {
252 object_id: deleted_obj.object_id.to_vec(),
253 object_version: deleted_obj.object_version as i64,
254 cp_sequence_number: deleted_obj.checkpoint_sequence_number as i64,
255 });
256 }
257 object_versions
258 }
259
260 async fn index_checkpoint(
261 state: &PgIndexerStore,
262 data: &CheckpointData,
263 metrics: Arc<IndexerMetrics>,
264 packages: Vec<IndexedPackage>,
265 ) -> Result<CheckpointDataToCommit, IndexerError> {
266 let checkpoint_seq = data.checkpoint_summary.sequence_number;
267 info!(checkpoint_seq, "Indexing checkpoint data blob");
268
269 let epoch = Self::index_epoch(state, data).await?;
271
272 let object_changes: TransactionObjectChangesToCommit =
274 Self::index_objects(data, &metrics).await?;
275 let object_history_changes: TransactionObjectChangesToCommit =
276 Self::index_objects_history(data).await?;
277 let object_versions = Self::derive_object_versions(&object_history_changes);
278
279 let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = {
280 let CheckpointData {
281 transactions,
282 checkpoint_summary,
283 checkpoint_contents,
284 } = data;
285
286 let (db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) =
287 Self::index_transactions(
288 transactions,
289 checkpoint_summary,
290 checkpoint_contents,
291 &metrics,
292 )
293 .await?;
294
295 let successful_tx_num: u64 = db_transactions.iter().map(|t| t.successful_tx_num).sum();
296 (
297 IndexedCheckpoint::from_sui_checkpoint(
298 checkpoint_summary,
299 checkpoint_contents,
300 successful_tx_num as usize,
301 ),
302 db_transactions,
303 db_events,
304 db_tx_indices,
305 db_event_indices,
306 db_displays,
307 )
308 };
309 let time_now_ms = chrono::Utc::now().timestamp_millis();
310 metrics
311 .index_lag_ms
312 .set(time_now_ms - checkpoint.timestamp_ms as i64);
313 metrics
314 .max_indexed_checkpoint_sequence_number
315 .set(checkpoint.sequence_number as i64);
316 metrics
317 .indexed_checkpoint_timestamp_ms
318 .set(checkpoint.timestamp_ms as i64);
319 info!(
320 "Indexer lag: indexed checkpoint {} with time now {} and checkpoint time {}",
321 checkpoint.sequence_number, time_now_ms, checkpoint.timestamp_ms
322 );
323
324 Ok(CheckpointDataToCommit {
325 checkpoint,
326 transactions: db_transactions,
327 events: db_events,
328 tx_indices: db_tx_indices,
329 event_indices: db_event_indices,
330 display_updates: db_displays,
331 object_changes,
332 object_history_changes,
333 object_versions,
334 packages,
335 epoch,
336 })
337 }
338
339 async fn index_transactions(
340 transactions: &[CheckpointTransaction],
341 checkpoint_summary: &CertifiedCheckpointSummary,
342 checkpoint_contents: &CheckpointContents,
343 metrics: &IndexerMetrics,
344 ) -> IndexerResult<(
345 Vec<IndexedTransaction>,
346 Vec<IndexedEvent>,
347 Vec<TxIndex>,
348 Vec<EventIndex>,
349 BTreeMap<String, StoredDisplay>,
350 )> {
351 let checkpoint_seq = checkpoint_summary.sequence_number();
352
353 let mut tx_seq_num_iter = checkpoint_contents
354 .enumerate_transactions(checkpoint_summary)
355 .map(|(seq, execution_digest)| (execution_digest.transaction, seq));
356
357 if checkpoint_contents.size() != transactions.len() {
358 return Err(IndexerError::FullNodeReadingError(format!(
359 "CheckpointContents has different size {} compared to Transactions {} for checkpoint {}",
360 checkpoint_contents.size(),
361 transactions.len(),
362 checkpoint_seq
363 )));
364 }
365
366 let mut db_transactions = Vec::new();
367 let mut db_events = Vec::new();
368 let mut db_displays = BTreeMap::new();
369 let mut db_tx_indices = Vec::new();
370 let mut db_event_indices = Vec::new();
371
372 for tx in transactions {
373 let CheckpointTransaction {
374 transaction: sender_signed_data,
375 effects: fx,
376 events,
377 input_objects,
378 output_objects,
379 } = tx;
380 let (tx_digest, tx_sequence_number) = tx_seq_num_iter.next().unwrap();
382 if tx_digest != *sender_signed_data.digest() {
383 return Err(IndexerError::FullNodeReadingError(format!(
384 "Transactions has different ordering from CheckpointContents, for checkpoint {}, Mismatch found at {} v.s. {}",
385 checkpoint_seq,
386 tx_digest,
387 sender_signed_data.digest()
388 )));
389 }
390
391 let tx = sender_signed_data.transaction_data();
392 let events = events
393 .as_ref()
394 .map(|events| events.data.clone())
395 .unwrap_or_default();
396
397 let transaction_kind = if tx.is_system_tx() {
398 TransactionKind::SystemTransaction
399 } else {
400 TransactionKind::ProgrammableTransaction
401 };
402
403 db_events.extend(events.iter().enumerate().map(|(idx, event)| {
404 IndexedEvent::from_event(
405 tx_sequence_number,
406 idx as u64,
407 *checkpoint_seq,
408 tx_digest,
409 event,
410 checkpoint_summary.timestamp_ms,
411 )
412 }));
413
414 db_event_indices.extend(
415 events.iter().enumerate().map(|(idx, event)| {
416 EventIndex::from_event(tx_sequence_number, idx as u64, event)
417 }),
418 );
419
420 db_displays.extend(
421 events
422 .iter()
423 .flat_map(StoredDisplay::try_from_event)
424 .map(|display| (display.object_type.clone(), display)),
425 );
426
427 let objects: Vec<_> = input_objects.iter().chain(output_objects.iter()).collect();
428
429 let (balance_change, object_changes) =
430 TxChangesProcessor::new(&objects, metrics.clone())
431 .get_changes(tx, fx, &tx_digest)
432 .await?;
433
434 let db_txn = IndexedTransaction {
435 tx_sequence_number,
436 tx_digest,
437 checkpoint_sequence_number: *checkpoint_summary.sequence_number(),
438 timestamp_ms: checkpoint_summary.timestamp_ms,
439 sender_signed_data: sender_signed_data.data().clone(),
440 effects: fx.clone(),
441 object_changes,
442 balance_change,
443 events,
444 transaction_kind: transaction_kind.clone(),
445 successful_tx_num: if fx.status().is_ok() {
446 tx.kind().tx_count() as u64
447 } else {
448 0
449 },
450 };
451
452 db_transactions.push(db_txn);
453
454 let input_objects = tx
456 .input_objects()
457 .expect("committed txns have been validated")
458 .into_iter()
459 .map(|obj_kind| obj_kind.object_id())
460 .collect();
461
462 let changed_objects = fx
464 .all_changed_objects()
465 .into_iter()
466 .map(|(object_ref, _owner, _write_kind)| object_ref.0)
467 .collect();
468
469 let affected_objects = fx
471 .object_changes()
472 .into_iter()
473 .map(|ObjectChange { id, .. }| id)
474 .collect();
475
476 let payers = vec![tx.gas_owner()];
478
479 let sender = tx.sender();
481
482 let recipients = fx
484 .all_changed_objects()
485 .into_iter()
486 .filter_map(|(_object_ref, owner, _write_kind)| match owner {
487 Owner::AddressOwner(address) => Some(address),
488 _ => None,
489 })
490 .unique()
491 .collect();
492
493 let move_calls = tx
495 .move_calls()
496 .into_iter()
497 .map(|(p, m, f)| (*p, m.to_string(), f.to_string()))
498 .collect();
499
500 db_tx_indices.push(TxIndex {
501 tx_sequence_number,
502 transaction_digest: tx_digest,
503 checkpoint_sequence_number: *checkpoint_seq,
504 input_objects,
505 changed_objects,
506 affected_objects,
507 sender,
508 payers,
509 recipients,
510 move_calls,
511 tx_kind: transaction_kind,
512 });
513 }
514 Ok((
515 db_transactions,
516 db_events,
517 db_tx_indices,
518 db_event_indices,
519 db_displays,
520 ))
521 }
522
523 pub(crate) async fn index_objects(
524 data: &CheckpointData,
525 metrics: &IndexerMetrics,
526 ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
527 let _timer = metrics.indexing_objects_latency.start_timer();
528 let checkpoint_seq = data.checkpoint_summary.sequence_number;
529
530 let eventually_removed_object_refs_post_version =
531 data.eventually_removed_object_refs_post_version();
532 let indexed_eventually_removed_objects = eventually_removed_object_refs_post_version
533 .into_iter()
534 .map(|obj_ref| IndexedDeletedObject {
535 object_id: obj_ref.0,
536 object_version: obj_ref.1.into(),
537 checkpoint_sequence_number: checkpoint_seq,
538 })
539 .collect();
540
541 let latest_live_output_objects = data.latest_live_output_objects();
542 let changed_objects = latest_live_output_objects
543 .into_iter()
544 .map(|o| {
545 try_extract_df_kind(o)
546 .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
547 })
548 .collect::<Result<Vec<_>, _>>()?;
549
550 Ok(TransactionObjectChangesToCommit {
551 changed_objects,
552 deleted_objects: indexed_eventually_removed_objects,
553 })
554 }
555
556 async fn index_objects_history(
558 data: &CheckpointData,
559 ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
560 let checkpoint_seq = data.checkpoint_summary.sequence_number;
561 let deleted_objects = data
562 .transactions
563 .iter()
564 .flat_map(|tx| tx.removed_object_refs_post_version())
565 .collect::<Vec<_>>();
566 let indexed_deleted_objects: Vec<IndexedDeletedObject> = deleted_objects
567 .into_iter()
568 .map(|obj_ref| IndexedDeletedObject {
569 object_id: obj_ref.0,
570 object_version: obj_ref.1.into(),
571 checkpoint_sequence_number: checkpoint_seq,
572 })
573 .collect();
574
575 let output_objects: Vec<_> = data
576 .transactions
577 .iter()
578 .flat_map(|tx| &tx.output_objects)
579 .collect();
580
581 let changed_objects = output_objects
584 .into_iter()
585 .map(|o| {
586 try_extract_df_kind(o)
587 .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
588 })
589 .collect::<Result<Vec<_>, _>>()?;
590
591 Ok(TransactionObjectChangesToCommit {
592 changed_objects,
593 deleted_objects: indexed_deleted_objects,
594 })
595 }
596
597 fn index_packages(
598 checkpoint_data: &[CheckpointData],
599 metrics: &IndexerMetrics,
600 ) -> Vec<IndexedPackage> {
601 let _timer = metrics.indexing_packages_latency.start_timer();
602 checkpoint_data
603 .iter()
604 .flat_map(|data| {
605 let checkpoint_sequence_number = data.checkpoint_summary.sequence_number;
606 data.transactions
607 .iter()
608 .flat_map(|tx| &tx.output_objects)
609 .filter_map(|o| {
610 if let sui_types::object::Data::Package(p) = &o.data {
611 Some(IndexedPackage {
612 package_id: o.id(),
613 move_package: p.clone(),
614 checkpoint_sequence_number,
615 })
616 } else {
617 None
618 }
619 })
620 .collect::<Vec<_>>()
621 })
622 .collect()
623 }
624}
625
626fn try_extract_df_kind(o: &Object) -> IndexerResult<Option<DynamicFieldType>> {
629 let Some(move_object) = o.data.try_as_move() else {
631 return Ok(None);
632 };
633
634 if !move_object.type_().is_dynamic_field() {
635 return Ok(None);
636 }
637
638 let type_: StructTag = move_object.type_().clone().into();
639 let [name, _] = type_.type_params.as_slice() else {
640 return Ok(None);
641 };
642
643 Ok(Some(
644 if matches!(name, TypeTag::Struct(s) if DynamicFieldInfo::is_dynamic_object_field_wrapper(s))
645 {
646 DynamicFieldType::DynamicObject
647 } else {
648 DynamicFieldType::DynamicField
649 },
650 ))
651}