// sui_core/global_state_hasher.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use itertools::Itertools;
5use mysten_common::ZipDebugEqIteratorExt;
6use mysten_common::fatal;
7use mysten_metrics::monitored_scope;
8use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
9use serde::Serialize;
10use sui_protocol_config::ProtocolConfig;
11use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber};
12use sui_types::committee::EpochId;
13use sui_types::digests::{ObjectDigest, TransactionDigest};
14use sui_types::in_memory_storage::InMemoryStorage;
15use sui_types::storage::{ObjectKey, ObjectStore};
16use tracing::debug;
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use fastcrypto::hash::MultisetHash;
22use sui_types::effects::TransactionEffects;
23use sui_types::effects::TransactionEffectsAPI;
24use sui_types::error::SuiResult;
25use sui_types::global_state_hash::GlobalStateHash;
26use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest};
27
28use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
29use crate::authority::authority_store_tables::LiveObject;
30
/// Prometheus metrics exported by the global state hasher.
pub struct GlobalStateHashMetrics {
    // Gauge set to 1 when the accumulated live object set differs from the
    // stored root state hash for the previous epoch (see `set_inconsistent_state`).
    inconsistent_state: IntGauge,
}
34
35impl GlobalStateHashMetrics {
36    pub fn new(registry: &Registry) -> Arc<Self> {
37        let this = Self {
38            inconsistent_state: register_int_gauge_with_registry!(
39                "global_state_hasher_inconsistent_state",
40                "1 if accumulated live object set differs from GlobalStateHasher root state hash for the previous epoch",
41                registry
42            )
43            .unwrap(),
44        };
45        Arc::new(this)
46    }
47}
48
/// Accumulates per-checkpoint and per-epoch global state hashes over the
/// live object set, persisting results through the backing store.
pub struct GlobalStateHasher {
    // Backing storage for object lookups and root state hash persistence.
    store: Arc<dyn GlobalStateHashStore>,
    // Metrics sink (inconsistent-state gauge).
    metrics: Arc<GlobalStateHashMetrics>,
}
53
/// Storage interface required by [`GlobalStateHasher`]: object lookup plus
/// persistence of per-epoch root state hashes and live object set iteration.
pub trait GlobalStateHashStore: ObjectStore + Send + Sync {
    /// This function is only called in older protocol versions, and should no longer be used.
    /// It creates an explicit dependency to tombstones which is not desired.
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: VersionNumber,
    ) -> SuiResult<Option<ObjectRef>>;

    /// Returns the persisted root state hash for `epoch`, together with the
    /// last checkpoint sequence number it covers, if one was recorded.
    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>>;

    /// Returns the root state hash of the most recent epoch that has one,
    /// keyed by that epoch, if any epoch has been finalized yet.
    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>>;

    /// Persists `acc` as the root state hash for `epoch`, recorded as covering
    /// checkpoints up to and including `checkpoint_seq_num`.
    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> SuiResult;

    /// Iterates the full live object set; when `include_wrapped_tombstone` is
    /// set, wrapped-object tombstones are yielded as well.
    fn iter_live_object_set(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_>;

    /// Testing hook; by default simply delegates to [`Self::iter_live_object_set`].
    fn iter_cached_live_object_set_for_testing(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        self.iter_live_object_set(include_wrapped_tombstone)
    }
}
91
92impl GlobalStateHashStore for InMemoryStorage {
93    fn get_object_ref_prior_to_key_deprecated(
94        &self,
95        _object_id: &ObjectID,
96        _version: VersionNumber,
97    ) -> SuiResult<Option<ObjectRef>> {
98        unreachable!(
99            "get_object_ref_prior_to_key is only called by accumulate_effects_v1, while InMemoryStorage is used by testing and genesis only, which always uses latest protocol "
100        )
101    }
102
103    fn get_root_state_hash_for_epoch(
104        &self,
105        _epoch: EpochId,
106    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
107        unreachable!("not used for testing")
108    }
109
110    fn get_root_state_hash_for_highest_epoch(
111        &self,
112    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
113        unreachable!("not used for testing")
114    }
115
116    fn insert_state_hash_for_epoch(
117        &self,
118        _epoch: EpochId,
119        _checkpoint_seq_num: &CheckpointSequenceNumber,
120        _acc: &GlobalStateHash,
121    ) -> SuiResult {
122        unreachable!("not used for testing")
123    }
124
125    fn iter_live_object_set(
126        &self,
127        _include_wrapped_tombstone: bool,
128    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
129        unreachable!("not used for testing")
130    }
131}
132
/// Serializable representation of the ObjectRef of an
/// object that has been wrapped
/// TODO: This can be replaced with ObjectKey.
#[derive(Serialize, Debug)]
pub struct WrappedObject {
    // ID of the wrapped object.
    id: ObjectID,
    // Version at which the object was wrapped (tombstoned).
    wrapped_at: SequenceNumber,
    // Always `ObjectDigest::OBJECT_DIGEST_WRAPPED`; included so the BCS
    // serialization is self-describing.
    digest: ObjectDigest,
}
142
143impl WrappedObject {
144    pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
145        Self {
146            id,
147            wrapped_at,
148            digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
149        }
150    }
151}
152
153pub fn accumulate_effects<T, S>(
154    store: S,
155    effects: &[TransactionEffects],
156    protocol_config: &ProtocolConfig,
157) -> GlobalStateHash
158where
159    S: std::ops::Deref<Target = T>,
160    T: GlobalStateHashStore + ?Sized,
161{
162    if protocol_config.enable_effects_v2() {
163        accumulate_effects_v3(effects)
164    } else if protocol_config.simplified_unwrap_then_delete() {
165        accumulate_effects_v2(store, effects)
166    } else {
167        accumulate_effects_v1(store, effects, protocol_config)
168    }
169}
170
171fn accumulate_effects_v1<T, S>(
172    store: S,
173    effects: &[TransactionEffects],
174    protocol_config: &ProtocolConfig,
175) -> GlobalStateHash
176where
177    S: std::ops::Deref<Target = T>,
178    T: GlobalStateHashStore + ?Sized,
179{
180    let mut acc = GlobalStateHash::default();
181
182    // process insertions to the set
183    acc.insert_all(
184        effects
185            .iter()
186            .flat_map(|fx| {
187                fx.all_changed_objects()
188                    .into_iter()
189                    .map(|(oref, _, _)| oref.2)
190            })
191            .collect::<Vec<ObjectDigest>>(),
192    );
193
194    // insert wrapped tombstones. We use a custom struct in order to contain the tombstone
195    // against the object id and sequence number, as the tombstone by itself is not unique.
196    acc.insert_all(
197        effects
198            .iter()
199            .flat_map(|fx| {
200                fx.wrapped()
201                    .iter()
202                    .map(|oref| {
203                        bcs::to_bytes(&WrappedObject::new(oref.0, oref.1))
204                            .unwrap()
205                            .to_vec()
206                    })
207                    .collect::<Vec<Vec<u8>>>()
208            })
209            .collect::<Vec<Vec<u8>>>(),
210    );
211
212    let all_unwrapped = effects
213        .iter()
214        .flat_map(|fx| {
215            fx.unwrapped()
216                .into_iter()
217                .map(|(oref, _owner)| (*fx.transaction_digest(), oref.0, oref.1))
218        })
219        .chain(effects.iter().flat_map(|fx| {
220            fx.unwrapped_then_deleted()
221                .into_iter()
222                .map(|oref| (*fx.transaction_digest(), oref.0, oref.1))
223        }))
224        .collect::<Vec<(TransactionDigest, ObjectID, SequenceNumber)>>();
225
226    let unwrapped_ids: HashMap<TransactionDigest, HashSet<ObjectID>> = all_unwrapped
227        .iter()
228        .map(|(digest, id, _)| (*digest, *id))
229        .into_group_map()
230        .iter()
231        .map(|(digest, ids)| (*digest, HashSet::from_iter(ids.iter().cloned())))
232        .collect();
233
234    // Collect keys from modified_at_versions to remove from the hasher.
235    // Filter all unwrapped objects (from unwrapped or unwrapped_then_deleted effects)
236    // as these were inserted into the hasher as a WrappedObject. Will handle these
237    // separately.
238    let modified_at_version_keys: Vec<ObjectKey> = effects
239        .iter()
240        .flat_map(|fx| {
241            fx.modified_at_versions()
242                .into_iter()
243                .map(|(id, seq_num)| (*fx.transaction_digest(), id, seq_num))
244        })
245        .filter_map(|(tx_digest, id, seq_num)| {
246            // unwrapped tx
247            if let Some(ids) = unwrapped_ids.get(&tx_digest) {
248                // object unwrapped in this tx. We handle it later
249                if ids.contains(&id) {
250                    return None;
251                }
252            }
253            Some(ObjectKey(id, seq_num))
254        })
255        .collect();
256
257    let modified_at_digests: Vec<_> = store
258        .multi_get_objects_by_key(&modified_at_version_keys.clone())
259        .into_iter()
260        .zip_debug_eq(modified_at_version_keys)
261        .map(|(obj, key)| {
262            obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
263                .compute_object_reference()
264                .2
265        })
266        .collect();
267    acc.remove_all(modified_at_digests);
268
269    // Process unwrapped and unwrapped_then_deleted effects, which need to be
270    // removed as WrappedObject using the last sequence number it was tombstoned
271    // against. Since this happened in a past transaction, and the child object may
272    // have been modified since (and hence its sequence number incremented), we
273    // seek the version prior to the unwrapped version from the objects table directly.
274    // If the tombstone is not found, then assume this is a newly created wrapped object hence
275    // we don't expect to find it in the table.
276    let wrapped_objects_to_remove: Vec<WrappedObject> = all_unwrapped
277        .iter()
278        .filter_map(|(_tx_digest, id, seq_num)| {
279            let objref = store
280                .get_object_ref_prior_to_key_deprecated(id, *seq_num)
281                .expect("read cannot fail");
282
283            objref.map(|(id, version, digest)| {
284                assert!(
285                    !protocol_config.loaded_child_objects_fixed() || digest.is_wrapped(),
286                    "{:?}",
287                    id
288                );
289                WrappedObject::new(id, version)
290            })
291        })
292        .collect();
293
294    acc.remove_all(
295        wrapped_objects_to_remove
296            .iter()
297            .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
298            .collect::<Vec<Vec<u8>>>(),
299    );
300
301    acc
302}
303
304fn accumulate_effects_v2<T, S>(store: S, effects: &[TransactionEffects]) -> GlobalStateHash
305where
306    S: std::ops::Deref<Target = T>,
307    T: GlobalStateHashStore + ?Sized,
308{
309    let mut acc = GlobalStateHash::default();
310
311    // process insertions to the set
312    acc.insert_all(
313        effects
314            .iter()
315            .flat_map(|fx| {
316                fx.all_changed_objects()
317                    .into_iter()
318                    .map(|(oref, _, _)| oref.2)
319            })
320            .collect::<Vec<ObjectDigest>>(),
321    );
322
323    // Collect keys from modified_at_versions to remove from the hasher.
324    let modified_at_version_keys: Vec<_> = effects
325        .iter()
326        .flat_map(|fx| {
327            fx.modified_at_versions()
328                .into_iter()
329                .map(|(id, version)| ObjectKey(id, version))
330        })
331        .collect();
332
333    let modified_at_digests: Vec<_> = store
334        .multi_get_objects_by_key(&modified_at_version_keys.clone())
335        .into_iter()
336        .zip_debug_eq(modified_at_version_keys)
337        .map(|(obj, key)| {
338            obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
339                .compute_object_reference()
340                .2
341        })
342        .collect();
343    acc.remove_all(modified_at_digests);
344
345    acc
346}
347
348fn accumulate_effects_v3(effects: &[TransactionEffects]) -> GlobalStateHash {
349    let mut acc = GlobalStateHash::default();
350
351    // process insertions to the set
352    acc.insert_all(
353        effects
354            .iter()
355            .flat_map(|fx| {
356                fx.all_changed_objects()
357                    .into_iter()
358                    .map(|(object_ref, _, _)| object_ref.2)
359            })
360            .collect::<Vec<ObjectDigest>>(),
361    );
362
363    // process modified objects to the set
364    acc.remove_all(
365        effects
366            .iter()
367            .flat_map(|fx| {
368                fx.old_object_metadata()
369                    .into_iter()
370                    .map(|(object_ref, _owner)| object_ref.2)
371            })
372            .collect::<Vec<ObjectDigest>>(),
373    );
374
375    acc
376}
377
378impl GlobalStateHasher {
379    pub fn new(store: Arc<dyn GlobalStateHashStore>, metrics: Arc<GlobalStateHashMetrics>) -> Self {
380        Self { store, metrics }
381    }
382
383    pub fn new_for_tests(store: Arc<dyn GlobalStateHashStore>) -> Self {
384        Self::new(store, GlobalStateHashMetrics::new(&Registry::new()))
385    }
386
387    pub fn metrics(&self) -> Arc<GlobalStateHashMetrics> {
388        self.metrics.clone()
389    }
390
391    pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
392        self.metrics
393            .inconsistent_state
394            .set(is_inconsistent_state as i64);
395    }
396
397    /// Accumulates the effects of a single checkpoint and persists the hasher.
398    pub fn accumulate_checkpoint(
399        &self,
400        effects: &[TransactionEffects],
401        checkpoint_seq_num: CheckpointSequenceNumber,
402        epoch_store: &AuthorityPerEpochStore,
403    ) -> SuiResult<GlobalStateHash> {
404        let _scope = monitored_scope("AccumulateCheckpoint");
405        if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
406            return Ok(acc);
407        }
408
409        let acc = self.accumulate_effects(effects, epoch_store.protocol_config());
410
411        epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
412        debug!("Accumulated checkpoint {}", checkpoint_seq_num);
413
414        epoch_store
415            .checkpoint_state_notify_read
416            .notify(&checkpoint_seq_num, &acc);
417
418        Ok(acc)
419    }
420
421    pub fn accumulate_cached_live_object_set_for_testing(
422        &self,
423        include_wrapped_tombstone: bool,
424    ) -> GlobalStateHash {
425        Self::accumulate_live_object_set_impl(
426            self.store
427                .iter_cached_live_object_set_for_testing(include_wrapped_tombstone),
428        )
429    }
430
431    /// Returns the result of accumulating the live object set, without side effects
432    pub fn accumulate_live_object_set(&self, include_wrapped_tombstone: bool) -> GlobalStateHash {
433        Self::accumulate_live_object_set_impl(
434            self.store.iter_live_object_set(include_wrapped_tombstone),
435        )
436    }
437
438    fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> GlobalStateHash {
439        let mut acc = GlobalStateHash::default();
440        iter.for_each(|live_object| {
441            Self::accumulate_live_object(&mut acc, &live_object);
442        });
443        acc
444    }
445
446    pub fn accumulate_live_object(acc: &mut GlobalStateHash, live_object: &LiveObject) {
447        match live_object {
448            LiveObject::Normal(object) => {
449                acc.insert(object.compute_object_reference().2);
450            }
451            LiveObject::Wrapped(key) => {
452                acc.insert(
453                    bcs::to_bytes(&WrappedObject::new(key.0, key.1))
454                        .expect("Failed to serialize WrappedObject"),
455                );
456            }
457        }
458    }
459
460    pub fn digest_live_object_set(
461        &self,
462        include_wrapped_tombstone: bool,
463    ) -> ECMHLiveObjectSetDigest {
464        let acc = self.accumulate_live_object_set(include_wrapped_tombstone);
465        acc.digest().into()
466    }
467
468    pub async fn digest_epoch(
469        &self,
470        epoch_store: Arc<AuthorityPerEpochStore>,
471        last_checkpoint_of_epoch: CheckpointSequenceNumber,
472    ) -> SuiResult<ECMHLiveObjectSetDigest> {
473        Ok(self
474            .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)?
475            .digest()
476            .into())
477    }
478
479    pub async fn wait_for_previous_running_root(
480        &self,
481        epoch_store: &AuthorityPerEpochStore,
482        checkpoint_seq_num: CheckpointSequenceNumber,
483    ) -> SuiResult {
484        assert!(checkpoint_seq_num > 0);
485
486        // Check if this is the first checkpoint of the new epoch, in which case
487        // there is nothing to wait for.
488        if self
489            .store
490            .get_root_state_hash_for_highest_epoch()?
491            .map(|(_, (last_checkpoint_prev_epoch, _))| last_checkpoint_prev_epoch)
492            == Some(checkpoint_seq_num - 1)
493        {
494            return Ok(());
495        }
496
497        // There is an edge case here where checkpoint_seq_num is 1. This means the previous
498        // checkpoint is the genesis checkpoint. CheckpointExecutor is guaranteed to execute
499        // and accumulate the genesis checkpoint, so this will resolve.
500        epoch_store
501            .notify_read_running_root(checkpoint_seq_num - 1)
502            .await?;
503        Ok(())
504    }
505
506    fn get_prior_root(
507        &self,
508        epoch_store: &AuthorityPerEpochStore,
509        checkpoint_seq_num: CheckpointSequenceNumber,
510    ) -> SuiResult<GlobalStateHash> {
511        if checkpoint_seq_num == 0 {
512            return Ok(GlobalStateHash::default());
513        }
514
515        if let Some(prior_running_root) =
516            epoch_store.get_running_root_state_hash(checkpoint_seq_num - 1)?
517        {
518            return Ok(prior_running_root);
519        }
520
521        if let Some((last_checkpoint_prev_epoch, prev_acc)) = self
522            .store
523            .get_root_state_hash_for_epoch(epoch_store.epoch() - 1)?
524            && last_checkpoint_prev_epoch == checkpoint_seq_num - 1
525        {
526            return Ok(prev_acc);
527        }
528
529        fatal!(
530            "Running root state hasher must exist for checkpoint {}",
531            checkpoint_seq_num - 1
532        );
533    }
534
535    // Accumulate the running root.
536    // The previous checkpoint must be accumulated before calling this function, or it will panic.
537    pub fn accumulate_running_root(
538        &self,
539        epoch_store: &AuthorityPerEpochStore,
540        checkpoint_seq_num: CheckpointSequenceNumber,
541        checkpoint_acc: Option<GlobalStateHash>,
542    ) -> SuiResult {
543        let _scope = monitored_scope("AccumulateRunningRoot");
544        tracing::debug!(
545            "accumulating running root for checkpoint {}",
546            checkpoint_seq_num
547        );
548
549        // Idempotency.
550        if epoch_store
551            .get_running_root_state_hash(checkpoint_seq_num)?
552            .is_some()
553        {
554            debug!(
555                "accumulate_running_root {:?} {:?} already exists",
556                epoch_store.epoch(),
557                checkpoint_seq_num
558            );
559            return Ok(());
560        }
561
562        let mut running_root = self.get_prior_root(epoch_store, checkpoint_seq_num)?;
563
564        let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
565            epoch_store
566                .get_state_hash_for_checkpoint(&checkpoint_seq_num)
567                .expect("Failed to get checkpoint accumulator from disk")
568                .expect("Expected checkpoint accumulator to exist")
569        });
570        running_root.union(&checkpoint_acc);
571        epoch_store.insert_running_root_state_hash(&checkpoint_seq_num, &running_root)?;
572        debug!(
573            "Accumulated checkpoint {} to running root accumulator",
574            checkpoint_seq_num,
575        );
576        Ok(())
577    }
578
579    pub fn accumulate_epoch(
580        &self,
581        epoch_store: Arc<AuthorityPerEpochStore>,
582        last_checkpoint_of_epoch: CheckpointSequenceNumber,
583    ) -> SuiResult<GlobalStateHash> {
584        let _scope = monitored_scope("AccumulateEpochV2");
585        let running_root = epoch_store
586            .get_running_root_state_hash(last_checkpoint_of_epoch)?
587            .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
588
589        self.store.insert_state_hash_for_epoch(
590            epoch_store.epoch(),
591            &last_checkpoint_of_epoch,
592            &running_root,
593        )?;
594        debug!(
595            "Finalized root state hash for epoch {} (up to checkpoint {})",
596            epoch_store.epoch(),
597            last_checkpoint_of_epoch
598        );
599        Ok(running_root.clone())
600    }
601
602    pub fn accumulate_effects(
603        &self,
604        effects: &[TransactionEffects],
605        protocol_config: &ProtocolConfig,
606    ) -> GlobalStateHash {
607        accumulate_effects(&*self.store, effects, protocol_config)
608    }
609}