// sui_core/global_state_hasher.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use fastcrypto::hash::MultisetHash;
use itertools::Itertools;
use mysten_common::fatal;
use mysten_metrics::monitored_scope;
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use serde::Serialize;
use sui_protocol_config::ProtocolConfig;
use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber};
use sui_types::committee::EpochId;
use sui_types::digests::{ObjectDigest, TransactionDigest};
use sui_types::effects::TransactionEffects;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::error::SuiResult;
use sui_types::global_state_hash::GlobalStateHash;
use sui_types::in_memory_storage::InMemoryStorage;
use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest};
use sui_types::storage::{ObjectKey, ObjectStore};
use tracing::debug;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store_tables::LiveObject;

30pub struct GlobalStateHashMetrics {
31    inconsistent_state: IntGauge,
32}
33
34impl GlobalStateHashMetrics {
35    pub fn new(registry: &Registry) -> Arc<Self> {
36        let this = Self {
37            inconsistent_state: register_int_gauge_with_registry!(
38                "global_state_hasher_inconsistent_state",
39                "1 if accumulated live object set differs from GlobalStateHasher root state hash for the previous epoch",
40                registry
41            )
42            .unwrap(),
43        };
44        Arc::new(this)
45    }
46}
47
48pub struct GlobalStateHasher {
49    store: Arc<dyn GlobalStateHashStore>,
50    metrics: Arc<GlobalStateHashMetrics>,
51}
52
53pub trait GlobalStateHashStore: ObjectStore + Send + Sync {
54    /// This function is only called in older protocol versions, and should no longer be used.
55    /// It creates an explicit dependency to tombstones which is not desired.
56    fn get_object_ref_prior_to_key_deprecated(
57        &self,
58        object_id: &ObjectID,
59        version: VersionNumber,
60    ) -> SuiResult<Option<ObjectRef>>;
61
62    fn get_root_state_hash_for_epoch(
63        &self,
64        epoch: EpochId,
65    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>>;
66
67    fn get_root_state_hash_for_highest_epoch(
68        &self,
69    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>>;
70
71    fn insert_state_hash_for_epoch(
72        &self,
73        epoch: EpochId,
74        checkpoint_seq_num: &CheckpointSequenceNumber,
75        acc: &GlobalStateHash,
76    ) -> SuiResult;
77
78    fn iter_live_object_set(
79        &self,
80        include_wrapped_tombstone: bool,
81    ) -> Box<dyn Iterator<Item = LiveObject> + '_>;
82
83    fn iter_cached_live_object_set_for_testing(
84        &self,
85        include_wrapped_tombstone: bool,
86    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
87        self.iter_live_object_set(include_wrapped_tombstone)
88    }
89}
90
91impl GlobalStateHashStore for InMemoryStorage {
92    fn get_object_ref_prior_to_key_deprecated(
93        &self,
94        _object_id: &ObjectID,
95        _version: VersionNumber,
96    ) -> SuiResult<Option<ObjectRef>> {
97        unreachable!(
98            "get_object_ref_prior_to_key is only called by accumulate_effects_v1, while InMemoryStorage is used by testing and genesis only, which always uses latest protocol "
99        )
100    }
101
102    fn get_root_state_hash_for_epoch(
103        &self,
104        _epoch: EpochId,
105    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
106        unreachable!("not used for testing")
107    }
108
109    fn get_root_state_hash_for_highest_epoch(
110        &self,
111    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
112        unreachable!("not used for testing")
113    }
114
115    fn insert_state_hash_for_epoch(
116        &self,
117        _epoch: EpochId,
118        _checkpoint_seq_num: &CheckpointSequenceNumber,
119        _acc: &GlobalStateHash,
120    ) -> SuiResult {
121        unreachable!("not used for testing")
122    }
123
124    fn iter_live_object_set(
125        &self,
126        _include_wrapped_tombstone: bool,
127    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
128        unreachable!("not used for testing")
129    }
130}
131
132/// Serializable representation of the ObjectRef of an
133/// object that has been wrapped
134/// TODO: This can be replaced with ObjectKey.
135#[derive(Serialize, Debug)]
136pub struct WrappedObject {
137    id: ObjectID,
138    wrapped_at: SequenceNumber,
139    digest: ObjectDigest,
140}
141
142impl WrappedObject {
143    pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
144        Self {
145            id,
146            wrapped_at,
147            digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
148        }
149    }
150}
151
152pub fn accumulate_effects<T, S>(
153    store: S,
154    effects: &[TransactionEffects],
155    protocol_config: &ProtocolConfig,
156) -> GlobalStateHash
157where
158    S: std::ops::Deref<Target = T>,
159    T: GlobalStateHashStore + ?Sized,
160{
161    if protocol_config.enable_effects_v2() {
162        accumulate_effects_v3(effects)
163    } else if protocol_config.simplified_unwrap_then_delete() {
164        accumulate_effects_v2(store, effects)
165    } else {
166        accumulate_effects_v1(store, effects, protocol_config)
167    }
168}
169
170fn accumulate_effects_v1<T, S>(
171    store: S,
172    effects: &[TransactionEffects],
173    protocol_config: &ProtocolConfig,
174) -> GlobalStateHash
175where
176    S: std::ops::Deref<Target = T>,
177    T: GlobalStateHashStore + ?Sized,
178{
179    let mut acc = GlobalStateHash::default();
180
181    // process insertions to the set
182    acc.insert_all(
183        effects
184            .iter()
185            .flat_map(|fx| {
186                fx.all_changed_objects()
187                    .into_iter()
188                    .map(|(oref, _, _)| oref.2)
189            })
190            .collect::<Vec<ObjectDigest>>(),
191    );
192
193    // insert wrapped tombstones. We use a custom struct in order to contain the tombstone
194    // against the object id and sequence number, as the tombstone by itself is not unique.
195    acc.insert_all(
196        effects
197            .iter()
198            .flat_map(|fx| {
199                fx.wrapped()
200                    .iter()
201                    .map(|oref| {
202                        bcs::to_bytes(&WrappedObject::new(oref.0, oref.1))
203                            .unwrap()
204                            .to_vec()
205                    })
206                    .collect::<Vec<Vec<u8>>>()
207            })
208            .collect::<Vec<Vec<u8>>>(),
209    );
210
211    let all_unwrapped = effects
212        .iter()
213        .flat_map(|fx| {
214            fx.unwrapped()
215                .into_iter()
216                .map(|(oref, _owner)| (*fx.transaction_digest(), oref.0, oref.1))
217        })
218        .chain(effects.iter().flat_map(|fx| {
219            fx.unwrapped_then_deleted()
220                .into_iter()
221                .map(|oref| (*fx.transaction_digest(), oref.0, oref.1))
222        }))
223        .collect::<Vec<(TransactionDigest, ObjectID, SequenceNumber)>>();
224
225    let unwrapped_ids: HashMap<TransactionDigest, HashSet<ObjectID>> = all_unwrapped
226        .iter()
227        .map(|(digest, id, _)| (*digest, *id))
228        .into_group_map()
229        .iter()
230        .map(|(digest, ids)| (*digest, HashSet::from_iter(ids.iter().cloned())))
231        .collect();
232
233    // Collect keys from modified_at_versions to remove from the hasher.
234    // Filter all unwrapped objects (from unwrapped or unwrapped_then_deleted effects)
235    // as these were inserted into the hasher as a WrappedObject. Will handle these
236    // separately.
237    let modified_at_version_keys: Vec<ObjectKey> = effects
238        .iter()
239        .flat_map(|fx| {
240            fx.modified_at_versions()
241                .into_iter()
242                .map(|(id, seq_num)| (*fx.transaction_digest(), id, seq_num))
243        })
244        .filter_map(|(tx_digest, id, seq_num)| {
245            // unwrapped tx
246            if let Some(ids) = unwrapped_ids.get(&tx_digest) {
247                // object unwrapped in this tx. We handle it later
248                if ids.contains(&id) {
249                    return None;
250                }
251            }
252            Some(ObjectKey(id, seq_num))
253        })
254        .collect();
255
256    let modified_at_digests: Vec<_> = store
257        .multi_get_objects_by_key(&modified_at_version_keys.clone())
258        .into_iter()
259        .zip(modified_at_version_keys)
260        .map(|(obj, key)| {
261            obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
262                .compute_object_reference()
263                .2
264        })
265        .collect();
266    acc.remove_all(modified_at_digests);
267
268    // Process unwrapped and unwrapped_then_deleted effects, which need to be
269    // removed as WrappedObject using the last sequence number it was tombstoned
270    // against. Since this happened in a past transaction, and the child object may
271    // have been modified since (and hence its sequence number incremented), we
272    // seek the version prior to the unwrapped version from the objects table directly.
273    // If the tombstone is not found, then assume this is a newly created wrapped object hence
274    // we don't expect to find it in the table.
275    let wrapped_objects_to_remove: Vec<WrappedObject> = all_unwrapped
276        .iter()
277        .filter_map(|(_tx_digest, id, seq_num)| {
278            let objref = store
279                .get_object_ref_prior_to_key_deprecated(id, *seq_num)
280                .expect("read cannot fail");
281
282            objref.map(|(id, version, digest)| {
283                assert!(
284                    !protocol_config.loaded_child_objects_fixed() || digest.is_wrapped(),
285                    "{:?}",
286                    id
287                );
288                WrappedObject::new(id, version)
289            })
290        })
291        .collect();
292
293    acc.remove_all(
294        wrapped_objects_to_remove
295            .iter()
296            .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
297            .collect::<Vec<Vec<u8>>>(),
298    );
299
300    acc
301}
302
303fn accumulate_effects_v2<T, S>(store: S, effects: &[TransactionEffects]) -> GlobalStateHash
304where
305    S: std::ops::Deref<Target = T>,
306    T: GlobalStateHashStore + ?Sized,
307{
308    let mut acc = GlobalStateHash::default();
309
310    // process insertions to the set
311    acc.insert_all(
312        effects
313            .iter()
314            .flat_map(|fx| {
315                fx.all_changed_objects()
316                    .into_iter()
317                    .map(|(oref, _, _)| oref.2)
318            })
319            .collect::<Vec<ObjectDigest>>(),
320    );
321
322    // Collect keys from modified_at_versions to remove from the hasher.
323    let modified_at_version_keys: Vec<_> = effects
324        .iter()
325        .flat_map(|fx| {
326            fx.modified_at_versions()
327                .into_iter()
328                .map(|(id, version)| ObjectKey(id, version))
329        })
330        .collect();
331
332    let modified_at_digests: Vec<_> = store
333        .multi_get_objects_by_key(&modified_at_version_keys.clone())
334        .into_iter()
335        .zip(modified_at_version_keys)
336        .map(|(obj, key)| {
337            obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
338                .compute_object_reference()
339                .2
340        })
341        .collect();
342    acc.remove_all(modified_at_digests);
343
344    acc
345}
346
347fn accumulate_effects_v3(effects: &[TransactionEffects]) -> GlobalStateHash {
348    let mut acc = GlobalStateHash::default();
349
350    // process insertions to the set
351    acc.insert_all(
352        effects
353            .iter()
354            .flat_map(|fx| {
355                fx.all_changed_objects()
356                    .into_iter()
357                    .map(|(object_ref, _, _)| object_ref.2)
358            })
359            .collect::<Vec<ObjectDigest>>(),
360    );
361
362    // process modified objects to the set
363    acc.remove_all(
364        effects
365            .iter()
366            .flat_map(|fx| {
367                fx.old_object_metadata()
368                    .into_iter()
369                    .map(|(object_ref, _owner)| object_ref.2)
370            })
371            .collect::<Vec<ObjectDigest>>(),
372    );
373
374    acc
375}
376
377impl GlobalStateHasher {
378    pub fn new(store: Arc<dyn GlobalStateHashStore>, metrics: Arc<GlobalStateHashMetrics>) -> Self {
379        Self { store, metrics }
380    }
381
382    pub fn new_for_tests(store: Arc<dyn GlobalStateHashStore>) -> Self {
383        Self::new(store, GlobalStateHashMetrics::new(&Registry::new()))
384    }
385
386    pub fn metrics(&self) -> Arc<GlobalStateHashMetrics> {
387        self.metrics.clone()
388    }
389
390    pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
391        self.metrics
392            .inconsistent_state
393            .set(is_inconsistent_state as i64);
394    }
395
396    /// Accumulates the effects of a single checkpoint and persists the hasher.
397    pub fn accumulate_checkpoint(
398        &self,
399        effects: &[TransactionEffects],
400        checkpoint_seq_num: CheckpointSequenceNumber,
401        epoch_store: &AuthorityPerEpochStore,
402    ) -> SuiResult<GlobalStateHash> {
403        let _scope = monitored_scope("AccumulateCheckpoint");
404        if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
405            return Ok(acc);
406        }
407
408        let acc = self.accumulate_effects(effects, epoch_store.protocol_config());
409
410        epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
411        debug!("Accumulated checkpoint {}", checkpoint_seq_num);
412
413        epoch_store
414            .checkpoint_state_notify_read
415            .notify(&checkpoint_seq_num, &acc);
416
417        Ok(acc)
418    }
419
420    pub fn accumulate_cached_live_object_set_for_testing(
421        &self,
422        include_wrapped_tombstone: bool,
423    ) -> GlobalStateHash {
424        Self::accumulate_live_object_set_impl(
425            self.store
426                .iter_cached_live_object_set_for_testing(include_wrapped_tombstone),
427        )
428    }
429
430    /// Returns the result of accumulating the live object set, without side effects
431    pub fn accumulate_live_object_set(&self, include_wrapped_tombstone: bool) -> GlobalStateHash {
432        Self::accumulate_live_object_set_impl(
433            self.store.iter_live_object_set(include_wrapped_tombstone),
434        )
435    }
436
437    fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> GlobalStateHash {
438        let mut acc = GlobalStateHash::default();
439        iter.for_each(|live_object| {
440            Self::accumulate_live_object(&mut acc, &live_object);
441        });
442        acc
443    }
444
445    pub fn accumulate_live_object(acc: &mut GlobalStateHash, live_object: &LiveObject) {
446        match live_object {
447            LiveObject::Normal(object) => {
448                acc.insert(object.compute_object_reference().2);
449            }
450            LiveObject::Wrapped(key) => {
451                acc.insert(
452                    bcs::to_bytes(&WrappedObject::new(key.0, key.1))
453                        .expect("Failed to serialize WrappedObject"),
454                );
455            }
456        }
457    }
458
459    pub fn digest_live_object_set(
460        &self,
461        include_wrapped_tombstone: bool,
462    ) -> ECMHLiveObjectSetDigest {
463        let acc = self.accumulate_live_object_set(include_wrapped_tombstone);
464        acc.digest().into()
465    }
466
467    pub async fn digest_epoch(
468        &self,
469        epoch_store: Arc<AuthorityPerEpochStore>,
470        last_checkpoint_of_epoch: CheckpointSequenceNumber,
471    ) -> SuiResult<ECMHLiveObjectSetDigest> {
472        Ok(self
473            .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)?
474            .digest()
475            .into())
476    }
477
478    pub async fn wait_for_previous_running_root(
479        &self,
480        epoch_store: &AuthorityPerEpochStore,
481        checkpoint_seq_num: CheckpointSequenceNumber,
482    ) -> SuiResult {
483        assert!(checkpoint_seq_num > 0);
484
485        // Check if this is the first checkpoint of the new epoch, in which case
486        // there is nothing to wait for.
487        if self
488            .store
489            .get_root_state_hash_for_highest_epoch()?
490            .map(|(_, (last_checkpoint_prev_epoch, _))| last_checkpoint_prev_epoch)
491            == Some(checkpoint_seq_num - 1)
492        {
493            return Ok(());
494        }
495
496        // There is an edge case here where checkpoint_seq_num is 1. This means the previous
497        // checkpoint is the genesis checkpoint. CheckpointExecutor is guaranteed to execute
498        // and accumulate the genesis checkpoint, so this will resolve.
499        epoch_store
500            .notify_read_running_root(checkpoint_seq_num - 1)
501            .await?;
502        Ok(())
503    }
504
505    fn get_prior_root(
506        &self,
507        epoch_store: &AuthorityPerEpochStore,
508        checkpoint_seq_num: CheckpointSequenceNumber,
509    ) -> SuiResult<GlobalStateHash> {
510        if checkpoint_seq_num == 0 {
511            return Ok(GlobalStateHash::default());
512        }
513
514        if let Some(prior_running_root) =
515            epoch_store.get_running_root_state_hash(checkpoint_seq_num - 1)?
516        {
517            return Ok(prior_running_root);
518        }
519
520        if let Some((last_checkpoint_prev_epoch, prev_acc)) = self
521            .store
522            .get_root_state_hash_for_epoch(epoch_store.epoch() - 1)?
523            && last_checkpoint_prev_epoch == checkpoint_seq_num - 1
524        {
525            return Ok(prev_acc);
526        }
527
528        fatal!(
529            "Running root state hasher must exist for checkpoint {}",
530            checkpoint_seq_num - 1
531        );
532    }
533
534    // Accumulate the running root.
535    // The previous checkpoint must be accumulated before calling this function, or it will panic.
536    pub fn accumulate_running_root(
537        &self,
538        epoch_store: &AuthorityPerEpochStore,
539        checkpoint_seq_num: CheckpointSequenceNumber,
540        checkpoint_acc: Option<GlobalStateHash>,
541    ) -> SuiResult {
542        let _scope = monitored_scope("AccumulateRunningRoot");
543        tracing::info!(
544            "accumulating running root for checkpoint {}",
545            checkpoint_seq_num
546        );
547
548        // Idempotency.
549        if epoch_store
550            .get_running_root_state_hash(checkpoint_seq_num)?
551            .is_some()
552        {
553            debug!(
554                "accumulate_running_root {:?} {:?} already exists",
555                epoch_store.epoch(),
556                checkpoint_seq_num
557            );
558            return Ok(());
559        }
560
561        let mut running_root = self.get_prior_root(epoch_store, checkpoint_seq_num)?;
562
563        let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
564            epoch_store
565                .get_state_hash_for_checkpoint(&checkpoint_seq_num)
566                .expect("Failed to get checkpoint accumulator from disk")
567                .expect("Expected checkpoint accumulator to exist")
568        });
569        running_root.union(&checkpoint_acc);
570        epoch_store.insert_running_root_state_hash(&checkpoint_seq_num, &running_root)?;
571        debug!(
572            "Accumulated checkpoint {} to running root accumulator",
573            checkpoint_seq_num,
574        );
575        Ok(())
576    }
577
578    pub fn accumulate_epoch(
579        &self,
580        epoch_store: Arc<AuthorityPerEpochStore>,
581        last_checkpoint_of_epoch: CheckpointSequenceNumber,
582    ) -> SuiResult<GlobalStateHash> {
583        let _scope = monitored_scope("AccumulateEpochV2");
584        let running_root = epoch_store
585            .get_running_root_state_hash(last_checkpoint_of_epoch)?
586            .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
587
588        self.store.insert_state_hash_for_epoch(
589            epoch_store.epoch(),
590            &last_checkpoint_of_epoch,
591            &running_root,
592        )?;
593        debug!(
594            "Finalized root state hash for epoch {} (up to checkpoint {})",
595            epoch_store.epoch(),
596            last_checkpoint_of_epoch
597        );
598        Ok(running_root.clone())
599    }
600
601    pub fn accumulate_effects(
602        &self,
603        effects: &[TransactionEffects],
604        protocol_config: &ProtocolConfig,
605    ) -> GlobalStateHash {
606        accumulate_effects(&*self.store, effects, protocol_config)
607    }
608}