1use itertools::Itertools;
5use mysten_common::fatal;
6use mysten_metrics::monitored_scope;
7use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
8use serde::Serialize;
9use sui_protocol_config::ProtocolConfig;
10use sui_types::base_types::{ObjectID, ObjectRef, SequenceNumber, VersionNumber};
11use sui_types::committee::EpochId;
12use sui_types::digests::{ObjectDigest, TransactionDigest};
13use sui_types::in_memory_storage::InMemoryStorage;
14use sui_types::storage::{ObjectKey, ObjectStore};
15use tracing::debug;
16
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19
20use fastcrypto::hash::MultisetHash;
21use sui_types::effects::TransactionEffects;
22use sui_types::effects::TransactionEffectsAPI;
23use sui_types::error::SuiResult;
24use sui_types::global_state_hash::GlobalStateHash;
25use sui_types::messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest};
26
27use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
28use crate::authority::authority_store_tables::LiveObject;
29
30pub struct GlobalStateHashMetrics {
31 inconsistent_state: IntGauge,
32}
33
34impl GlobalStateHashMetrics {
35 pub fn new(registry: &Registry) -> Arc<Self> {
36 let this = Self {
37 inconsistent_state: register_int_gauge_with_registry!(
38 "global_state_hasher_inconsistent_state",
39 "1 if accumulated live object set differs from GlobalStateHasher root state hash for the previous epoch",
40 registry
41 )
42 .unwrap(),
43 };
44 Arc::new(this)
45 }
46}
47
48pub struct GlobalStateHasher {
49 store: Arc<dyn GlobalStateHashStore>,
50 metrics: Arc<GlobalStateHashMetrics>,
51}
52
53pub trait GlobalStateHashStore: ObjectStore + Send + Sync {
54 fn get_object_ref_prior_to_key_deprecated(
57 &self,
58 object_id: &ObjectID,
59 version: VersionNumber,
60 ) -> SuiResult<Option<ObjectRef>>;
61
62 fn get_root_state_hash_for_epoch(
63 &self,
64 epoch: EpochId,
65 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>>;
66
67 fn get_root_state_hash_for_highest_epoch(
68 &self,
69 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>>;
70
71 fn insert_state_hash_for_epoch(
72 &self,
73 epoch: EpochId,
74 checkpoint_seq_num: &CheckpointSequenceNumber,
75 acc: &GlobalStateHash,
76 ) -> SuiResult;
77
78 fn iter_live_object_set(
79 &self,
80 include_wrapped_tombstone: bool,
81 ) -> Box<dyn Iterator<Item = LiveObject> + '_>;
82
83 fn iter_cached_live_object_set_for_testing(
84 &self,
85 include_wrapped_tombstone: bool,
86 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
87 self.iter_live_object_set(include_wrapped_tombstone)
88 }
89}
90
91impl GlobalStateHashStore for InMemoryStorage {
92 fn get_object_ref_prior_to_key_deprecated(
93 &self,
94 _object_id: &ObjectID,
95 _version: VersionNumber,
96 ) -> SuiResult<Option<ObjectRef>> {
97 unreachable!(
98 "get_object_ref_prior_to_key is only called by accumulate_effects_v1, while InMemoryStorage is used by testing and genesis only, which always uses latest protocol "
99 )
100 }
101
102 fn get_root_state_hash_for_epoch(
103 &self,
104 _epoch: EpochId,
105 ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
106 unreachable!("not used for testing")
107 }
108
109 fn get_root_state_hash_for_highest_epoch(
110 &self,
111 ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
112 unreachable!("not used for testing")
113 }
114
115 fn insert_state_hash_for_epoch(
116 &self,
117 _epoch: EpochId,
118 _checkpoint_seq_num: &CheckpointSequenceNumber,
119 _acc: &GlobalStateHash,
120 ) -> SuiResult {
121 unreachable!("not used for testing")
122 }
123
124 fn iter_live_object_set(
125 &self,
126 _include_wrapped_tombstone: bool,
127 ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
128 unreachable!("not used for testing")
129 }
130}
131
132#[derive(Serialize, Debug)]
136pub struct WrappedObject {
137 id: ObjectID,
138 wrapped_at: SequenceNumber,
139 digest: ObjectDigest,
140}
141
142impl WrappedObject {
143 pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
144 Self {
145 id,
146 wrapped_at,
147 digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
148 }
149 }
150}
151
152pub fn accumulate_effects<T, S>(
153 store: S,
154 effects: &[TransactionEffects],
155 protocol_config: &ProtocolConfig,
156) -> GlobalStateHash
157where
158 S: std::ops::Deref<Target = T>,
159 T: GlobalStateHashStore + ?Sized,
160{
161 if protocol_config.enable_effects_v2() {
162 accumulate_effects_v3(effects)
163 } else if protocol_config.simplified_unwrap_then_delete() {
164 accumulate_effects_v2(store, effects)
165 } else {
166 accumulate_effects_v1(store, effects, protocol_config)
167 }
168}
169
170fn accumulate_effects_v1<T, S>(
171 store: S,
172 effects: &[TransactionEffects],
173 protocol_config: &ProtocolConfig,
174) -> GlobalStateHash
175where
176 S: std::ops::Deref<Target = T>,
177 T: GlobalStateHashStore + ?Sized,
178{
179 let mut acc = GlobalStateHash::default();
180
181 acc.insert_all(
183 effects
184 .iter()
185 .flat_map(|fx| {
186 fx.all_changed_objects()
187 .into_iter()
188 .map(|(oref, _, _)| oref.2)
189 })
190 .collect::<Vec<ObjectDigest>>(),
191 );
192
193 acc.insert_all(
196 effects
197 .iter()
198 .flat_map(|fx| {
199 fx.wrapped()
200 .iter()
201 .map(|oref| {
202 bcs::to_bytes(&WrappedObject::new(oref.0, oref.1))
203 .unwrap()
204 .to_vec()
205 })
206 .collect::<Vec<Vec<u8>>>()
207 })
208 .collect::<Vec<Vec<u8>>>(),
209 );
210
211 let all_unwrapped = effects
212 .iter()
213 .flat_map(|fx| {
214 fx.unwrapped()
215 .into_iter()
216 .map(|(oref, _owner)| (*fx.transaction_digest(), oref.0, oref.1))
217 })
218 .chain(effects.iter().flat_map(|fx| {
219 fx.unwrapped_then_deleted()
220 .into_iter()
221 .map(|oref| (*fx.transaction_digest(), oref.0, oref.1))
222 }))
223 .collect::<Vec<(TransactionDigest, ObjectID, SequenceNumber)>>();
224
225 let unwrapped_ids: HashMap<TransactionDigest, HashSet<ObjectID>> = all_unwrapped
226 .iter()
227 .map(|(digest, id, _)| (*digest, *id))
228 .into_group_map()
229 .iter()
230 .map(|(digest, ids)| (*digest, HashSet::from_iter(ids.iter().cloned())))
231 .collect();
232
233 let modified_at_version_keys: Vec<ObjectKey> = effects
238 .iter()
239 .flat_map(|fx| {
240 fx.modified_at_versions()
241 .into_iter()
242 .map(|(id, seq_num)| (*fx.transaction_digest(), id, seq_num))
243 })
244 .filter_map(|(tx_digest, id, seq_num)| {
245 if let Some(ids) = unwrapped_ids.get(&tx_digest) {
247 if ids.contains(&id) {
249 return None;
250 }
251 }
252 Some(ObjectKey(id, seq_num))
253 })
254 .collect();
255
256 let modified_at_digests: Vec<_> = store
257 .multi_get_objects_by_key(&modified_at_version_keys.clone())
258 .into_iter()
259 .zip(modified_at_version_keys)
260 .map(|(obj, key)| {
261 obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
262 .compute_object_reference()
263 .2
264 })
265 .collect();
266 acc.remove_all(modified_at_digests);
267
268 let wrapped_objects_to_remove: Vec<WrappedObject> = all_unwrapped
276 .iter()
277 .filter_map(|(_tx_digest, id, seq_num)| {
278 let objref = store
279 .get_object_ref_prior_to_key_deprecated(id, *seq_num)
280 .expect("read cannot fail");
281
282 objref.map(|(id, version, digest)| {
283 assert!(
284 !protocol_config.loaded_child_objects_fixed() || digest.is_wrapped(),
285 "{:?}",
286 id
287 );
288 WrappedObject::new(id, version)
289 })
290 })
291 .collect();
292
293 acc.remove_all(
294 wrapped_objects_to_remove
295 .iter()
296 .map(|wrapped| bcs::to_bytes(wrapped).unwrap().to_vec())
297 .collect::<Vec<Vec<u8>>>(),
298 );
299
300 acc
301}
302
303fn accumulate_effects_v2<T, S>(store: S, effects: &[TransactionEffects]) -> GlobalStateHash
304where
305 S: std::ops::Deref<Target = T>,
306 T: GlobalStateHashStore + ?Sized,
307{
308 let mut acc = GlobalStateHash::default();
309
310 acc.insert_all(
312 effects
313 .iter()
314 .flat_map(|fx| {
315 fx.all_changed_objects()
316 .into_iter()
317 .map(|(oref, _, _)| oref.2)
318 })
319 .collect::<Vec<ObjectDigest>>(),
320 );
321
322 let modified_at_version_keys: Vec<_> = effects
324 .iter()
325 .flat_map(|fx| {
326 fx.modified_at_versions()
327 .into_iter()
328 .map(|(id, version)| ObjectKey(id, version))
329 })
330 .collect();
331
332 let modified_at_digests: Vec<_> = store
333 .multi_get_objects_by_key(&modified_at_version_keys.clone())
334 .into_iter()
335 .zip(modified_at_version_keys)
336 .map(|(obj, key)| {
337 obj.unwrap_or_else(|| panic!("Object for key {:?} from modified_at_versions effects does not exist in objects table", key))
338 .compute_object_reference()
339 .2
340 })
341 .collect();
342 acc.remove_all(modified_at_digests);
343
344 acc
345}
346
347fn accumulate_effects_v3(effects: &[TransactionEffects]) -> GlobalStateHash {
348 let mut acc = GlobalStateHash::default();
349
350 acc.insert_all(
352 effects
353 .iter()
354 .flat_map(|fx| {
355 fx.all_changed_objects()
356 .into_iter()
357 .map(|(object_ref, _, _)| object_ref.2)
358 })
359 .collect::<Vec<ObjectDigest>>(),
360 );
361
362 acc.remove_all(
364 effects
365 .iter()
366 .flat_map(|fx| {
367 fx.old_object_metadata()
368 .into_iter()
369 .map(|(object_ref, _owner)| object_ref.2)
370 })
371 .collect::<Vec<ObjectDigest>>(),
372 );
373
374 acc
375}
376
377impl GlobalStateHasher {
378 pub fn new(store: Arc<dyn GlobalStateHashStore>, metrics: Arc<GlobalStateHashMetrics>) -> Self {
379 Self { store, metrics }
380 }
381
382 pub fn new_for_tests(store: Arc<dyn GlobalStateHashStore>) -> Self {
383 Self::new(store, GlobalStateHashMetrics::new(&Registry::new()))
384 }
385
386 pub fn metrics(&self) -> Arc<GlobalStateHashMetrics> {
387 self.metrics.clone()
388 }
389
390 pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
391 self.metrics
392 .inconsistent_state
393 .set(is_inconsistent_state as i64);
394 }
395
396 pub fn accumulate_checkpoint(
398 &self,
399 effects: &[TransactionEffects],
400 checkpoint_seq_num: CheckpointSequenceNumber,
401 epoch_store: &AuthorityPerEpochStore,
402 ) -> SuiResult<GlobalStateHash> {
403 let _scope = monitored_scope("AccumulateCheckpoint");
404 if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
405 return Ok(acc);
406 }
407
408 let acc = self.accumulate_effects(effects, epoch_store.protocol_config());
409
410 epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
411 debug!("Accumulated checkpoint {}", checkpoint_seq_num);
412
413 epoch_store
414 .checkpoint_state_notify_read
415 .notify(&checkpoint_seq_num, &acc);
416
417 Ok(acc)
418 }
419
420 pub fn accumulate_cached_live_object_set_for_testing(
421 &self,
422 include_wrapped_tombstone: bool,
423 ) -> GlobalStateHash {
424 Self::accumulate_live_object_set_impl(
425 self.store
426 .iter_cached_live_object_set_for_testing(include_wrapped_tombstone),
427 )
428 }
429
430 pub fn accumulate_live_object_set(&self, include_wrapped_tombstone: bool) -> GlobalStateHash {
432 Self::accumulate_live_object_set_impl(
433 self.store.iter_live_object_set(include_wrapped_tombstone),
434 )
435 }
436
437 fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> GlobalStateHash {
438 let mut acc = GlobalStateHash::default();
439 iter.for_each(|live_object| {
440 Self::accumulate_live_object(&mut acc, &live_object);
441 });
442 acc
443 }
444
445 pub fn accumulate_live_object(acc: &mut GlobalStateHash, live_object: &LiveObject) {
446 match live_object {
447 LiveObject::Normal(object) => {
448 acc.insert(object.compute_object_reference().2);
449 }
450 LiveObject::Wrapped(key) => {
451 acc.insert(
452 bcs::to_bytes(&WrappedObject::new(key.0, key.1))
453 .expect("Failed to serialize WrappedObject"),
454 );
455 }
456 }
457 }
458
459 pub fn digest_live_object_set(
460 &self,
461 include_wrapped_tombstone: bool,
462 ) -> ECMHLiveObjectSetDigest {
463 let acc = self.accumulate_live_object_set(include_wrapped_tombstone);
464 acc.digest().into()
465 }
466
467 pub async fn digest_epoch(
468 &self,
469 epoch_store: Arc<AuthorityPerEpochStore>,
470 last_checkpoint_of_epoch: CheckpointSequenceNumber,
471 ) -> SuiResult<ECMHLiveObjectSetDigest> {
472 Ok(self
473 .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)?
474 .digest()
475 .into())
476 }
477
478 pub async fn wait_for_previous_running_root(
479 &self,
480 epoch_store: &AuthorityPerEpochStore,
481 checkpoint_seq_num: CheckpointSequenceNumber,
482 ) -> SuiResult {
483 assert!(checkpoint_seq_num > 0);
484
485 if self
488 .store
489 .get_root_state_hash_for_highest_epoch()?
490 .map(|(_, (last_checkpoint_prev_epoch, _))| last_checkpoint_prev_epoch)
491 == Some(checkpoint_seq_num - 1)
492 {
493 return Ok(());
494 }
495
496 epoch_store
500 .notify_read_running_root(checkpoint_seq_num - 1)
501 .await?;
502 Ok(())
503 }
504
505 fn get_prior_root(
506 &self,
507 epoch_store: &AuthorityPerEpochStore,
508 checkpoint_seq_num: CheckpointSequenceNumber,
509 ) -> SuiResult<GlobalStateHash> {
510 if checkpoint_seq_num == 0 {
511 return Ok(GlobalStateHash::default());
512 }
513
514 if let Some(prior_running_root) =
515 epoch_store.get_running_root_state_hash(checkpoint_seq_num - 1)?
516 {
517 return Ok(prior_running_root);
518 }
519
520 if let Some((last_checkpoint_prev_epoch, prev_acc)) = self
521 .store
522 .get_root_state_hash_for_epoch(epoch_store.epoch() - 1)?
523 && last_checkpoint_prev_epoch == checkpoint_seq_num - 1
524 {
525 return Ok(prev_acc);
526 }
527
528 fatal!(
529 "Running root state hasher must exist for checkpoint {}",
530 checkpoint_seq_num - 1
531 );
532 }
533
534 pub fn accumulate_running_root(
537 &self,
538 epoch_store: &AuthorityPerEpochStore,
539 checkpoint_seq_num: CheckpointSequenceNumber,
540 checkpoint_acc: Option<GlobalStateHash>,
541 ) -> SuiResult {
542 let _scope = monitored_scope("AccumulateRunningRoot");
543 tracing::info!(
544 "accumulating running root for checkpoint {}",
545 checkpoint_seq_num
546 );
547
548 if epoch_store
550 .get_running_root_state_hash(checkpoint_seq_num)?
551 .is_some()
552 {
553 debug!(
554 "accumulate_running_root {:?} {:?} already exists",
555 epoch_store.epoch(),
556 checkpoint_seq_num
557 );
558 return Ok(());
559 }
560
561 let mut running_root = self.get_prior_root(epoch_store, checkpoint_seq_num)?;
562
563 let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
564 epoch_store
565 .get_state_hash_for_checkpoint(&checkpoint_seq_num)
566 .expect("Failed to get checkpoint accumulator from disk")
567 .expect("Expected checkpoint accumulator to exist")
568 });
569 running_root.union(&checkpoint_acc);
570 epoch_store.insert_running_root_state_hash(&checkpoint_seq_num, &running_root)?;
571 debug!(
572 "Accumulated checkpoint {} to running root accumulator",
573 checkpoint_seq_num,
574 );
575 Ok(())
576 }
577
578 pub fn accumulate_epoch(
579 &self,
580 epoch_store: Arc<AuthorityPerEpochStore>,
581 last_checkpoint_of_epoch: CheckpointSequenceNumber,
582 ) -> SuiResult<GlobalStateHash> {
583 let _scope = monitored_scope("AccumulateEpochV2");
584 let running_root = epoch_store
585 .get_running_root_state_hash(last_checkpoint_of_epoch)?
586 .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
587
588 self.store.insert_state_hash_for_epoch(
589 epoch_store.epoch(),
590 &last_checkpoint_of_epoch,
591 &running_root,
592 )?;
593 debug!(
594 "Finalized root state hash for epoch {} (up to checkpoint {})",
595 epoch_store.epoch(),
596 last_checkpoint_of_epoch
597 );
598 Ok(running_root.clone())
599 }
600
601 pub fn accumulate_effects(
602 &self,
603 effects: &[TransactionEffects],
604 protocol_config: &ProtocolConfig,
605 ) -> GlobalStateHash {
606 accumulate_effects(&*self.store, effects, protocol_config)
607 }
608}