1use itertools::Itertools;
5use mysten_common::ZipDebugEqIteratorExt;
6use mysten_common::fatal;
7use mysten_metrics::monitored_scope;
8use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
9use serde::Serialize;
10use sui_protocol_config::ProtocolConfig;
11use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber};
12use sui_types::committee::EpochId;
13use sui_types::digests::{ObjectDigest, TransactionDigest};
14use sui_types::in_memory_storage::InMemoryStorage;
15use sui_types::storage::{ObjectKey, ObjectStore};
16use tracing::debug;
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use fastcrypto::hash::MultisetHash;
22use sui_types::effects::TransactionEffects;
23use sui_types::effects::TransactionEffectsAPI;
24use sui_types::error::SuiResult;
25use sui_types::global_state_hash::GlobalStateHash;
26use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest};
27
28use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
29use crate::authority::authority_store_tables::LiveObject;
30
31pub struct GlobalStateHashMetrics {
32 inconsistent_state: IntGauge,
33}
34
35impl GlobalStateHashMetrics {
36 pub fn new(registry: &Registry) -> Arc<Self> {
37 let this = Self {
38 inconsistent_state: register_int_gauge_with_registry!(
39 "global_state_hasher_inconsistent_state",
40 "1 if accumulated live object set differs from GlobalStateHasher root state hash for the previous epoch",
41 registry
42 )
43 .unwrap(),
44 };
45 Arc::new(this)
46 }
47}
48
49pub struct GlobalStateHasher {
50 store: Arc<dyn GlobalStateHashStore>,
51 metrics: Arc<GlobalStateHashMetrics>,
52}
53
54pub trait GlobalStateHashStore: ObjectStore + Send + Sync {
55 fn get_object_ref_prior_to_key_deprecated(
58 &self,
59 object_id: &ObjectID,
60 version: VersionNumber,
61 ) -> SuiResult<Option<ObjectRef>>;
62
63 fn get_root_state_hash_for_epoch(
64 &self,
65 epoch: EpochId,
66 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>>;
67
68 fn get_root_state_hash_for_highest_epoch(
69 &self,
70 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>>;
71
72 fn insert_state_hash_for_epoch(
73 &self,
74 epoch: EpochId,
75 checkpoint_seq_num: &CheckpointSequenceNumber,
76 acc: &GlobalStateHash,
77 ) -> SuiResult;
78
79 fn iter_live_object_set(
80 &self,
81 include_wrapped_tombstone: bool,
82 ) -> Box<dyn Iterator<Item = LiveObject> + '_>;
83
84 fn iter_cached_live_object_set_for_testing(
85 &self,
86 include_wrapped_tombstone: bool,
87 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
88 self.iter_live_object_set(include_wrapped_tombstone)
89 }
90}
91
92impl GlobalStateHashStore for InMemoryStorage {
93 fn get_object_ref_prior_to_key_deprecated(
94 &self,
95 _object_id: &ObjectID,
96 _version: VersionNumber,
97 ) -> SuiResult<Option<ObjectRef>> {
98 unreachable!(
99 "get_object_ref_prior_to_key is only called by accumulate_effects_v1, while InMemoryStorage is used by testing and genesis only, which always uses latest protocol "
100 )
101 }
102
103 fn get_root_state_hash_for_epoch(
104 &self,
105 _epoch: EpochId,
106 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
107 unreachable!("not used for testing")
108 }
109
110 fn get_root_state_hash_for_highest_epoch(
111 &self,
112 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
113 unreachable!("not used for testing")
114 }
115
116 fn insert_state_hash_for_epoch(
117 &self,
118 _epoch: EpochId,
119 _checkpoint_seq_num: &CheckpointSequenceNumber,
120 _acc: &GlobalStateHash,
121 ) -> SuiResult {
122 unreachable!("not used for testing")
123 }
124
125 fn iter_live_object_set(
126 &self,
127 _include_wrapped_tombstone: bool,
128 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
129 unreachable!("not used for testing")
130 }
131}
132
133#[derive(Serialize, Debug)]
137pub struct WrappedObject {
138 id: ObjectID,
139 wrapped_at: SequenceNumber,
140 digest: ObjectDigest,
141}
142
143impl WrappedObject {
144 pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
145 Self {
146 id,
147 wrapped_at,
148 digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
149 }
150 }
151}
152
153pub fn accumulate_effects<T, S>(
154 store: S,
155 effects: &[TransactionEffects],
156 protocol_config: &ProtocolConfig,
157) -> GlobalStateHash
158where
159 S: std::ops::Deref<Target = T>,
160 T: GlobalStateHashStore + ?Sized,
161{
162 if protocol_config.enable_effects_v2() {
163 accumulate_effects_v3(effects)
164 } else if protocol_config.simplified_unwrap_then_delete() {
165 accumulate_effects_v2(store, effects)
166 } else {
167 accumulate_effects_v1(store, effects, protocol_config)
168 }
169}
170
171fn accumulate_effects_v1<T, S>(
172 store: S,
173 effects: &[TransactionEffects],
174 protocol_config: &ProtocolConfig,
175) -> GlobalStateHash
176where
177 S: std::ops::Deref<Target = T>,
178 T: GlobalStateHashStore + ?Sized,
179{
180 let mut acc = GlobalStateHash::default();
181
182 acc.insert_all(
184 effects
185 .iter()
186 .flat_map(|fx| {
187 fx.all_changed_objects()
188 .into_iter()
189 .map(|(oref, _, _)| oref.2)
190 })
191 .collect::<Vec<ObjectDigest>>(),
192 );
193
194 acc.insert_all(
197 effects
198 .iter()
199 .flat_map(|fx| {
200 fx.wrapped()
201 .iter()
202 .map(|oref| {
203 bcs::to_bytes(&WrappedObject::new(oref.0, oref.1))
204 .unwrap()
205 .to_vec()
206 })
207 .collect::<Vec<Vec<u8>>>()
208 })
209 .collect::<Vec<Vec<u8>>>(),
210 );
211
212 let all_unwrapped = effects
213 .iter()
214 .flat_map(|fx| {
215 fx.unwrapped()
216 .into_iter()
217 .map(|(oref, _owner)| (*fx.transaction_digest(), oref.0, oref.1))
218 })
219 .chain(effects.iter().flat_map(|fx| {
220 fx.unwrapped_then_deleted()
221 .into_iter()
222 .map(|oref| (*fx.transaction_digest(), oref.0, oref.1))
223 }))
224 .collect::<Vec<(TransactionDigest, ObjectID, SequenceNumber)>>();
225
226 let unwrapped_ids: HashMap<TransactionDigest, HashSet<ObjectID>> = all_unwrapped
227 .iter()
228 .map(|(digest, id, _)| (*digest, *id))
229 .into_group_map()
230 .iter()
231 .map(|(digest, ids)| (*digest, HashSet::from_iter(ids.iter().cloned())))
232 .collect();
233
234 let modified_at_version_keys: Vec<ObjectKey> = effects
239 .iter()
240 .flat_map(|fx| {
241 fx.modified_at_versions()
242 .into_iter()
243 .map(|(id, seq_num)| (*fx.transaction_digest(), id, seq_num))
244 })
245 .filter_map(|(tx_digest, id, seq_num)| {
246 if let Some(ids) = unwrapped_ids.get(&tx_digest) {
248 if ids.contains(&id) {
250 return None;
251 }
252 }
253 Some(ObjectKey(id, seq_num))
254 })
255 .collect();
256
257 let modified_at_digests: Vec<_> = store
258 .multi_get_objects_by_key(&modified_at_version_keys.clone())
259 .into_iter()
260 .zip_debug_eq(modified_at_version_keys)
261 .map(|(obj, key)| {
262 obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
263 .compute_object_reference()
264 .2
265 })
266 .collect();
267 acc.remove_all(modified_at_digests);
268
269 let wrapped_objects_to_remove: Vec<WrappedObject> = all_unwrapped
277 .iter()
278 .filter_map(|(_tx_digest, id, seq_num)| {
279 let objref = store
280 .get_object_ref_prior_to_key_deprecated(id, *seq_num)
281 .expect("read cannot fail");
282
283 objref.map(|(id, version, digest)| {
284 assert!(
285 !protocol_config.loaded_child_objects_fixed() || digest.is_wrapped(),
286 "{:?}",
287 id
288 );
289 WrappedObject::new(id, version)
290 })
291 })
292 .collect();
293
294 acc.remove_all(
295 wrapped_objects_to_remove
296 .iter()
297 .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
298 .collect::<Vec<Vec<u8>>>(),
299 );
300
301 acc
302}
303
304fn accumulate_effects_v2<T, S>(store: S, effects: &[TransactionEffects]) -> GlobalStateHash
305where
306 S: std::ops::Deref<Target = T>,
307 T: GlobalStateHashStore + ?Sized,
308{
309 let mut acc = GlobalStateHash::default();
310
311 acc.insert_all(
313 effects
314 .iter()
315 .flat_map(|fx| {
316 fx.all_changed_objects()
317 .into_iter()
318 .map(|(oref, _, _)| oref.2)
319 })
320 .collect::<Vec<ObjectDigest>>(),
321 );
322
323 let modified_at_version_keys: Vec<_> = effects
325 .iter()
326 .flat_map(|fx| {
327 fx.modified_at_versions()
328 .into_iter()
329 .map(|(id, version)| ObjectKey(id, version))
330 })
331 .collect();
332
333 let modified_at_digests: Vec<_> = store
334 .multi_get_objects_by_key(&modified_at_version_keys.clone())
335 .into_iter()
336 .zip_debug_eq(modified_at_version_keys)
337 .map(|(obj, key)| {
338 obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
339 .compute_object_reference()
340 .2
341 })
342 .collect();
343 acc.remove_all(modified_at_digests);
344
345 acc
346}
347
348fn accumulate_effects_v3(effects: &[TransactionEffects]) -> GlobalStateHash {
349 let mut acc = GlobalStateHash::default();
350
351 acc.insert_all(
353 effects
354 .iter()
355 .flat_map(|fx| {
356 fx.all_changed_objects()
357 .into_iter()
358 .map(|(object_ref, _, _)| object_ref.2)
359 })
360 .collect::<Vec<ObjectDigest>>(),
361 );
362
363 acc.remove_all(
365 effects
366 .iter()
367 .flat_map(|fx| {
368 fx.old_object_metadata()
369 .into_iter()
370 .map(|(object_ref, _owner)| object_ref.2)
371 })
372 .collect::<Vec<ObjectDigest>>(),
373 );
374
375 acc
376}
377
378impl GlobalStateHasher {
379 pub fn new(store: Arc<dyn GlobalStateHashStore>, metrics: Arc<GlobalStateHashMetrics>) -> Self {
380 Self { store, metrics }
381 }
382
383 pub fn new_for_tests(store: Arc<dyn GlobalStateHashStore>) -> Self {
384 Self::new(store, GlobalStateHashMetrics::new(&Registry::new()))
385 }
386
387 pub fn metrics(&self) -> Arc<GlobalStateHashMetrics> {
388 self.metrics.clone()
389 }
390
391 pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
392 self.metrics
393 .inconsistent_state
394 .set(is_inconsistent_state as i64);
395 }
396
397 pub fn accumulate_checkpoint(
399 &self,
400 effects: &[TransactionEffects],
401 checkpoint_seq_num: CheckpointSequenceNumber,
402 epoch_store: &AuthorityPerEpochStore,
403 ) -> SuiResult<GlobalStateHash> {
404 let _scope = monitored_scope("AccumulateCheckpoint");
405 if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
406 return Ok(acc);
407 }
408
409 let acc = self.accumulate_effects(effects, epoch_store.protocol_config());
410
411 epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
412 debug!("Accumulated checkpoint {}", checkpoint_seq_num);
413
414 epoch_store
415 .checkpoint_state_notify_read
416 .notify(&checkpoint_seq_num, &acc);
417
418 Ok(acc)
419 }
420
421 pub fn accumulate_cached_live_object_set_for_testing(
422 &self,
423 include_wrapped_tombstone: bool,
424 ) -> GlobalStateHash {
425 Self::accumulate_live_object_set_impl(
426 self.store
427 .iter_cached_live_object_set_for_testing(include_wrapped_tombstone),
428 )
429 }
430
431 pub fn accumulate_live_object_set(&self, include_wrapped_tombstone: bool) -> GlobalStateHash {
433 Self::accumulate_live_object_set_impl(
434 self.store.iter_live_object_set(include_wrapped_tombstone),
435 )
436 }
437
438 fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> GlobalStateHash {
439 let mut acc = GlobalStateHash::default();
440 iter.for_each(|live_object| {
441 Self::accumulate_live_object(&mut acc, &live_object);
442 });
443 acc
444 }
445
446 pub fn accumulate_live_object(acc: &mut GlobalStateHash, live_object: &LiveObject) {
447 match live_object {
448 LiveObject::Normal(object) => {
449 acc.insert(object.compute_object_reference().2);
450 }
451 LiveObject::Wrapped(key) => {
452 acc.insert(
453 bcs::to_bytes(&WrappedObject::new(key.0, key.1))
454 .expect("Failed to serialize WrappedObject"),
455 );
456 }
457 }
458 }
459
460 pub fn digest_live_object_set(
461 &self,
462 include_wrapped_tombstone: bool,
463 ) -> ECMHLiveObjectSetDigest {
464 let acc = self.accumulate_live_object_set(include_wrapped_tombstone);
465 acc.digest().into()
466 }
467
468 pub async fn digest_epoch(
469 &self,
470 epoch_store: Arc<AuthorityPerEpochStore>,
471 last_checkpoint_of_epoch: CheckpointSequenceNumber,
472 ) -> SuiResult<ECMHLiveObjectSetDigest> {
473 Ok(self
474 .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)?
475 .digest()
476 .into())
477 }
478
479 pub async fn wait_for_previous_running_root(
480 &self,
481 epoch_store: &AuthorityPerEpochStore,
482 checkpoint_seq_num: CheckpointSequenceNumber,
483 ) -> SuiResult {
484 assert!(checkpoint_seq_num > 0);
485
486 if self
489 .store
490 .get_root_state_hash_for_highest_epoch()?
491 .map(|(_, (last_checkpoint_prev_epoch, _))| last_checkpoint_prev_epoch)
492 == Some(checkpoint_seq_num - 1)
493 {
494 return Ok(());
495 }
496
497 epoch_store
501 .notify_read_running_root(checkpoint_seq_num - 1)
502 .await?;
503 Ok(())
504 }
505
506 fn get_prior_root(
507 &self,
508 epoch_store: &AuthorityPerEpochStore,
509 checkpoint_seq_num: CheckpointSequenceNumber,
510 ) -> SuiResult<GlobalStateHash> {
511 if checkpoint_seq_num == 0 {
512 return Ok(GlobalStateHash::default());
513 }
514
515 if let Some(prior_running_root) =
516 epoch_store.get_running_root_state_hash(checkpoint_seq_num - 1)?
517 {
518 return Ok(prior_running_root);
519 }
520
521 if let Some((last_checkpoint_prev_epoch, prev_acc)) = self
522 .store
523 .get_root_state_hash_for_epoch(epoch_store.epoch() - 1)?
524 && last_checkpoint_prev_epoch == checkpoint_seq_num - 1
525 {
526 return Ok(prev_acc);
527 }
528
529 fatal!(
530 "Running root state hasher must exist for checkpoint {}",
531 checkpoint_seq_num - 1
532 );
533 }
534
535 pub fn accumulate_running_root(
538 &self,
539 epoch_store: &AuthorityPerEpochStore,
540 checkpoint_seq_num: CheckpointSequenceNumber,
541 checkpoint_acc: Option<GlobalStateHash>,
542 ) -> SuiResult {
543 let _scope = monitored_scope("AccumulateRunningRoot");
544 tracing::debug!(
545 "accumulating running root for checkpoint {}",
546 checkpoint_seq_num
547 );
548
549 if epoch_store
551 .get_running_root_state_hash(checkpoint_seq_num)?
552 .is_some()
553 {
554 debug!(
555 "accumulate_running_root {:?} {:?} already exists",
556 epoch_store.epoch(),
557 checkpoint_seq_num
558 );
559 return Ok(());
560 }
561
562 let mut running_root = self.get_prior_root(epoch_store, checkpoint_seq_num)?;
563
564 let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
565 epoch_store
566 .get_state_hash_for_checkpoint(&checkpoint_seq_num)
567 .expect("Failed to get checkpoint accumulator from disk")
568 .expect("Expected checkpoint accumulator to exist")
569 });
570 running_root.union(&checkpoint_acc);
571 epoch_store.insert_running_root_state_hash(&checkpoint_seq_num, &running_root)?;
572 debug!(
573 "Accumulated checkpoint {} to running root accumulator",
574 checkpoint_seq_num,
575 );
576 Ok(())
577 }
578
579 pub fn accumulate_epoch(
580 &self,
581 epoch_store: Arc<AuthorityPerEpochStore>,
582 last_checkpoint_of_epoch: CheckpointSequenceNumber,
583 ) -> SuiResult<GlobalStateHash> {
584 let _scope = monitored_scope("AccumulateEpochV2");
585 let running_root = epoch_store
586 .get_running_root_state_hash(last_checkpoint_of_epoch)?
587 .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
588
589 self.store.insert_state_hash_for_epoch(
590 epoch_store.epoch(),
591 &last_checkpoint_of_epoch,
592 &running_root,
593 )?;
594 debug!(
595 "Finalized root state hash for epoch {} (up to checkpoint {})",
596 epoch_store.epoch(),
597 last_checkpoint_of_epoch
598 );
599 Ok(running_root.clone())
600 }
601
602 pub fn accumulate_effects(
603 &self,
604 effects: &[TransactionEffects],
605 protocol_config: &ProtocolConfig,
606 ) -> GlobalStateHash {
607 accumulate_effects(&*self.store, effects, protocol_config)
608 }
609}