//! A write-back cache that sits in front of the `AuthorityStore`.
//!
//! All execution outputs first land in an uncommitted ("dirty") in-memory
//! tier (`UncommittedData`). Once flushed to the database they are promoted
//! to a bounded committed tier (`CachedCommittedData`). Reads consult the
//! dirty tier, then the committed tier, and finally fall back to the db.

use crate::accumulators::funds_read::AccountFundsRead;
use crate::authority::AuthorityStore;
use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::authority::authority_store::{
    ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
};
use crate::authority::authority_store_tables::LiveObject;
use crate::authority::backpressure::BackpressureManager;
use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
use crate::global_state_hasher::GlobalStateHashStore;
use crate::transaction_outputs::TransactionOutputs;

use dashmap::DashMap;
use dashmap::mapref::entry::Entry as DashMapEntry;
use futures::{FutureExt, future::BoxFuture};
use moka::sync::SegmentedCache as MokaCache;
use mysten_common::debug_fatal;
use mysten_common::random_util::randomize_cache_capacity_in_tests;
use mysten_common::sync::notify_read::NotifyRead;
use parking_lot::Mutex;
use std::collections::{BTreeMap, HashSet};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use sui_config::ExecutionCacheConfig;
use sui_macros::fail_point;
use sui_protocol_config::ProtocolVersion;
use sui_types::SUI_ACCUMULATOR_ROOT_OBJECT_ID;
use sui_types::accumulator_event::AccumulatorEvent;
use sui_types::accumulator_root::{AccumulatorObjId, AccumulatorValue};
use sui_types::base_types::{
    EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
};
use sui_types::bridge::{Bridge, get_bridge};
use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
use sui_types::effects::{TransactionEffects, TransactionEvents};
use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::global_state_hash::GlobalStateHash;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::object::Object;
use sui_types::storage::{
    FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
};
use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
use sui_types::transaction::{TransactionDataAPI, VerifiedSignedTransaction, VerifiedTransaction};
use tap::TapOptional;
use tracing::{debug, info, instrument, trace, warn};

use super::ExecutionCacheAPI;
use super::cache_types::Ticket;
use super::{
    Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
    ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
    cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
    implement_passthrough_traits,
    object_locks::ObjectLocks,
};

#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

#[cfg(test)]
#[path = "unit_tests/notify_read_input_objects_tests.rs"]
mod notify_read_input_objects_tests;

#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    Object(Object),
    Deleted,
    Wrapped,
}

impl ObjectEntry {
    #[cfg(test)]
    fn unwrap_object(&self) -> &Object {
        match self {
            ObjectEntry::Object(o) => o,
            _ => panic!("unwrap_object called on non-Object"),
        }
    }

    fn is_tombstone(&self) -> bool {
        match self {
            ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
            ObjectEntry::Object(_) => false,
        }
    }
}

impl std::fmt::Debug for ObjectEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ObjectEntry::Object(o) => {
                write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
            }
            ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
            ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
        }
    }
}

impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}

impl From<ObjectOrTombstone> for ObjectEntry {
    fn from(object: ObjectOrTombstone) -> Self {
        match object {
            ObjectOrTombstone::Object(o) => o.into(),
            ObjectOrTombstone::Tombstone(obj_ref) => {
                if obj_ref.2.is_deleted() {
                    ObjectEntry::Deleted
                } else if obj_ref.2.is_wrapped() {
                    ObjectEntry::Wrapped
                } else {
                    panic!("tombstone digest must either be deleted or wrapped");
                }
            }
        }
    }
}

/// The most recent information we have about an object: either its latest
/// entry (live or tombstone) at a known version, or proof that it does not
/// exist at all.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}

impl LatestObjectCacheEntry {
    #[cfg(test)]
    fn version(&self) -> Option<SequenceNumber> {
        match self {
            LatestObjectCacheEntry::Object(version, _) => Some(*version),
            LatestObjectCacheEntry::NonExistent => None,
        }
    }

    fn is_alive(&self) -> bool {
        match self {
            LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
            LatestObjectCacheEntry::NonExistent => false,
        }
    }
}

impl IsNewer for LatestObjectCacheEntry {
    fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
        match (self, other) {
            (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
                v1 > v2
            }
            (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
            _ => false,
        }
    }
}

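// Illustrative sketch (not part of this module's API): `is_newer_than` is the
// rule `MonotonicCache` uses to reject stale writes. If two racing threads
// publish versions 2 and 1 of the same object, the version-1 write loses
// regardless of arrival order. Pseudocode, assuming `entry_v1`/`entry_v2` are
// `ObjectEntry` values for the respective versions:
//
//     let newer = LatestObjectCacheEntry::Object(SequenceNumber::from(2), entry_v2);
//     let older = LatestObjectCacheEntry::Object(SequenceNumber::from(1), entry_v1);
//     assert!(newer.is_newer_than(&older));
//     assert!(!older.is_newer_than(&newer));
//     // MonotonicCache::insert only overwrites an existing entry when
//     // `new.is_newer_than(&existing)` holds, so the v1 write becomes a
//     // no-op once v2 is present.
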
type MarkerKey = (EpochId, FullObjectID);

/// UncommittedData stores execution outputs that have not yet been written to
/// the db. Entries are removed from these tables only after being committed.
struct UncommittedData {
    /// The object dirty set. All writes go into this table first; after they are
    /// flushed to the db they are removed from here and inserted into `object_cache`.
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    /// Dirty markers (e.g. received objects, ended consensus streams), keyed by
    /// epoch and object id. Committed alongside the other transaction data, then
    /// moved to `marker_cache`.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    /// Transaction outputs that have not yet been written to the db; entries are
    /// removed as they are flushed.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    /// Outputs of transactions executed via the fastpath; an entry is cleared when
    /// the transaction's outputs are written to the dirty set.
    fastpath_transaction_outputs: MokaCache<TransactionDigest, Arc<TransactionOutputs>>,

    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}

impl UncommittedData {
    fn new(config: &ExecutionCacheConfig) -> Self {
        Self {
            objects: DashMap::with_shard_amount(2048),
            markers: DashMap::with_shard_amount(2048),
            transaction_effects: DashMap::with_shard_amount(2048),
            executed_effects_digests: DashMap::with_shard_amount(2048),
            pending_transaction_writes: DashMap::with_shard_amount(2048),
            fastpath_transaction_outputs: MokaCache::builder(8)
                .max_capacity(randomize_cache_capacity_in_tests(
                    config.fastpath_transaction_outputs_cache_size(),
                ))
                .build(),
            transaction_events: DashMap::with_shard_amount(2048),
            unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
            total_transaction_inserts: AtomicU64::new(0),
            total_transaction_commits: AtomicU64::new(0),
        }
    }

    fn clear(&self) {
        self.objects.clear();
        self.markers.clear();
        self.transaction_effects.clear();
        self.executed_effects_digests.clear();
        self.pending_transaction_writes.clear();
        self.fastpath_transaction_outputs.invalidate_all();
        self.transaction_events.clear();
        self.unchanged_loaded_runtime_objects.clear();
        self.total_transaction_inserts
            .store(0, std::sync::atomic::Ordering::Relaxed);
        self.total_transaction_commits
            .store(0, std::sync::atomic::Ordering::Relaxed);
    }

    fn is_empty(&self) -> bool {
        let empty = self.pending_transaction_writes.is_empty();
        if empty && cfg!(debug_assertions) {
            assert!(
                self.objects.is_empty()
                    && self.markers.is_empty()
                    && self.transaction_effects.is_empty()
                    && self.executed_effects_digests.is_empty()
                    && self.transaction_events.is_empty()
                    && self.unchanged_loaded_runtime_objects.is_empty()
                    && self
                        .total_transaction_inserts
                        .load(std::sync::atomic::Ordering::Relaxed)
                        == self
                            .total_transaction_commits
                            .load(std::sync::atomic::Ordering::Relaxed),
            );
        }
        empty
    }
}

/// A point cache entry: `Some(v)` caches a value, `None` caches proven absence.
type PointCacheItem<T> = Option<T>;

impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
    fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
        match (self, other) {
            (Some(_), None) => true,

            // Point lookups are immutable once present, so two Some values for
            // the same key must be equal; neither is newer.
            (Some(a), Some(b)) => {
                debug_assert_eq!(a, b);
                false
            }

            _ => false,
        }
    }
}

/// CachedCommittedData holds a bounded amount of data that is known to be
/// committed to the db, so entries may be evicted at any time.
struct CachedCommittedData {
    /// Contains live and dead objects; a bounded number of recent versions per object.
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    /// Committed markers, keyed by epoch and object id.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    transaction_executed_in_last_epoch:
        MonotonicCache<(EpochId, TransactionDigest), PointCacheItem<()>>,

    // Currently unused (kept alive for future use, hence the underscore).
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}

impl CachedCommittedData {
    fn new(config: &ExecutionCacheConfig) -> Self {
        let object_cache = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.object_cache_size(),
            ))
            .build();
        let marker_cache = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.marker_cache_size(),
            ))
            .build();

        let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.transaction_cache_size(),
        ));
        let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.effect_cache_size(),
        ));
        let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.events_cache_size(),
        ));
        let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
            config.executed_effect_cache_size(),
        ));

        let transaction_objects = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.transaction_objects_cache_size(),
            ))
            .build();

        let transaction_executed_in_last_epoch = MonotonicCache::new(
            randomize_cache_capacity_in_tests(config.executed_effect_cache_size()),
        );

        Self {
            object_cache,
            marker_cache,
            transactions,
            transaction_effects,
            transaction_events,
            executed_effects_digests,
            transaction_executed_in_last_epoch,
            _transaction_objects: transaction_objects,
        }
    }

    fn clear_and_assert_empty(&self) {
        self.object_cache.invalidate_all();
        self.marker_cache.invalidate_all();
        self.transactions.invalidate_all();
        self.transaction_effects.invalidate_all();
        self.transaction_events.invalidate_all();
        self.executed_effects_digests.invalidate_all();
        self.transaction_executed_in_last_epoch.invalidate_all();
        self._transaction_objects.invalidate_all();

        assert_empty(&self.object_cache);
        assert_empty(&self.marker_cache);
        assert!(self.transactions.is_empty());
        assert!(self.transaction_effects.is_empty());
        assert!(self.transaction_events.is_empty());
        assert!(self.executed_effects_digests.is_empty());
        assert!(self.transaction_executed_in_last_epoch.is_empty());
        assert_empty(&self._transaction_objects);
    }
}

fn assert_empty<K, V>(cache: &MokaCache<K, V>)
where
    K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
    V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
{
    if cache.iter().next().is_some() {
        panic!("cache should be empty");
    }
}

pub struct WritebackCache {
    dirty: UncommittedData,
    cached: CachedCommittedData,

    /// The latest known version (or known nonexistence) of each object. Monotonic:
    /// an entry can only be replaced by a strictly newer one, which keeps racing
    /// readers and writers from publishing stale versions.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    /// Move packages are immutable once published, so they get their own cache
    /// keyed only by id.
    packages: MokaCache<ObjectID, PackageObject>,

    object_locks: ObjectLocks,

    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    object_notify_read: NotifyRead<InputKey, ()>,
    fastpath_transaction_outputs_notify_read:
        NotifyRead<TransactionDigest, Arc<TransactionOutputs>>,

    store: Arc<AuthorityStore>,
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}

macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    // The version map records every version at or above its least
                    // entry, so absence above that point proves the version does
                    // not exist.
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}

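// For reference, a call like
// `check_cache_entry_by_version!(self, "object_by_version", "uncommitted", dirty_entry, version)`
// behaves roughly like the hand-written sketch below (metrics calls elided):
//
//     if let Some(cache) = dirty_entry {
//         if let Some(entry) = cache.get(&version) {
//             return CacheResult::Hit(entry.clone());
//         }
//         // Every version at or above the least entry is recorded, so absence
//         // above that point is proof of nonexistence at `version`.
//         if let Some(least) = cache.get_least() {
//             if least.0 < version {
//                 return CacheResult::NegativeHit;
//             }
//         }
//     }
//     // fall through to the next cache level, then to the db
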
impl WritebackCache {
    pub fn new(
        config: &ExecutionCacheConfig,
        store: Arc<AuthorityStore>,
        metrics: Arc<ExecutionCacheMetrics>,
        backpressure_manager: Arc<BackpressureManager>,
    ) -> Self {
        let packages = MokaCache::builder(8)
            .max_capacity(randomize_cache_capacity_in_tests(
                config.package_cache_size(),
            ))
            .build();
        Self {
            dirty: UncommittedData::new(config),
            cached: CachedCommittedData::new(config),
            object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
                config.object_by_id_cache_size(),
            )),
            packages,
            object_locks: ObjectLocks::new(),
            executed_effects_digests_notify_read: NotifyRead::new(),
            object_notify_read: NotifyRead::new(),
            fastpath_transaction_outputs_notify_read: NotifyRead::new(),
            store,
            backpressure_manager,
            backpressure_threshold: config.backpressure_threshold(),
            metrics,
        }
    }

    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
        Self::new(
            &Default::default(),
            store,
            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
            BackpressureManager::new_for_tests(),
        )
    }

    #[cfg(test)]
    pub fn reset_for_test(&mut self) {
        let mut new = Self::new(
            &Default::default(),
            self.store.clone(),
            self.metrics.clone(),
            self.backpressure_manager.clone(),
        );
        std::mem::swap(self, &mut new);
    }

    pub fn evict_executed_effects_from_cache_for_testing(&self, tx_digest: &TransactionDigest) {
        self.cached.executed_effects_digests.invalidate(tx_digest);
        self.cached.transaction_events.invalidate(tx_digest);
        self.cached.transactions.invalidate(tx_digest);
    }

    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // Hold the dirty-set entry lock while updating object_by_id_cache.
        // Otherwise two threads writing different versions of the same object
        // could update the dirty set and the by-id cache in opposite orders,
        // leaving a stale version recorded as the cached "latest".
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            .ok();

        entry.insert(version, object.clone());

        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }

    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: FullObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}");
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.id()))
            .or_default()
            .value_mut()
            .insert(object_key.version(), marker_value);
        // The end of a consensus stream makes this version "available" to waiting
        // transactions (they will observe the marker), so wake any waiters.
        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
            self.object_notify_read.notify(
                &InputKey::VersionedObject {
                    id: object_key.id(),
                    version: object_key.version(),
                },
                &(),
            );
        }
    }

    // Lock both the dirty and committed sides of the cache for the given key, then
    // hand the (optional) version maps to the callback. Written in `with`-style
    // because returning the locked entries directly creates lifetime problems.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }

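    // Typical use of `with_locked_cache_entries` (sketch): consult the dirty tier,
    // then the committed tier, while both stay locked, so a concurrent flush cannot
    // make a value momentarily invisible between the two probes:
    //
    //     Self::with_locked_cache_entries(
    //         &self.dirty.objects,
    //         &self.cached.object_cache,
    //         &object_id,
    //         |dirty, cached| {
    //             // probe `dirty` first; `cached` is only a fallback, and a miss
    //             // in both falls through to the db at the call site
    //             ...
    //         },
    //     )
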
    // Attempt to read an object at an exact version from the caches alone; the db
    // is never consulted. Returns Miss if neither tier can answer.
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<Object> {
        match self.get_object_entry_by_key_cache_only(object_id, version) {
            CacheResult::Hit(entry) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::Miss => CacheResult::Miss,
            CacheResult::NegativeHit => CacheResult::NegativeHit,
        }
    }

    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.object_by_id_cache.get(object_id);

        // In debug builds, verify that the by-id cache agrees with the
        // authoritative state (the dirty set first, then the db).
        if cfg!(debug_assertions)
            && let Some(entry) = &entry
        {
            let highest: Option<ObjectEntry> = self
                .dirty
                .objects
                .get(object_id)
                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                .or_else(|| {
                    let obj: Option<ObjectEntry> = self
                        .store
                        .get_latest_object_or_tombstone(*object_id)
                        .unwrap()
                        .map(|(_, o)| o.into());
                    obj
                });

            let cache_entry = match &*entry.lock() {
                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                LatestObjectCacheEntry::NonExistent => None,
            };

            // If the cache holds a tombstone but the db returned nothing, the
            // tombstone may simply have been pruned; that is not incoherence.
            let tombstone_possibly_pruned = highest.is_none()
                && cache_entry
                    .as_ref()
                    .map(|e| e.is_tombstone())
                    .unwrap_or(false);

            if highest != cache_entry && !tombstone_possibly_pruned {
                tracing::error!(
                    ?highest,
                    ?cache_entry,
                    ?tombstone_possibly_pruned,
                    "object_by_id cache is incoherent for {:?}",
                    object_id
                );
                panic!("object_by_id cache is incoherent for {:?}", object_id);
            }
        }

        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, Object)> {
        match self.get_object_entry_by_id_cache_only(request_type, object_id) {
            CacheResult::Hit((version, entry)) => match entry {
                ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
            },
            CacheResult::NegativeHit => CacheResult::NegativeHit,
            CacheResult::Miss => CacheResult::Miss,
        }
    }

    fn get_marker_value_cache_only(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_key.id()),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    object_key.version()
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    object_key.version()
                );
                CacheResult::Miss
            },
        )
    }

    fn get_latest_marker_value_cache_only(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }

    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_entry_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, entry)) => match entry {
                ObjectEntry::Object(object) => Some(object),
                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
            },
            CacheResult::NegativeHit => None,
            CacheResult::Miss => {
                let obj = self
                    .store
                    .get_latest_object_or_tombstone(*id)
                    .expect("db error");
                match obj {
                    Some((key, obj)) => {
                        self.cache_latest_object_by_id(
                            id,
                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
                            ticket,
                        );
                        match obj {
                            ObjectOrTombstone::Object(object) => Some(object),
                            ObjectOrTombstone::Tombstone(_) => None,
                        }
                    }
                    None => {
                        self.cache_object_not_found(id, ticket);
                        None
                    }
                }
            }
        }
    }

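    // The read-through pattern used by `get_object_impl` above (sketch): the
    // ticket is taken *before* the cache probe, and the write-back after the db
    // fallback is silently dropped if the entry changed in between, so a slow db
    // read can never clobber a newer cached value. `entry_from` stands in for the
    // inline conversion done above; it is not a real helper in this module.
    //
    //     let ticket = self.object_by_id_cache.get_ticket_for_read(id); // 1. ticket first
    //     // ... cache probe misses ...
    //     let found = self.store.get_latest_object_or_tombstone(*id);   // 2. db fallback
    //     self.cache_latest_object_by_id(id, entry_from(found), ticket); // 3. write-back,
    //     //    discarded by MonotonicCache if the ticket has expired
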
    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }

    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }

    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        trace!(?tx_digest, "writing transaction outputs to cache");

        assert!(
            !self.transaction_executed_in_last_epoch(&tx_digest, epoch_id),
            "Transaction {:?} was already executed in epoch {}",
            tx_digest,
            epoch_id.saturating_sub(1)
        );

        self.dirty.fastpath_transaction_outputs.remove(&tx_digest);

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            unchanged_loaded_runtime_objects,
            ..
        } = &*tx_outputs;

        // Deletions and wraps must be written first. One of the deletions may be of
        // a child object, and if the parent were written first, a reader could see
        // the new parent but miss the child's tombstone, causing an execution fork.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, *object_key, *marker_value);
        }

        // Write children before parents, so that a reader who sees a parent is
        // guaranteed to also see its newly written children.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        debug!(
            ?tx_digest,
            "Writing transaction output objects to cache: {:?}",
            written
                .values()
                .map(|o| (o.id(), o.version()))
                .collect::<Vec<_>>(),
        );
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics
            .record_cache_write("unchanged_loaded_runtime_objects");
        self.dirty
            .unchanged_loaded_runtime_objects
            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);
    }

    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
        let mut all_outputs = Vec::with_capacity(digests.len());
        for tx in digests {
            let Some(outputs) = self
                .dirty
                .pending_transaction_writes
                .get(tx)
                .map(|o| o.clone())
            else {
                // This can happen in a rare case: all transactions of a checkpoint
                // were committed, but the process crashed before the checkpoint
                // watermark advanced. On restart the checkpoint is re-committed and
                // the outputs are no longer in the dirty set.
                warn!("Attempt to commit unknown transaction {:?}", tx);
                continue;
            };
            all_outputs.push(outputs);
        }

        let batch = self
            .store
            .build_db_batch(epoch, &all_outputs)
            .expect("db error");
        (all_outputs, batch)
    }

    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // Flush writes to disk before removing anything from the dirty set;
        // otherwise a cache eviction could make a value disappear briefly.
        db_batch.write().expect("db error");

        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        let num_commits = self
            .dirty
            .total_transaction_commits
            .load(std::sync::atomic::Ordering::Relaxed);

        self.dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits)
    }

    fn set_backpressure(&self, pending_count: u64) {
        let backpressure = pending_count > self.backpressure_threshold;
        let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
        if backpressure_changed {
            self.metrics.backpressure_toggles.inc();
        }
        self.metrics
            .backpressure_status
            .set(if backpressure { 1 } else { 0 });
    }

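    // Backpressure bookkeeping in one line: with I = total_transaction_inserts
    // and C = total_transaction_commits (both monotonically increasing),
    //
    //     pending = I.saturating_sub(C)
    //
    // and backpressure is asserted while `pending > backpressure_threshold`.
    // Worked example: after 10_500 inserts and 10_000 commits with a threshold
    // of 400, pending = 500 > 400, so backpressure stays on until commits catch
    // up to within 400 of inserts.
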
    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        // Remove each piece of committed data from the dirty tier and insert it
        // into the committed tier.
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        // Insert into the committed tier before removing from the dirty tier, so
        // the data stays visible to readers throughout.
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .unchanged_loaded_runtime_objects
            .remove(&tx_digest)
            .expect("unchanged_loaded_runtime_objects must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.id()),
                object_key.version(),
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }

    // Move a committed version of an entry from the dirty tier to the committed
    // tier. Called only after the version has been durably written to the db.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        static MAX_VERSIONS: usize = 3;

        // IMPORTANT: lock both the dirty entry and the cache entry before
        // modifying either, so that readers can never observe the value as
        // temporarily absent from both tiers.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        // Insert into the cache and drop old versions.
        cache_map.insert(version, value.clone());
        cache_map.truncate_to(MAX_VERSIONS);

        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // Remove the dirty entry entirely once its version map is drained.
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }

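    // Lifecycle of one object version through the two tiers (sketch of the
    // calls defined in this file):
    //
    //     write_object_entry(id, v, entry);       // 1. enters dirty.objects[id][v]
    //     build_db_batch(epoch, digests);         // 2. snapshot outputs for the db
    //     db_batch.write();                       // 3. durable on disk
    //     move_version_from_dirty_to_cache(..);   // 4. insert cached.object_cache[id][v],
    //                                             //    then remove dirty.objects[id][v]
    //
    // Step 4 locks both tiers before touching either, so a reader that holds
    // `with_locked_cache_entries` sees the version in at least one tier at all
    // times; the committed tier then keeps only the newest MAX_VERSIONS entries.
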
    // Write-back of a "latest object" fact learned during a read. The ticket
    // (taken before the read) causes the insert to be dropped if the entry
    // changed in the meantime.
    fn cache_latest_object_by_id(
        &self,
        object_id: &ObjectID,
        object: LatestObjectCacheEntry,
        ticket: Ticket,
    ) {
        trace!("caching object by id: {:?} {:?}", object_id, object);
        if self
            .object_by_id_cache
            .insert(object_id, object, ticket)
            .is_ok()
        {
            self.metrics.record_cache_write("object_by_id");
        } else {
            trace!("discarded cache write due to expired ticket");
            self.metrics.record_ticket_expiry();
        }
    }

    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }

    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");

        // Any transaction still pending at the end of the epoch must be an
        // owned-object (single-writer) transaction; evict its uncommitted outputs
        // so the next epoch cannot observe them.
        for r in self.dirty.pending_transaction_writes.iter() {
            let outputs = r.value();
            if !outputs
                .transaction
                .transaction_data()
                .shared_input_objects()
                .is_empty()
            {
                debug_fatal!("transaction must be single writer");
            }
            info!(
                "clearing state for transaction {:?}",
                outputs.transaction.digest()
            );
            for (object_id, object) in outputs.written.iter() {
                if object.is_package() {
                    info!("removing non-finalized package from cache: {:?}", object_id);
                    self.packages.invalidate(object_id);
                }
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }

            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }
        }

        self.dirty.clear();

        info!("clearing old transaction locks");
        self.object_locks.clear();
        info!("clearing object per epoch marker table");
        self.store
            .clear_object_per_epoch_marker_table(execution_guard)
            .expect("db error");
    }

    fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
        self.store
            .bulk_insert_genesis_objects(objects)
            .expect("db error");
        for obj in objects {
            self.cached.object_cache.invalidate(&obj.id());
            self.object_by_id_cache.invalidate(&obj.id());
        }
    }

    fn insert_genesis_object_impl(&self, object: Object) {
        self.object_by_id_cache.invalidate(&object.id());
        self.cached.object_cache.invalidate(&object.id());
        self.store.insert_genesis_object(object).expect("db error");
    }

    pub fn clear_caches_and_assert_empty(&self) {
        info!("clearing caches");
        self.cached.clear_and_assert_empty();
        self.object_by_id_cache.invalidate_all();
        assert!(self.object_by_id_cache.is_empty());
        self.packages.invalidate_all();
        assert_empty(&self.packages);
    }
}

impl AccountFundsRead for WritebackCache {
    fn get_latest_account_amount(&self, account_id: &AccumulatorObjId) -> (u128, SequenceNumber) {
        let mut pre_root_version =
            ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                .unwrap()
                .version();
        let mut loop_iter = 0;
        loop {
            let account_obj = ObjectCacheRead::get_object(self, account_id.inner());
            if let Some(account_obj) = account_obj {
                let (_, AccumulatorValue::U128(value)) =
                    account_obj.data.try_as_move().unwrap().try_into().unwrap();
                return (value.value, account_obj.version());
            }
            // The account object does not exist. If the accumulator root version is
            // unchanged, no settlement raced with our read, so the balance really is
            // zero as of that version. Otherwise retry against the new root version.
            let post_root_version =
                ObjectCacheRead::get_object(self, &SUI_ACCUMULATOR_ROOT_OBJECT_ID)
                    .unwrap()
                    .version();
            if pre_root_version == post_root_version {
                return (0, pre_root_version);
            }
            debug!(
                "Root version changed from {} to {} while reading account amount, retrying",
                pre_root_version, post_root_version
            );
            pre_root_version = post_root_version;
            loop_iter += 1;
            if loop_iter >= 3 {
                debug_fatal!("Unable to get a stable version after 3 iterations");
            }
        }
    }

    fn get_account_amount_at_version(
        &self,
        account_id: &AccumulatorObjId,
        version: SequenceNumber,
    ) -> u128 {
        let account_obj = self.find_object_lt_or_eq_version(*account_id.inner(), version);
        if let Some(account_obj) = account_obj {
            let (_, AccumulatorValue::U128(value)) =
                account_obj.data.try_as_move().unwrap().try_into().unwrap();
            value.value
        } else {
            0
        }
    }
}

impl ExecutionCacheAPI for WritebackCache {}

impl ExecutionCacheCommit for WritebackCache {
    fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
        self.build_db_batch(epoch, digests)
    }

    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        batch: Batch,
        digests: &[TransactionDigest],
    ) {
        WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
    }

    fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
        self.store.persist_transaction(tx).expect("db error");
    }

    fn approximate_pending_transaction_count(&self) -> u64 {
        WritebackCache::approximate_pending_transaction_count(self)
    }
}

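// How a caller is expected to drive the two-phase commit above (sketch; the
// real call sites live in checkpoint execution, outside this file):
//
//     let digests: Vec<TransactionDigest> = /* executed transactions */;
//     let batch = cache.build_db_batch(epoch, &digests);        // phase 1: snapshot
//     cache.commit_transaction_outputs(epoch, batch, &digests); // phase 2: write the
//     // db batch, promote each transaction's outputs from the dirty tier to the
//     // committed tier, and recompute backpressure.
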
impl ObjectCacheRead for WritebackCache {
    fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
        self.metrics
            .record_cache_request("package", "package_cache");
        if let Some(p) = self.packages.get(package_id) {
            if cfg!(debug_assertions) {
                // In debug builds, verify the package cache against the canonical
                // object (dirty set first, then the store).
                let canonical_package = self
                    .dirty
                    .objects
                    .get(package_id)
                    .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
                        Some(ObjectEntry::Object(object)) => Some(object),
                        _ => None,
                    })
                    .or_else(|| self.store.get_object(package_id));

                if let Some(canonical_package) = canonical_package {
                    assert_eq!(
                        canonical_package.digest(),
                        p.object().digest(),
                        "Package object cache is inconsistent for package {:?}",
                        package_id
                    );
                }
            }
            self.metrics.record_cache_hit("package", "package_cache");
            return Ok(Some(p));
        } else {
            self.metrics.record_cache_miss("package", "package_cache");
        }

        // Fall back to the regular object read path (dirty tier, committed tier,
        // db) and populate the package cache on success.
        if let Some(p) = self.get_object_impl("package", package_id) {
            if p.is_package() {
                let p = PackageObject::new(p);
                tracing::trace!(
                    "caching package: {:?}",
                    p.object().compute_object_reference()
                );
                self.metrics.record_cache_write("package");
                self.packages.insert(*package_id, p.clone());
                Ok(Some(p))
            } else {
                Err(SuiErrorKind::UserInputError {
                    error: UserInputError::MoveObjectAsPackage {
                        object_id: *package_id,
                    },
                }
                .into())
            }
        } else {
            Ok(None)
        }
    }

    fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
        // This is a no-op: all writes go through the cache, so the package cache
        // cannot become incoherent with the store.
    }

    fn get_object(&self, id: &ObjectID) -> Option<Object> {
        self.get_object_impl("object_latest", id)
    }

    fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(object) => Some(object),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .get_object_by_key(object_id, version),
        }
    }

    fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
        do_fallback_lookup(
            object_keys,
            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
                CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
                CacheResult::NegativeHit => CacheResult::NegativeHit,
                CacheResult::Miss => CacheResult::Miss,
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_get_objects_by_key(remaining)
                    .expect("db error")
            },
        )
    }

    fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
        match self.get_object_by_key_cache_only(object_id, version) {
            CacheResult::Hit(_) => true,
            CacheResult::NegativeHit => false,
            CacheResult::Miss => self
                .record_db_get("object_by_version")
                .object_exists_by_key(object_id, version)
                .expect("db error"),
        }
    }

    fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
        do_fallback_lookup(
            object_keys,
            |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
                CacheResult::Hit(_) => CacheResult::Hit(true),
                CacheResult::NegativeHit => CacheResult::Hit(false),
                CacheResult::Miss => CacheResult::Miss,
            },
            |remaining| {
                self.record_db_multi_get("object_by_version", remaining.len())
                    .multi_object_exists_by_key(remaining)
                    .expect("db error")
            },
        )
    }

    fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
        match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => Some(match entry {
                ObjectEntry::Object(object) => object.compute_object_reference(),
                ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
                ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
            }),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("latest_objref_or_tombstone")
                .get_latest_object_ref_or_tombstone(object_id)
                .expect("db error"),
        }
    }

    fn get_latest_object_or_tombstone(
        &self,
        object_id: ObjectID,
    ) -> Option<(ObjectKey, ObjectOrTombstone)> {
        match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
            CacheResult::Hit((version, entry)) => {
                let key = ObjectKey(object_id, version);
                Some(match entry {
                    ObjectEntry::Object(object) => (key, object.into()),
                    ObjectEntry::Deleted => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_DELETED,
                        )),
                    ),
                    ObjectEntry::Wrapped => (
                        key,
                        ObjectOrTombstone::Tombstone((
                            object_id,
                            version,
                            ObjectDigest::OBJECT_DIGEST_WRAPPED,
                        )),
                    ),
                })
            }
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("latest_object_or_tombstone")
                .get_latest_object_or_tombstone(object_id)
                .expect("db error"),
        }
    }

    fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
        keys.iter()
            .map(|key| {
                if key.is_cancelled() {
                    true
                } else {
                    match key {
                        InputKey::VersionedObject { id, version } => {
                            matches!(
                                self.get_object_by_key_cache_only(&id.id(), *version),
                                CacheResult::Hit(_)
                            )
                        }
                        InputKey::Package { id } => self.packages.contains_key(id),
                    }
                }
            })
            .collect()
    }

    #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
    fn find_object_lt_or_eq_version(
        &self,
        object_id: ObjectID,
        version_bound: SequenceNumber,
    ) -> Option<Object> {
        macro_rules! check_cache_entry {
            ($level: expr, $objects: expr) => {
                self.metrics
                    .record_cache_request("object_lt_or_eq_version", $level);
                if let Some(objects) = $objects {
                    if let Some((_, object)) = objects
                        .all_versions_lt_or_eq_descending(&version_bound)
                        .next()
                    {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", $level);
                            return Some(object.clone());
                        } else {
                            // A tombstone at or below the bound ends the search.
                            self.metrics
                                .record_cache_negative_hit("object_lt_or_eq_version", $level);
                            return None;
                        }
                    } else {
                        self.metrics
                            .record_cache_miss("object_lt_or_eq_version", $level);
                    }
                }
            };
        }

        // If the latest version is cached and already within the bound, we are done.
        self.metrics
            .record_cache_request("object_lt_or_eq_version", "object_by_id");
        let latest_cache_entry = self.object_by_id_cache.get(&object_id);
        if let Some(latest) = &latest_cache_entry {
            let latest = latest.lock();
            match &*latest {
                LatestObjectCacheEntry::Object(latest_version, object) => {
                    if *latest_version <= version_bound {
                        if let ObjectEntry::Object(object) = object {
                            self.metrics
                                .record_cache_hit("object_lt_or_eq_version", "object_by_id");
                            return Some(object.clone());
                        } else {
                            // The latest entry is a tombstone, so the search is over.
                            self.metrics.record_cache_negative_hit(
                                "object_lt_or_eq_version",
                                "object_by_id",
                            );
                            return None;
                        }
                    }
                    // Otherwise the latest version exceeds the bound; fall through
                    // and search older versions.
                }
                // No object with this id exists at all.
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
                    return None;
                }
            }
        }
        self.metrics
            .record_cache_miss("object_lt_or_eq_version", "object_by_id");

        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            &object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry!("uncommitted", dirty_entry);
                check_cache_entry!("committed", cached_entry);

                // Neither cache tier could prove or disprove a version <= the bound:
                // the caches hold only recent versions, so a miss says nothing about
                // older ones. Determine the authoritative latest version, from the
                // dirty set if present (it is at least as new as the db), otherwise
                // from the db.
                let latest: Option<(SequenceNumber, ObjectEntry)> =
                    if let Some(dirty_set) = dirty_entry {
                        dirty_set
                            .get_highest()
                            .cloned()
                            .tap_none(|| panic!("dirty set cannot be empty"))
                    } else {
                        self.record_db_get("object_lt_or_eq_version_latest")
                            .get_latest_object_or_tombstone(object_id)
                            .expect("db error")
                            .map(|(ObjectKey(_, version), obj_or_tombstone)| {
                                (version, ObjectEntry::from(obj_or_tombstone))
                            })
                    };

                if let Some((obj_version, obj_entry)) = latest {
                    // This is the true latest version, so it is always safe to write
                    // it back to the by-id cache (guarded by a read ticket).
                    self.cache_latest_object_by_id(
                        &object_id,
                        LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
                        self.object_by_id_cache.get_ticket_for_read(&object_id),
                    );

                    if obj_version <= version_bound {
                        match obj_entry {
                            ObjectEntry::Object(object) => Some(object),
                            ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
                        }
                    } else {
                        // The latest version exceeds the bound; only a db scan can
                        // tell whether some older version satisfies it.
                        self.record_db_get("object_lt_or_eq_version_scan")
                            .find_object_lt_or_eq_version(object_id, version_bound)
                            .expect("db error")
                    }
                } else if let Some(latest_cache_entry) = latest_cache_entry {
                    // The db had nothing, yet the by-id cache has an entry: it must
                    // not be alive (its tombstone may have been pruned from the db).
                    assert!(!latest_cache_entry.lock().is_alive());
                    None
                } else {
                    // The object does not exist at all; cache its nonexistence. Any
                    // residual committed-cache entry must be a tombstone.
                    let highest = cached_entry.and_then(|c| c.get_highest());
                    assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
                    self.cache_object_not_found(
                        &object_id,
                        self.object_by_id_cache.get_ticket_for_read(&object_id),
                    );
                    None
                }
            },
        )
    }

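    // Semantics of `find_object_lt_or_eq_version` by example (sketch). Suppose an
    // object exists at versions 1, 3, and 7, with 3 and 7 still cached and 1 only
    // in the db:
    //
    //     find_object_lt_or_eq_version(id, 7.into())  // => Some(v7): cache hit
    //     find_object_lt_or_eq_version(id, 6.into())  // => Some(v3): highest <= 6
    //     find_object_lt_or_eq_version(id, 2.into())  // => Some(v1): caches cannot
    //                                                 //    answer, db scan finds v1
    //     find_object_lt_or_eq_version(id, 0.into())  // => None: nothing at or
    //                                                 //    below the bound
    //
    // A tombstone at or below the bound short-circuits to None: the object was
    // deleted or wrapped as of that version.
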
    fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
        get_sui_system_state(self)
    }

    fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
        get_bridge(self)
    }

    fn get_marker_value(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> Option<MarkerValue> {
        match self.get_marker_value_cache_only(object_key, epoch_id) {
            CacheResult::Hit(marker) => Some(marker),
            CacheResult::NegativeHit => None,
            CacheResult::Miss => self
                .record_db_get("marker_by_version")
                .get_marker_value(object_key, epoch_id)
                .expect("db error"),
        }
    }

    fn get_latest_marker(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> Option<(SequenceNumber, MarkerValue)> {
        match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
            CacheResult::Hit((v, marker)) => Some((v, marker)),
            CacheResult::NegativeHit => {
                panic!("cannot have negative hit when getting latest marker")
            }
            CacheResult::Miss => self
                .record_db_get("marker_latest")
                .get_latest_marker(object_id, epoch_id)
                .expect("db error"),
        }
    }

    fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
        let cur_epoch = epoch_store.epoch();
        match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
            CacheResult::Hit((_, obj)) => {
                let actual_objref = obj.compute_object_reference();
                if obj_ref != actual_objref {
                    Ok(ObjectLockStatus::LockedAtDifferentVersion {
                        locked_ref: actual_objref,
                    })
                } else {
                    // The object is live at the requested version; report its lock state.
                    Ok(
                        match self
                            .object_locks
                            .get_transaction_lock(&obj_ref, epoch_store)?
                        {
                            Some(tx_digest) => ObjectLockStatus::LockedToTx {
                                locked_by_tx: LockDetailsDeprecated {
                                    epoch: cur_epoch,
                                    tx_digest,
                                },
                            },
                            None => ObjectLockStatus::Initialized,
                        },
                    )
                }
            }
            CacheResult::NegativeHit => {
                Err(SuiError::from(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    // Even though the requested version is known, `None` indicates
                    // that the object does not exist at any version.
                    version: None,
                }))
            }
            CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
        }
    }

    fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
        let obj = self.get_object_impl("live_objref", &object_id).ok_or(
            UserInputError::ObjectNotFound {
                object_id,
                version: None,
            },
        )?;
        Ok(obj.compute_object_reference())
    }

    fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
        do_fallback_lookup_fallible(
            owned_object_refs,
            |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
                CacheResult::Hit((version, obj)) => {
                    if obj.compute_object_reference() != *obj_ref {
                        Err(UserInputError::ObjectVersionUnavailableForConsumption {
                            provided_obj_ref: *obj_ref,
                            current_version: version,
                        }
                        .into())
                    } else {
                        Ok(CacheResult::Hit(()))
                    }
                }
                CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
                    object_id: obj_ref.0,
                    version: None,
                }
                .into()),
                CacheResult::Miss => Ok(CacheResult::Miss),
            },
            |remaining| {
                self.record_db_multi_get("object_is_live", remaining.len())
                    .check_owned_objects_are_live(remaining)?;
                Ok(vec![(); remaining.len()])
            },
        )?;
        Ok(())
    }

    fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
        self.store
            .perpetual_tables
            .get_highest_pruned_checkpoint()
            .expect("db error")
    }

    fn notify_read_input_objects<'a>(
        &'a self,
        input_and_receiving_keys: &'a [InputKey],
        receiving_keys: &'a HashSet<InputKey>,
        epoch: EpochId,
    ) -> BoxFuture<'a, ()> {
        self.object_notify_read
            .read(
                "notify_read_input_objects",
                input_and_receiving_keys,
                move |keys| {
                    self.multi_input_objects_available(keys, receiving_keys, epoch)
                        .into_iter()
                        .map(|available| if available { Some(()) } else { None })
                        .collect::<Vec<_>>()
                },
            )
            .map(|_| ())
            .boxed()
    }
}

impl TransactionCacheRead for WritebackCache {
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        // Cache proven absence so repeated lookups skip the db.
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }

    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

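    // Check whether `digest` was executed in the epoch before `current_epoch`.
    // Both positive and negative answers are cached under (last_epoch, digest),
    // since the answer for a completed epoch never changes.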
    fn transaction_executed_in_last_epoch(
        &self,
        digest: &TransactionDigest,
        current_epoch: EpochId,
    ) -> bool {
        if current_epoch == 0 {
            return false;
        }
        let last_epoch = current_epoch - 1;
        let cache_key = (last_epoch, *digest);

        // Take the ticket before reading, so that a write racing with the db
        // lookup below causes the insert at the end to be rejected.
        let ticket = self
            .cached
            .transaction_executed_in_last_epoch
            .get_ticket_for_read(&cache_key);

        if let Some(cached) = self
            .cached
            .transaction_executed_in_last_epoch
            .get(&cache_key)
        {
            return cached.lock().is_some();
        }

        let was_executed = self
            .store
            .perpetual_tables
            .was_transaction_executed_in_last_epoch(digest, current_epoch);

        let value = if was_executed { Some(()) } else { None };
        self.cached
            .transaction_executed_in_last_epoch
            .insert(&cache_key, value, ticket)
            .ok();

        was_executed
    }

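    // Resolve executed effects digests, suspending on any digest that has not
    // been executed yet until its effects digest becomes available.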
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
        self.executed_effects_digests_notify_read
            .read(task_name, digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }

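    // Fetch the events emitted by each transaction; empty event lists are
    // normalized to None before being returned.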
    fn multi_get_events(
        &self,
        event_digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEvents>> {
        // Treat an empty event list the same as no events at all.
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self
                    .dirty
                    .transaction_events
                    .get(digest)
                    .map(|e| e.clone())
                {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return CacheResult::Hit(map_events(events));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        CacheResult::Hit(map_events((*events).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

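    // Runtime-loaded objects that a transaction read but left unchanged are
    // sought first in the dirty set, then in the store.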
    fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Option<Vec<ObjectKey>> {
        self.dirty
            .unchanged_loaded_runtime_objects
            .get(digest)
            .map(|b| b.clone())
            .or_else(|| {
                self.store
                    .get_unchanged_loaded_runtime_objects(digest)
                    .expect("db error")
            })
    }

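    // Fastpath (Mysticeti fastpath certified) outputs live only in the dirty
    // set; this lookup never falls back to the committed cache or the db.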
    fn get_mysticeti_fastpath_outputs(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Option<Arc<TransactionOutputs>> {
        self.dirty.fastpath_transaction_outputs.get(tx_digest)
    }

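    // Wait until fastpath outputs are available for every digest, registering
    // with the notify-read barrier before performing the initial lookup.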
    fn notify_read_fastpath_transaction_outputs<'a>(
        &'a self,
        tx_digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>> {
        self.fastpath_transaction_outputs_notify_read
            .read(
                "notify_read_fastpath_transaction_outputs",
                tx_digests,
                |tx_digests| {
                    tx_digests
                        .iter()
                        .map(|tx_digest| self.get_mysticeti_fastpath_outputs(tx_digest))
                        .collect()
                },
            )
            .boxed()
    }

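    // Drain the accumulator events from a pending (not yet committed)
    // transaction's outputs.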
    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
        self.dirty
            .pending_transaction_writes
            .get(digest)
            .map(|transaction_output| transaction_output.take_accumulator_events())
    }
}

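// Write half of the execution cache: owned-object lock acquisition and
// recording of executed (and fastpath-certified) transaction outputs.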
impl ExecutionCacheWrite for WritebackCache {
    fn acquire_transaction_locks(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        tx_digest: TransactionDigest,
        signed_transaction: Option<VerifiedSignedTransaction>,
    ) -> SuiResult {
        self.object_locks.acquire_transaction_locks(
            self,
            epoch_store,
            owned_input_objects,
            tx_digest,
            signed_transaction,
        )
    }

    fn validate_owned_object_versions(&self, owned_input_objects: &[ObjectRef]) -> SuiResult {
        ObjectLocks::validate_owned_object_versions(self, owned_input_objects)
    }

    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
    }

    fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        debug!(
            ?tx_digest,
            "writing mysticeti fastpath certified transaction outputs"
        );
        // Record the outputs in the dirty set first, then wake any tasks
        // waiting in notify_read_fastpath_transaction_outputs.
        self.dirty
            .fastpath_transaction_outputs
            .insert(tx_digest, tx_outputs.clone());
        self.fastpath_transaction_outputs_notify_read
            .notify(&tx_digest, &tx_outputs);
    }

    #[cfg(test)]
    fn write_object_entry_for_test(&self, object: Object) {
        self.write_object_entry(&object.id(), object.version(), object.into());
    }
}

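// Trait impls that pass straight through to the underlying store or inner
// methods are generated by this macro.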
implement_passthrough_traits!(WritebackCache);

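// GlobalStateHashStore: object-history queries and state-hash bookkeeping
// used by the global state hasher. Most calls delegate to the store; the
// object-ref query below must also consult in-memory state.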
impl GlobalStateHashStore for WritebackCache {
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        // The prior version may live in the dirty set, the committed cache, or
        // the store, so collect a candidate from each and take the newest.
        let mut candidates = Vec::new();

        let check_versions =
            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
                Some((version, object_entry)) => match object_entry {
                    ObjectEntry::Object(object) => {
                        assert_eq!(object.version(), version);
                        Some(object.compute_object_reference())
                    }
                    ObjectEntry::Deleted => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
                    }
                    ObjectEntry::Wrapped => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
                    }
                },
                None => None,
            };

        if let Some(objects) = self.dirty.objects.get(object_id)
            && let Some(prior) = check_versions(&objects)
        {
            candidates.push(prior);
        }

        if let Some(objects) = self.cached.object_cache.get(object_id)
            && let Some(prior) = check_versions(&objects.lock())
        {
            candidates.push(prior);
        }

        if let Some(prior) = self
            .store
            .get_object_ref_prior_to_key_deprecated(object_id, version)?
        {
            candidates.push(prior);
        }

        // Return the candidate with the highest version.
        candidates.sort_by_key(|(_, version, _)| *version);
        Ok(candidates.pop())
    }

    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }

    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }

    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> SuiResult {
        self.store
            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    fn iter_live_object_set(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        // Iterating the store's live object set is only sound once every dirty
        // entry has been committed; otherwise the iteration would miss them.
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set(include_wrapped_tombstone)
    }

    fn iter_cached_live_object_set_for_testing(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        let iter = self.dirty.objects.iter();
        let mut live_objects = BTreeMap::new();

        // Start from the store's live object set...
        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
            live_objects.insert(obj.object_id(), obj);
        }

        // ...then overlay the latest dirty entry for each object, which may
        // replace, wrap, or delete the stored version.
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    live_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (version, ObjectEntry::Wrapped) => {
                    if include_wrapped_tombstone {
                        live_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
                    } else {
                        live_objects.remove(&id);
                    }
                }
                (_, ObjectEntry::Deleted) => {
                    live_objects.remove(&id);
                }
            }
        }

        Box::new(live_objects.into_values())
    }
}

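// StateSyncAPI writes bypass the dirty set: state-synced transactions and
// effects go straight to the db and are mirrored into the committed point
// caches with Ticket::Write.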
impl StateSyncAPI for WritebackCache {
    fn insert_transaction_and_effects(
        &self,
        transaction: &VerifiedTransaction,
        transaction_effects: &TransactionEffects,
    ) {
        self.store
            .insert_transaction_and_effects(transaction, transaction_effects)
            .expect("db error");
        self.cached
            .transactions
            .insert(
                transaction.digest(),
                PointCacheItem::Some(Arc::new(transaction.clone())),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &transaction_effects.digest(),
                PointCacheItem::Some(Arc::new(transaction_effects.clone())),
                Ticket::Write,
            )
            .ok();
    }

    fn multi_insert_transaction_and_effects(
        &self,
        transactions_and_effects: &[VerifiedExecutionData],
    ) {
        self.store
            .multi_insert_transaction_and_effects(transactions_and_effects.iter())
            .expect("db error");
        for VerifiedExecutionData {
            transaction,
            effects,
        } in transactions_and_effects
        {
            self.cached
                .transactions
                .insert(
                    transaction.digest(),
                    PointCacheItem::Some(Arc::new(transaction.clone())),
                    Ticket::Write,
                )
                .ok();
            self.cached
                .transaction_effects
                .insert(
                    &effects.digest(),
                    PointCacheItem::Some(Arc::new(effects.clone())),
                    Ticket::Write,
                )
                .ok();
        }
    }
}