sui_indexer/handlers/
checkpoint_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use itertools::Itertools;
9use sui_types::dynamic_field::DynamicFieldInfo;
10use tokio_util::sync::CancellationToken;
11use tracing::{info, warn};
12
13use move_core_types::language_storage::{StructTag, TypeTag};
14use mysten_metrics::{get_metrics, spawn_monitored_task};
15use sui_data_ingestion_core::Worker;
16use sui_types::dynamic_field::DynamicFieldType;
17use sui_types::effects::{ObjectChange, TransactionEffectsAPI};
18use sui_types::event::SystemEpochInfoEvent;
19use sui_types::full_checkpoint_content::{CheckpointData, CheckpointTransaction};
20use sui_types::messages_checkpoint::{
21    CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
22};
23use sui_types::object::Object;
24use sui_types::object::Owner;
25use sui_types::sui_system_state::{SuiSystemStateTrait, get_sui_system_state};
26use sui_types::transaction::TransactionDataAPI;
27
28use crate::errors::IndexerError;
29use crate::handlers::committer::start_tx_checkpoint_commit_task;
30use crate::metrics::IndexerMetrics;
31use crate::models::display::StoredDisplay;
32use crate::models::epoch::{EndOfEpochUpdate, EpochEndInfo, EpochStartInfo, StartOfEpochUpdate};
33use crate::models::obj_indices::StoredObjectVersion;
34use crate::store::{IndexerStore, PgIndexerStore};
35use crate::types::{
36    EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEvent, IndexedObject,
37    IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex,
38};
39
40use super::CheckpointDataToCommit;
41use super::EpochToCommit;
42use super::TransactionObjectChangesToCommit;
43use super::tx_processor::EpochEndIndexingObjectStore;
44use super::tx_processor::TxChangesProcessor;
45
/// Default capacity of the channel between checkpoint indexing and committing; used when
/// the `CHECKPOINT_QUEUE_SIZE` environment variable is not set.
const CHECKPOINT_QUEUE_SIZE: usize = 100;
47
48pub async fn new_handlers(
49    state: PgIndexerStore,
50    metrics: IndexerMetrics,
51    cancel: CancellationToken,
52    start_checkpoint_opt: Option<CheckpointSequenceNumber>,
53    end_checkpoint_opt: Option<CheckpointSequenceNumber>,
54) -> Result<(CheckpointHandler, u64), IndexerError> {
55    let start_checkpoint = match start_checkpoint_opt {
56        Some(start_checkpoint) => start_checkpoint,
57        None => state
58            .get_latest_checkpoint_sequence_number()
59            .await?
60            .map(|seq| seq.saturating_add(1))
61            .unwrap_or_default(),
62    };
63
64    let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE")
65        .unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string())
66        .parse::<usize>()
67        .unwrap();
68    let global_metrics = get_metrics().unwrap();
69    let (indexed_checkpoint_sender, indexed_checkpoint_receiver) =
70        mysten_metrics::metered_channel::channel(
71            checkpoint_queue_size,
72            &global_metrics
73                .channel_inflight
74                .with_label_values(&["checkpoint_indexing"]),
75        );
76
77    let state_clone = state.clone();
78    let metrics_clone = metrics.clone();
79    spawn_monitored_task!(start_tx_checkpoint_commit_task(
80        state_clone,
81        metrics_clone,
82        indexed_checkpoint_receiver,
83        cancel.clone(),
84        start_checkpoint,
85        end_checkpoint_opt,
86    ));
87    Ok((
88        CheckpointHandler::new(state, metrics, indexed_checkpoint_sender),
89        start_checkpoint,
90    ))
91}
92
93pub struct CheckpointHandler {
94    state: PgIndexerStore,
95    metrics: IndexerMetrics,
96    indexed_checkpoint_sender: mysten_metrics::metered_channel::Sender<CheckpointDataToCommit>,
97}
98
99#[async_trait]
100impl Worker for CheckpointHandler {
101    type Result = ();
102    async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
103        let time_now_ms = chrono::Utc::now().timestamp_millis();
104        let cp_download_lag = time_now_ms - checkpoint.checkpoint_summary.timestamp_ms as i64;
105        info!(
106            "checkpoint download lag for cp {}: {} ms",
107            checkpoint.checkpoint_summary.sequence_number, cp_download_lag
108        );
109        self.metrics.download_lag_ms.set(cp_download_lag);
110        self.metrics
111            .max_downloaded_checkpoint_sequence_number
112            .set(checkpoint.checkpoint_summary.sequence_number as i64);
113        self.metrics
114            .downloaded_checkpoint_timestamp_ms
115            .set(checkpoint.checkpoint_summary.timestamp_ms as i64);
116        info!(
117            "Indexer lag: downloaded checkpoint {} with time now {} and checkpoint time {}",
118            checkpoint.checkpoint_summary.sequence_number,
119            time_now_ms,
120            checkpoint.checkpoint_summary.timestamp_ms
121        );
122        let checkpoint_data = Self::index_checkpoint(
123            &self.state,
124            checkpoint,
125            Arc::new(self.metrics.clone()),
126            Self::index_packages(std::slice::from_ref(checkpoint), &self.metrics),
127        )
128        .await?;
129        self.indexed_checkpoint_sender.send(checkpoint_data).await?;
130        Ok(())
131    }
132}
133
134impl CheckpointHandler {
135    fn new(
136        state: PgIndexerStore,
137        metrics: IndexerMetrics,
138        indexed_checkpoint_sender: mysten_metrics::metered_channel::Sender<CheckpointDataToCommit>,
139    ) -> Self {
140        Self {
141            state,
142            metrics,
143            indexed_checkpoint_sender,
144        }
145    }
146
147    async fn index_epoch(
148        state: &PgIndexerStore,
149        data: &CheckpointData,
150    ) -> Result<Option<EpochToCommit>, IndexerError> {
151        let checkpoint_object_store = EpochEndIndexingObjectStore::new(data);
152
153        let CheckpointData {
154            transactions,
155            checkpoint_summary,
156            checkpoint_contents: _,
157        } = data;
158
159        // Genesis epoch
160        if *checkpoint_summary.sequence_number() == 0 {
161            info!("Processing genesis epoch");
162            let system_state_summary =
163                get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();
164            return Ok(Some(EpochToCommit {
165                last_epoch: None,
166                new_epoch: StartOfEpochUpdate::new(system_state_summary, EpochStartInfo::default()),
167            }));
168        }
169
170        // If not end of epoch, return
171        if checkpoint_summary.end_of_epoch_data.is_none() {
172            return Ok(None);
173        }
174
175        let system_state_summary =
176            get_sui_system_state(&checkpoint_object_store)?.into_sui_system_state_summary();
177
178        let epoch_event_opt = transactions
179            .iter()
180            .find_map(|t| {
181                t.events.as_ref()?.data.iter().find_map(|ev| {
182                    if ev.is_system_epoch_info_event() {
183                        Some(bcs::from_bytes::<SystemEpochInfoEvent>(&ev.contents))
184                    } else {
185                        None
186                    }
187                })
188            })
189            .transpose()?;
190        if epoch_event_opt.is_none() {
191            warn!(
192                "No SystemEpochInfoEvent found at end of epoch {}, some epoch data will be set to default.",
193                checkpoint_summary.epoch,
194            );
195            assert!(
196                system_state_summary.safe_mode,
197                "Sui is not in safe mode but no SystemEpochInfoEvent found at end of epoch {}",
198                checkpoint_summary.epoch
199            );
200        }
201
202        // At some point while committing data in epoch X - 1, we will encounter a new epoch X. We
203        // want to retrieve X - 2's network total transactions to calculate the number of
204        // transactions that occurred in epoch X - 1.
205        let first_tx_sequence_number = match system_state_summary.epoch {
206            // If first epoch change, this number is 0
207            1 => Ok(0),
208            _ => {
209                let last_epoch = system_state_summary.epoch - 2;
210                state
211                    .get_network_total_transactions_by_end_of_epoch(last_epoch)
212                    .await?
213                    .ok_or_else(|| {
214                        IndexerError::PersistentStorageDataCorruptionError(format!(
215                            "Network total transactions for epoch {} not found",
216                            last_epoch
217                        ))
218                    })
219            }
220        }?;
221
222        let epoch_end_info = EpochEndInfo::new(epoch_event_opt.as_ref());
223        let epoch_start_info = EpochStartInfo::new(
224            checkpoint_summary.sequence_number.saturating_add(1),
225            checkpoint_summary.network_total_transactions,
226            epoch_event_opt.as_ref(),
227        );
228
229        Ok(Some(EpochToCommit {
230            last_epoch: Some(EndOfEpochUpdate::new(
231                checkpoint_summary,
232                first_tx_sequence_number,
233                epoch_end_info,
234            )),
235            new_epoch: StartOfEpochUpdate::new(system_state_summary, epoch_start_info),
236        }))
237    }
238
239    fn derive_object_versions(
240        object_history_changes: &TransactionObjectChangesToCommit,
241    ) -> Vec<StoredObjectVersion> {
242        let mut object_versions = vec![];
243        for changed_obj in object_history_changes.changed_objects.iter() {
244            object_versions.push(StoredObjectVersion {
245                object_id: changed_obj.object.id().to_vec(),
246                object_version: changed_obj.object.version().value() as i64,
247                cp_sequence_number: changed_obj.checkpoint_sequence_number as i64,
248            });
249        }
250        for deleted_obj in object_history_changes.deleted_objects.iter() {
251            object_versions.push(StoredObjectVersion {
252                object_id: deleted_obj.object_id.to_vec(),
253                object_version: deleted_obj.object_version as i64,
254                cp_sequence_number: deleted_obj.checkpoint_sequence_number as i64,
255            });
256        }
257        object_versions
258    }
259
260    async fn index_checkpoint(
261        state: &PgIndexerStore,
262        data: &CheckpointData,
263        metrics: Arc<IndexerMetrics>,
264        packages: Vec<IndexedPackage>,
265    ) -> Result<CheckpointDataToCommit, IndexerError> {
266        let checkpoint_seq = data.checkpoint_summary.sequence_number;
267        info!(checkpoint_seq, "Indexing checkpoint data blob");
268
269        // Index epoch
270        let epoch = Self::index_epoch(state, data).await?;
271
272        // Index Objects
273        let object_changes: TransactionObjectChangesToCommit =
274            Self::index_objects(data, &metrics).await?;
275        let object_history_changes: TransactionObjectChangesToCommit =
276            Self::index_objects_history(data).await?;
277        let object_versions = Self::derive_object_versions(&object_history_changes);
278
279        let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = {
280            let CheckpointData {
281                transactions,
282                checkpoint_summary,
283                checkpoint_contents,
284            } = data;
285
286            let (db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) =
287                Self::index_transactions(
288                    transactions,
289                    checkpoint_summary,
290                    checkpoint_contents,
291                    &metrics,
292                )
293                .await?;
294
295            let successful_tx_num: u64 = db_transactions.iter().map(|t| t.successful_tx_num).sum();
296            (
297                IndexedCheckpoint::from_sui_checkpoint(
298                    checkpoint_summary,
299                    checkpoint_contents,
300                    successful_tx_num as usize,
301                ),
302                db_transactions,
303                db_events,
304                db_tx_indices,
305                db_event_indices,
306                db_displays,
307            )
308        };
309        let time_now_ms = chrono::Utc::now().timestamp_millis();
310        metrics
311            .index_lag_ms
312            .set(time_now_ms - checkpoint.timestamp_ms as i64);
313        metrics
314            .max_indexed_checkpoint_sequence_number
315            .set(checkpoint.sequence_number as i64);
316        metrics
317            .indexed_checkpoint_timestamp_ms
318            .set(checkpoint.timestamp_ms as i64);
319        info!(
320            "Indexer lag: indexed checkpoint {} with time now {} and checkpoint time {}",
321            checkpoint.sequence_number, time_now_ms, checkpoint.timestamp_ms
322        );
323
324        Ok(CheckpointDataToCommit {
325            checkpoint,
326            transactions: db_transactions,
327            events: db_events,
328            tx_indices: db_tx_indices,
329            event_indices: db_event_indices,
330            display_updates: db_displays,
331            object_changes,
332            object_history_changes,
333            object_versions,
334            packages,
335            epoch,
336        })
337    }
338
339    async fn index_transactions(
340        transactions: &[CheckpointTransaction],
341        checkpoint_summary: &CertifiedCheckpointSummary,
342        checkpoint_contents: &CheckpointContents,
343        metrics: &IndexerMetrics,
344    ) -> IndexerResult<(
345        Vec<IndexedTransaction>,
346        Vec<IndexedEvent>,
347        Vec<TxIndex>,
348        Vec<EventIndex>,
349        BTreeMap<String, StoredDisplay>,
350    )> {
351        let checkpoint_seq = checkpoint_summary.sequence_number();
352
353        let mut tx_seq_num_iter = checkpoint_contents
354            .enumerate_transactions(checkpoint_summary)
355            .map(|(seq, execution_digest)| (execution_digest.transaction, seq));
356
357        if checkpoint_contents.size() != transactions.len() {
358            return Err(IndexerError::FullNodeReadingError(format!(
359                "CheckpointContents has different size {} compared to Transactions {} for checkpoint {}",
360                checkpoint_contents.size(),
361                transactions.len(),
362                checkpoint_seq
363            )));
364        }
365
366        let mut db_transactions = Vec::new();
367        let mut db_events = Vec::new();
368        let mut db_displays = BTreeMap::new();
369        let mut db_tx_indices = Vec::new();
370        let mut db_event_indices = Vec::new();
371
372        for tx in transactions {
373            let CheckpointTransaction {
374                transaction: sender_signed_data,
375                effects: fx,
376                events,
377                input_objects,
378                output_objects,
379            } = tx;
380            // Unwrap safe - we checked they have equal length above
381            let (tx_digest, tx_sequence_number) = tx_seq_num_iter.next().unwrap();
382            if tx_digest != *sender_signed_data.digest() {
383                return Err(IndexerError::FullNodeReadingError(format!(
384                    "Transactions has different ordering from CheckpointContents, for checkpoint {}, Mismatch found at {} v.s. {}",
385                    checkpoint_seq,
386                    tx_digest,
387                    sender_signed_data.digest()
388                )));
389            }
390
391            let tx = sender_signed_data.transaction_data();
392            let events = events
393                .as_ref()
394                .map(|events| events.data.clone())
395                .unwrap_or_default();
396
397            let transaction_kind = if tx.is_system_tx() {
398                TransactionKind::SystemTransaction
399            } else {
400                TransactionKind::ProgrammableTransaction
401            };
402
403            db_events.extend(events.iter().enumerate().map(|(idx, event)| {
404                IndexedEvent::from_event(
405                    tx_sequence_number,
406                    idx as u64,
407                    *checkpoint_seq,
408                    tx_digest,
409                    event,
410                    checkpoint_summary.timestamp_ms,
411                )
412            }));
413
414            db_event_indices.extend(
415                events.iter().enumerate().map(|(idx, event)| {
416                    EventIndex::from_event(tx_sequence_number, idx as u64, event)
417                }),
418            );
419
420            db_displays.extend(
421                events
422                    .iter()
423                    .flat_map(StoredDisplay::try_from_event)
424                    .map(|display| (display.object_type.clone(), display)),
425            );
426
427            let objects: Vec<_> = input_objects.iter().chain(output_objects.iter()).collect();
428
429            let (balance_change, object_changes) =
430                TxChangesProcessor::new(&objects, metrics.clone())
431                    .get_changes(tx, fx, &tx_digest)
432                    .await?;
433
434            let db_txn = IndexedTransaction {
435                tx_sequence_number,
436                tx_digest,
437                checkpoint_sequence_number: *checkpoint_summary.sequence_number(),
438                timestamp_ms: checkpoint_summary.timestamp_ms,
439                sender_signed_data: sender_signed_data.data().clone(),
440                effects: fx.clone(),
441                object_changes,
442                balance_change,
443                events,
444                transaction_kind: transaction_kind.clone(),
445                successful_tx_num: if fx.status().is_ok() {
446                    tx.kind().tx_count() as u64
447                } else {
448                    0
449                },
450            };
451
452            db_transactions.push(db_txn);
453
454            // Input Objects
455            let input_objects = tx
456                .input_objects()
457                .expect("committed txns have been validated")
458                .into_iter()
459                .map(|obj_kind| obj_kind.object_id())
460                .collect();
461
462            // Changed Objects
463            let changed_objects = fx
464                .all_changed_objects()
465                .into_iter()
466                .map(|(object_ref, _owner, _write_kind)| object_ref.0)
467                .collect();
468
469            // Affected Objects
470            let affected_objects = fx
471                .object_changes()
472                .into_iter()
473                .map(|ObjectChange { id, .. }| id)
474                .collect();
475
476            // Payers
477            let payers = vec![tx.gas_owner()];
478
479            // Sender
480            let sender = tx.sender();
481
482            // Recipients
483            let recipients = fx
484                .all_changed_objects()
485                .into_iter()
486                .filter_map(|(_object_ref, owner, _write_kind)| match owner {
487                    Owner::AddressOwner(address) => Some(address),
488                    _ => None,
489                })
490                .unique()
491                .collect();
492
493            // Move Calls
494            let move_calls = tx
495                .move_calls()
496                .into_iter()
497                .map(|(p, m, f)| (*p, m.to_string(), f.to_string()))
498                .collect();
499
500            db_tx_indices.push(TxIndex {
501                tx_sequence_number,
502                transaction_digest: tx_digest,
503                checkpoint_sequence_number: *checkpoint_seq,
504                input_objects,
505                changed_objects,
506                affected_objects,
507                sender,
508                payers,
509                recipients,
510                move_calls,
511                tx_kind: transaction_kind,
512            });
513        }
514        Ok((
515            db_transactions,
516            db_events,
517            db_tx_indices,
518            db_event_indices,
519            db_displays,
520        ))
521    }
522
523    pub(crate) async fn index_objects(
524        data: &CheckpointData,
525        metrics: &IndexerMetrics,
526    ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
527        let _timer = metrics.indexing_objects_latency.start_timer();
528        let checkpoint_seq = data.checkpoint_summary.sequence_number;
529
530        let eventually_removed_object_refs_post_version =
531            data.eventually_removed_object_refs_post_version();
532        let indexed_eventually_removed_objects = eventually_removed_object_refs_post_version
533            .into_iter()
534            .map(|obj_ref| IndexedDeletedObject {
535                object_id: obj_ref.0,
536                object_version: obj_ref.1.into(),
537                checkpoint_sequence_number: checkpoint_seq,
538            })
539            .collect();
540
541        let latest_live_output_objects = data.latest_live_output_objects();
542        let changed_objects = latest_live_output_objects
543            .into_iter()
544            .map(|o| {
545                try_extract_df_kind(o)
546                    .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
547            })
548            .collect::<Result<Vec<_>, _>>()?;
549
550        Ok(TransactionObjectChangesToCommit {
551            changed_objects,
552            deleted_objects: indexed_eventually_removed_objects,
553        })
554    }
555
556    // similar to index_objects, but objects_history keeps all versions of objects
557    async fn index_objects_history(
558        data: &CheckpointData,
559    ) -> Result<TransactionObjectChangesToCommit, IndexerError> {
560        let checkpoint_seq = data.checkpoint_summary.sequence_number;
561        let deleted_objects = data
562            .transactions
563            .iter()
564            .flat_map(|tx| tx.removed_object_refs_post_version())
565            .collect::<Vec<_>>();
566        let indexed_deleted_objects: Vec<IndexedDeletedObject> = deleted_objects
567            .into_iter()
568            .map(|obj_ref| IndexedDeletedObject {
569                object_id: obj_ref.0,
570                object_version: obj_ref.1.into(),
571                checkpoint_sequence_number: checkpoint_seq,
572            })
573            .collect();
574
575        let output_objects: Vec<_> = data
576            .transactions
577            .iter()
578            .flat_map(|tx| &tx.output_objects)
579            .collect();
580
581        // TODO(gegaowp): the current df_info implementation is not correct,
582        // but we have decided remove all df_* except df_kind.
583        let changed_objects = output_objects
584            .into_iter()
585            .map(|o| {
586                try_extract_df_kind(o)
587                    .map(|df_kind| IndexedObject::from_object(checkpoint_seq, o.clone(), df_kind))
588            })
589            .collect::<Result<Vec<_>, _>>()?;
590
591        Ok(TransactionObjectChangesToCommit {
592            changed_objects,
593            deleted_objects: indexed_deleted_objects,
594        })
595    }
596
597    fn index_packages(
598        checkpoint_data: &[CheckpointData],
599        metrics: &IndexerMetrics,
600    ) -> Vec<IndexedPackage> {
601        let _timer = metrics.indexing_packages_latency.start_timer();
602        checkpoint_data
603            .iter()
604            .flat_map(|data| {
605                let checkpoint_sequence_number = data.checkpoint_summary.sequence_number;
606                data.transactions
607                    .iter()
608                    .flat_map(|tx| &tx.output_objects)
609                    .filter_map(|o| {
610                        if let sui_types::object::Data::Package(p) = &o.data {
611                            Some(IndexedPackage {
612                                package_id: o.id(),
613                                move_package: p.clone(),
614                                checkpoint_sequence_number,
615                            })
616                        } else {
617                            None
618                        }
619                    })
620                    .collect::<Vec<_>>()
621            })
622            .collect()
623    }
624}
625
626/// If `o` is a dynamic `Field<K, V>`, determine whether it represents a Dynamic Field or a Dynamic
627/// Object Field based on its type.
628fn try_extract_df_kind(o: &Object) -> IndexerResult<Option<DynamicFieldType>> {
629    // Skip if not a move object
630    let Some(move_object) = o.data.try_as_move() else {
631        return Ok(None);
632    };
633
634    if !move_object.type_().is_dynamic_field() {
635        return Ok(None);
636    }
637
638    let type_: StructTag = move_object.type_().clone().into();
639    let [name, _] = type_.type_params.as_slice() else {
640        return Ok(None);
641    };
642
643    Ok(Some(
644        if matches!(name, TypeTag::Struct(s) if DynamicFieldInfo::is_dynamic_object_field_wrapper(s))
645        {
646            DynamicFieldType::DynamicObject
647        } else {
648            DynamicFieldType::DynamicField
649        },
650    ))
651}