1use crate::authority::AuthorityStore;
41use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
42use crate::authority::authority_store::{
43 ExecutionLockWriteGuard, LockDetailsDeprecated, ObjectLockStatus, SuiLockResult,
44};
45use crate::authority::authority_store_tables::LiveObject;
46use crate::authority::backpressure::BackpressureManager;
47use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
48use crate::fallback_fetch::{do_fallback_lookup, do_fallback_lookup_fallible};
49use crate::global_state_hasher::GlobalStateHashStore;
50use crate::transaction_outputs::TransactionOutputs;
51
52use dashmap::DashMap;
53use dashmap::mapref::entry::Entry as DashMapEntry;
54use futures::{FutureExt, future::BoxFuture};
55use moka::sync::SegmentedCache as MokaCache;
56use mysten_common::debug_fatal;
57use mysten_common::random_util::randomize_cache_capacity_in_tests;
58use mysten_common::sync::notify_read::NotifyRead;
59use parking_lot::Mutex;
60use std::collections::{BTreeMap, HashSet};
61use std::hash::Hash;
62use std::sync::Arc;
63use std::sync::atomic::AtomicU64;
64use sui_config::ExecutionCacheConfig;
65use sui_macros::fail_point;
66use sui_protocol_config::ProtocolVersion;
67use sui_types::accumulator_event::AccumulatorEvent;
68use sui_types::base_types::{
69 EpochId, FullObjectID, ObjectID, ObjectRef, SequenceNumber, VerifiedExecutionData,
70};
71use sui_types::bridge::{Bridge, get_bridge};
72use sui_types::digests::{ObjectDigest, TransactionDigest, TransactionEffectsDigest};
73use sui_types::effects::{TransactionEffects, TransactionEvents};
74use sui_types::error::{SuiError, SuiErrorKind, SuiResult, UserInputError};
75use sui_types::executable_transaction::VerifiedExecutableTransaction;
76use sui_types::global_state_hash::GlobalStateHash;
77use sui_types::message_envelope::Message;
78use sui_types::messages_checkpoint::CheckpointSequenceNumber;
79use sui_types::object::Object;
80use sui_types::storage::{
81 FullObjectKey, InputKey, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject,
82};
83use sui_types::sui_system_state::{SuiSystemState, get_sui_system_state};
84use sui_types::transaction::{TransactionDataAPI, VerifiedSignedTransaction, VerifiedTransaction};
85use tap::TapOptional;
86use tracing::{debug, info, instrument, trace, warn};
87
88use super::ExecutionCacheAPI;
89use super::cache_types::Ticket;
90use super::{
91 Batch, CheckpointCache, ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheReconfigAPI,
92 ExecutionCacheWrite, ObjectCacheRead, StateSyncAPI, TestingAPI, TransactionCacheRead,
93 cache_types::{CacheResult, CachedVersionMap, IsNewer, MonotonicCache},
94 implement_passthrough_traits,
95 object_locks::ObjectLocks,
96};
97
// Unit tests live in separate files to keep this module readable.
#[cfg(test)]
#[path = "unit_tests/writeback_cache_tests.rs"]
pub mod writeback_cache_tests;

#[cfg(test)]
#[path = "unit_tests/notify_read_input_objects_tests.rs"]
mod notify_read_input_objects_tests;
105
/// The state of an object at a particular version in the cache: either a live
/// object, or a tombstone recording that the version was deleted or wrapped.
#[derive(Clone, PartialEq, Eq)]
enum ObjectEntry {
    Object(Object),
    // Tombstone: the object was deleted at this version.
    Deleted,
    // Tombstone: the object was wrapped (stored inside another object) at this version.
    Wrapped,
}
112
113impl ObjectEntry {
114 #[cfg(test)]
115 fn unwrap_object(&self) -> &Object {
116 match self {
117 ObjectEntry::Object(o) => o,
118 _ => panic!("unwrap_object called on non-Object"),
119 }
120 }
121
122 fn is_tombstone(&self) -> bool {
123 match self {
124 ObjectEntry::Deleted | ObjectEntry::Wrapped => true,
125 ObjectEntry::Object(_) => false,
126 }
127 }
128}
129
130impl std::fmt::Debug for ObjectEntry {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 ObjectEntry::Object(o) => {
134 write!(f, "ObjectEntry::Object({:?})", o.compute_object_reference())
135 }
136 ObjectEntry::Deleted => write!(f, "ObjectEntry::Deleted"),
137 ObjectEntry::Wrapped => write!(f, "ObjectEntry::Wrapped"),
138 }
139 }
140}
141
// A bare Object always converts to the live variant.
impl From<Object> for ObjectEntry {
    fn from(object: Object) -> Self {
        ObjectEntry::Object(object)
    }
}
147
148impl From<ObjectOrTombstone> for ObjectEntry {
149 fn from(object: ObjectOrTombstone) -> Self {
150 match object {
151 ObjectOrTombstone::Object(o) => o.into(),
152 ObjectOrTombstone::Tombstone(obj_ref) => {
153 if obj_ref.2.is_deleted() {
154 ObjectEntry::Deleted
155 } else if obj_ref.2.is_wrapped() {
156 ObjectEntry::Wrapped
157 } else {
158 panic!("tombstone digest must either be deleted or wrapped");
159 }
160 }
161 }
162 }
163}
164
/// The latest known state of an object id: either its highest version together
/// with that version's entry (live or tombstone), or known-nonexistent.
#[derive(Debug, Clone, PartialEq, Eq)]
enum LatestObjectCacheEntry {
    Object(SequenceNumber, ObjectEntry),
    NonExistent,
}
170
171impl LatestObjectCacheEntry {
172 #[cfg(test)]
173 fn version(&self) -> Option<SequenceNumber> {
174 match self {
175 LatestObjectCacheEntry::Object(version, _) => Some(*version),
176 LatestObjectCacheEntry::NonExistent => None,
177 }
178 }
179
180 fn is_alive(&self) -> bool {
181 match self {
182 LatestObjectCacheEntry::Object(_, entry) => !entry.is_tombstone(),
183 LatestObjectCacheEntry::NonExistent => false,
184 }
185 }
186}
187
188impl IsNewer for LatestObjectCacheEntry {
189 fn is_newer_than(&self, other: &LatestObjectCacheEntry) -> bool {
190 match (self, other) {
191 (LatestObjectCacheEntry::Object(v1, _), LatestObjectCacheEntry::Object(v2, _)) => {
192 v1 > v2
193 }
194 (LatestObjectCacheEntry::Object(_, _), LatestObjectCacheEntry::NonExistent) => true,
195 _ => false,
196 }
197 }
198}
199
200type MarkerKey = (EpochId, FullObjectID);
201
/// Execution outputs that have been written to the cache but not yet committed
/// to the database. Entries enter via `write_transaction_outputs` and leave via
/// `commit_transaction_outputs` / `flush_transactions_from_dirty_to_cached`.
struct UncommittedData {
    /// All uncommitted versions of dirty objects, keyed by id; each value maps
    /// version -> entry (live object or tombstone).
    objects: DashMap<ObjectID, CachedVersionMap<ObjectEntry>>,

    /// Uncommitted marker values, keyed by (epoch, object id) and versioned the
    /// same way as `objects`.
    markers: DashMap<MarkerKey, CachedVersionMap<MarkerValue>>,

    /// Uncommitted effects, keyed by effects digest.
    transaction_effects: DashMap<TransactionEffectsDigest, TransactionEffects>,

    /// Uncommitted events, keyed by the transaction digest that emitted them.
    transaction_events: DashMap<TransactionDigest, TransactionEvents>,

    /// Keys of objects loaded at runtime but left unchanged, per transaction.
    unchanged_loaded_runtime_objects: DashMap<TransactionDigest, Vec<ObjectKey>>,

    /// Maps executed transaction digest -> its effects digest; written last in
    /// write_transaction_outputs, removed in the flush path.
    executed_effects_digests: DashMap<TransactionDigest, TransactionEffectsDigest>,

    /// Full outputs of executed-but-uncommitted transactions; an entry here is
    /// the authoritative signal that the transaction is pending commit.
    pending_transaction_writes: DashMap<TransactionDigest, Arc<TransactionOutputs>>,

    /// Bounded cache of fastpath-executed outputs, removed eagerly when the
    /// transaction is written as finalized output. NOTE(review): being a Moka
    /// cache, entries can also be evicted by capacity — consumers appear to
    /// tolerate that; confirm.
    fastpath_transaction_outputs: MokaCache<TransactionDigest, Arc<TransactionOutputs>>,

    /// Running insert/commit counters; their difference approximates the
    /// number of pending transactions and drives backpressure.
    total_transaction_inserts: AtomicU64,
    total_transaction_commits: AtomicU64,
}
254
255impl UncommittedData {
256 fn new(config: &ExecutionCacheConfig) -> Self {
257 Self {
258 objects: DashMap::with_shard_amount(2048),
259 markers: DashMap::with_shard_amount(2048),
260 transaction_effects: DashMap::with_shard_amount(2048),
261 executed_effects_digests: DashMap::with_shard_amount(2048),
262 pending_transaction_writes: DashMap::with_shard_amount(2048),
263 fastpath_transaction_outputs: MokaCache::builder(8)
264 .max_capacity(randomize_cache_capacity_in_tests(
265 config.fastpath_transaction_outputs_cache_size(),
266 ))
267 .build(),
268 transaction_events: DashMap::with_shard_amount(2048),
269 unchanged_loaded_runtime_objects: DashMap::with_shard_amount(2048),
270 total_transaction_inserts: AtomicU64::new(0),
271 total_transaction_commits: AtomicU64::new(0),
272 }
273 }
274
275 fn clear(&self) {
276 self.objects.clear();
277 self.markers.clear();
278 self.transaction_effects.clear();
279 self.executed_effects_digests.clear();
280 self.pending_transaction_writes.clear();
281 self.fastpath_transaction_outputs.invalidate_all();
282 self.transaction_events.clear();
283 self.unchanged_loaded_runtime_objects.clear();
284 self.total_transaction_inserts
285 .store(0, std::sync::atomic::Ordering::Relaxed);
286 self.total_transaction_commits
287 .store(0, std::sync::atomic::Ordering::Relaxed);
288 }
289
290 fn is_empty(&self) -> bool {
291 let empty = self.pending_transaction_writes.is_empty();
292 if empty && cfg!(debug_assertions) {
293 assert!(
294 self.objects.is_empty()
295 && self.markers.is_empty()
296 && self.transaction_effects.is_empty()
297 && self.executed_effects_digests.is_empty()
298 && self.transaction_events.is_empty()
299 && self.unchanged_loaded_runtime_objects.is_empty()
300 && self
301 .total_transaction_inserts
302 .load(std::sync::atomic::Ordering::Relaxed)
303 == self
304 .total_transaction_commits
305 .load(std::sync::atomic::Ordering::Relaxed),
306 );
307 }
308 empty
309 }
310}
311
312type PointCacheItem<T> = Option<T>;
314
315impl<T: Eq + std::fmt::Debug> IsNewer for PointCacheItem<T> {
318 fn is_newer_than(&self, other: &PointCacheItem<T>) -> bool {
319 match (self, other) {
320 (Some(_), None) => true,
321
322 (Some(a), Some(b)) => {
323 debug_assert_eq!(a, b);
325 false
326 }
327
328 _ => false,
329 }
330 }
331}
332
/// Bounded caches of data that has already been committed to the database.
/// Reads consult the dirty set first, then these caches, then the store.
struct CachedCommittedData {
    /// Recently committed object versions, keyed by id; each entry retains a
    /// small truncated window of versions (see move_version_from_dirty_to_cache).
    object_cache: MokaCache<ObjectID, Arc<Mutex<CachedVersionMap<ObjectEntry>>>>,

    /// Recently committed marker values, same structure as `object_cache`.
    marker_cache: MokaCache<MarkerKey, Arc<Mutex<CachedVersionMap<MarkerValue>>>>,

    /// Committed transactions by digest (point cache; None = known-absent).
    transactions: MonotonicCache<TransactionDigest, PointCacheItem<Arc<VerifiedTransaction>>>,

    /// Committed effects by effects digest.
    transaction_effects:
        MonotonicCache<TransactionEffectsDigest, PointCacheItem<Arc<TransactionEffects>>>,

    /// Committed events by transaction digest.
    transaction_events: MonotonicCache<TransactionDigest, PointCacheItem<Arc<TransactionEvents>>>,

    /// Transaction digest -> effects digest for executed transactions.
    executed_effects_digests:
        MonotonicCache<TransactionDigest, PointCacheItem<TransactionEffectsDigest>>,

    // NOTE(review): never populated anywhere in this file — the leading
    // underscore suggests it is reserved for future use; confirm before relying on it.
    _transaction_objects: MokaCache<TransactionDigest, Vec<Object>>,
}
355
356impl CachedCommittedData {
357 fn new(config: &ExecutionCacheConfig) -> Self {
358 let object_cache = MokaCache::builder(8)
359 .max_capacity(randomize_cache_capacity_in_tests(
360 config.object_cache_size(),
361 ))
362 .build();
363 let marker_cache = MokaCache::builder(8)
364 .max_capacity(randomize_cache_capacity_in_tests(
365 config.marker_cache_size(),
366 ))
367 .build();
368
369 let transactions = MonotonicCache::new(randomize_cache_capacity_in_tests(
370 config.transaction_cache_size(),
371 ));
372 let transaction_effects = MonotonicCache::new(randomize_cache_capacity_in_tests(
373 config.effect_cache_size(),
374 ));
375 let transaction_events = MonotonicCache::new(randomize_cache_capacity_in_tests(
376 config.events_cache_size(),
377 ));
378 let executed_effects_digests = MonotonicCache::new(randomize_cache_capacity_in_tests(
379 config.executed_effect_cache_size(),
380 ));
381
382 let transaction_objects = MokaCache::builder(8)
383 .max_capacity(randomize_cache_capacity_in_tests(
384 config.transaction_objects_cache_size(),
385 ))
386 .build();
387
388 Self {
389 object_cache,
390 marker_cache,
391 transactions,
392 transaction_effects,
393 transaction_events,
394 executed_effects_digests,
395 _transaction_objects: transaction_objects,
396 }
397 }
398
399 fn clear_and_assert_empty(&self) {
400 self.object_cache.invalidate_all();
401 self.marker_cache.invalidate_all();
402 self.transactions.invalidate_all();
403 self.transaction_effects.invalidate_all();
404 self.transaction_events.invalidate_all();
405 self.executed_effects_digests.invalidate_all();
406 self._transaction_objects.invalidate_all();
407
408 assert_empty(&self.object_cache);
409 assert_empty(&self.marker_cache);
410 assert!(self.transactions.is_empty());
411 assert!(self.transaction_effects.is_empty());
412 assert!(self.transaction_events.is_empty());
413 assert!(self.executed_effects_digests.is_empty());
414 assert_empty(&self._transaction_objects);
415 }
416}
417
418fn assert_empty<K, V>(cache: &MokaCache<K, V>)
419where
420 K: std::hash::Hash + std::cmp::Eq + std::cmp::PartialEq + Send + Sync + 'static,
421 V: std::clone::Clone + std::marker::Send + std::marker::Sync + 'static,
422{
423 if cache.iter().next().is_some() {
424 panic!("cache should be empty");
425 }
426}
427
/// Write-back execution cache: transaction outputs are first written to the
/// `dirty` set, later committed to the database in batches, and finally moved
/// into the bounded `cached` layer.
pub struct WritebackCache {
    // Uncommitted (pre-db) state.
    dirty: UncommittedData,
    // Bounded caches of committed state.
    cached: CachedCommittedData,

    /// Latest-version-by-id cache, including negative (NonExistent) entries.
    /// Writes go through Tickets to stay coherent with concurrent db reads.
    object_by_id_cache: MonotonicCache<ObjectID, LatestObjectCacheEntry>,

    /// Package objects cached separately (packages are immutable, so a flat
    /// id -> package map suffices; populated in write_transaction_outputs).
    packages: MokaCache<ObjectID, PackageObject>,

    // Per-object transaction lock state.
    object_locks: ObjectLocks,

    // Wakers for tasks awaiting executed effects / input objects / fastpath outputs.
    executed_effects_digests_notify_read: NotifyRead<TransactionDigest, TransactionEffectsDigest>,
    object_notify_read: NotifyRead<InputKey, ()>,
    fastpath_transaction_outputs_notify_read:
        NotifyRead<TransactionDigest, Arc<TransactionOutputs>>,

    // Backing store and backpressure plumbing.
    store: Arc<AuthorityStore>,
    // Pending-transaction count above which backpressure is engaged.
    backpressure_threshold: u64,
    backpressure_manager: Arc<BackpressureManager>,
    metrics: Arc<ExecutionCacheMetrics>,
}
464
/// Probes one layer (dirty or committed) of a versioned cache for an exact
/// version. Expands to code that EARLY-RETURNS from the enclosing function
/// with `CacheResult::Hit` on a hit, or `CacheResult::NegativeHit` when the
/// layer provably does not contain the version; otherwise records a miss and
/// falls through to the next layer.
macro_rules! check_cache_entry_by_version {
    ($self: ident, $table: expr, $level: expr, $cache: expr, $version: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some(entry) = cache.get(&$version) {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit(entry.clone());
            }

            // The requested version was not found; if the lowest version this
            // map retains is older than the request, the map's coverage
            // includes the requested version, so it definitively does not exist
            // at this level.
            if let Some(least_version) = cache.get_least() {
                if least_version.0 < $version {
                    $self.metrics.record_cache_negative_hit($table, $level);
                    return CacheResult::NegativeHit;
                }
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
486
/// Probes one layer of a versioned cache for the highest (latest) version.
/// Expands to code that EARLY-RETURNS `CacheResult::Hit((version, entry))` if
/// a per-key map exists; an existing-but-empty map is a bug because empty maps
/// are removed eagerly (see move_version_from_dirty_to_cache).
macro_rules! check_cache_entry_by_latest {
    ($self: ident, $table: expr, $level: expr, $cache: expr) => {
        $self.metrics.record_cache_request($table, $level);
        if let Some(cache) = $cache {
            if let Some((version, entry)) = cache.get_highest() {
                $self.metrics.record_cache_hit($table, $level);
                return CacheResult::Hit((*version, entry.clone()));
            } else {
                panic!("empty CachedVersionMap should have been removed");
            }
        }
        $self.metrics.record_cache_miss($table, $level);
    };
}
501
502impl WritebackCache {
503 pub fn new(
504 config: &ExecutionCacheConfig,
505 store: Arc<AuthorityStore>,
506 metrics: Arc<ExecutionCacheMetrics>,
507 backpressure_manager: Arc<BackpressureManager>,
508 ) -> Self {
509 let packages = MokaCache::builder(8)
510 .max_capacity(randomize_cache_capacity_in_tests(
511 config.package_cache_size(),
512 ))
513 .build();
514 Self {
515 dirty: UncommittedData::new(config),
516 cached: CachedCommittedData::new(config),
517 object_by_id_cache: MonotonicCache::new(randomize_cache_capacity_in_tests(
518 config.object_by_id_cache_size(),
519 )),
520 packages,
521 object_locks: ObjectLocks::new(),
522 executed_effects_digests_notify_read: NotifyRead::new(),
523 object_notify_read: NotifyRead::new(),
524 fastpath_transaction_outputs_notify_read: NotifyRead::new(),
525 store,
526 backpressure_manager,
527 backpressure_threshold: config.backpressure_threshold(),
528 metrics,
529 }
530 }
531
    /// Test constructor: default config, fresh metrics registry, and a
    /// test-mode backpressure manager.
    pub fn new_for_tests(store: Arc<AuthorityStore>) -> Self {
        Self::new(
            &Default::default(),
            store,
            ExecutionCacheMetrics::new(&prometheus::Registry::new()).into(),
            BackpressureManager::new_for_tests(),
        )
    }
540
541 #[cfg(test)]
542 pub fn reset_for_test(&mut self) {
543 let mut new = Self::new(
544 &Default::default(),
545 self.store.clone(),
546 self.metrics.clone(),
547 self.backpressure_manager.clone(),
548 );
549 std::mem::swap(self, &mut new);
550 }
551
    /// Inserts one version of an object (or tombstone) into the dirty set,
    /// updates the latest-by-id cache, and wakes input-object waiters.
    fn write_object_entry(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
        object: ObjectEntry,
    ) {
        trace!(?object_id, ?version, ?object, "inserting object entry");
        self.metrics.record_cache_write("object");

        // Take (and hold) the dirty entry for this object before updating the
        // by-id cache, so both structures are updated under the entry's lock.
        let mut entry = self.dirty.objects.entry(*object_id).or_default();

        // Ticket::Write marks this as a writer-side insert (see cache_types::
        // Ticket); .ok() because a rejected insert only means the by-id cache
        // skipped this update.
        self.object_by_id_cache
            .insert(
                object_id,
                LatestObjectCacheEntry::Object(version, object.clone()),
                Ticket::Write,
            )
            .ok();

        entry.insert(version, object.clone());

        // Wake waiters on this object as a transaction input. Only packages
        // and non-child live objects are notified — tombstones and child
        // objects produce no input-key notification here.
        if let ObjectEntry::Object(object) = &object {
            if object.is_package() {
                self.object_notify_read
                    .notify(&InputKey::Package { id: *object_id }, &());
            } else if !object.is_child_object() {
                self.object_notify_read.notify(
                    &InputKey::VersionedObject {
                        id: object.full_id(),
                        version: object.version(),
                    },
                    &(),
                );
            }
        }
    }
609
    /// Records a marker value for (epoch, object id, version) in the dirty set.
    fn write_marker_value(
        &self,
        epoch_id: EpochId,
        object_key: FullObjectKey,
        marker_value: MarkerValue,
    ) {
        tracing::trace!("inserting marker value {object_key:?}: {marker_value:?}",);
        self.metrics.record_cache_write("marker");
        self.dirty
            .markers
            .entry((epoch_id, object_key.id()))
            .or_default()
            .value_mut()
            .insert(object_key.version(), marker_value);
        // For a ConsensusStreamEnded marker, wake waiters on this object
        // version as an input — presumably because no corresponding
        // write_object_entry call will follow for it (NOTE(review): confirm).
        if matches!(marker_value, MarkerValue::ConsensusStreamEnded(_)) {
            self.object_notify_read.notify(
                &InputKey::VersionedObject {
                    id: object_key.id(),
                    version: object_key.version(),
                },
                &(),
            );
        }
    }
638
    /// Runs `cb` with the per-key version maps from the dirty layer and the
    /// committed cache, while BOTH are locked: the dashmap entry guard keeps
    /// the dirty shard locked, and the committed entry's mutex is held via
    /// `cached_lock`. This gives the callback a consistent snapshot across the
    /// two layers for this key.
    fn with_locked_cache_entries<K, V, R>(
        dirty_map: &DashMap<K, CachedVersionMap<V>>,
        cached_map: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: &K,
        cb: impl FnOnce(Option<&CachedVersionMap<V>>, Option<&CachedVersionMap<V>>) -> R,
    ) -> R
    where
        K: Copy + Eq + Hash + Send + Sync + 'static,
        V: Send + Sync + 'static,
    {
        // Entry (not get) is used so the shard lock is held even when the key
        // is absent from the dirty map.
        let dirty_entry = dirty_map.entry(*key);
        let dirty_entry = match &dirty_entry {
            DashMapEntry::Occupied(occupied) => Some(occupied.get()),
            DashMapEntry::Vacant(_) => None,
        };

        let cached_entry = cached_map.get(key);
        let cached_lock = cached_entry.as_ref().map(|entry| entry.lock());
        let cached_entry = cached_lock.as_deref();

        cb(dirty_entry, cached_entry)
    }
664
    /// Cache-only lookup of an object entry at an exact version: checks the
    /// dirty layer, then the committed cache; never touches the db.
    fn get_object_entry_by_key_cache_only(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> CacheResult<ObjectEntry> {
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                // Each macro early-returns on Hit/NegativeHit.
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "uncommitted",
                    dirty_entry,
                    version
                );
                check_cache_entry_by_version!(
                    self,
                    "object_by_version",
                    "committed",
                    cached_entry,
                    version
                );
                CacheResult::Miss
            },
        )
    }
695
696 fn get_object_by_key_cache_only(
697 &self,
698 object_id: &ObjectID,
699 version: SequenceNumber,
700 ) -> CacheResult<Object> {
701 match self.get_object_entry_by_key_cache_only(object_id, version) {
702 CacheResult::Hit(entry) => match entry {
703 ObjectEntry::Object(object) => CacheResult::Hit(object),
704 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
705 },
706 CacheResult::Miss => CacheResult::Miss,
707 CacheResult::NegativeHit => CacheResult::NegativeHit,
708 }
709 }
710
    /// Cache-only lookup of the LATEST entry for an object id: consults the
    /// object_by_id cache first, then falls back to scanning the dirty and
    /// committed version maps. Never touches the db for the answer itself.
    fn get_object_entry_by_id_cache_only(
        &self,
        request_type: &'static str,
        object_id: &ObjectID,
    ) -> CacheResult<(SequenceNumber, ObjectEntry)> {
        self.metrics
            .record_cache_request(request_type, "object_by_id");
        let entry = self.object_by_id_cache.get(object_id);

        // Debug-only coherence check: recompute the true latest entry from the
        // dirty set (or, failing that, the store) and compare against the
        // cached value. This DOES read the db, but only under debug_assertions.
        if cfg!(debug_assertions)
            && let Some(entry) = &entry
        {
            let highest: Option<ObjectEntry> = self
                .dirty
                .objects
                .get(object_id)
                .and_then(|entry| entry.get_highest().map(|(_, o)| o.clone()))
                .or_else(|| {
                    let obj: Option<ObjectEntry> = self
                        .store
                        .get_latest_object_or_tombstone(*object_id)
                        .unwrap()
                        .map(|(_, o)| o.into());
                    obj
                });

            let cache_entry = match &*entry.lock() {
                LatestObjectCacheEntry::Object(_, entry) => Some(entry.clone()),
                LatestObjectCacheEntry::NonExistent => None,
            };

            // A cached tombstone with no backing entry is tolerated —
            // presumably the db pruned the tombstone (NOTE(review): confirm
            // pruning is the only path that produces this state).
            let tombstone_possibly_pruned = highest.is_none()
                && cache_entry
                    .as_ref()
                    .map(|e| e.is_tombstone())
                    .unwrap_or(false);

            if highest != cache_entry && !tombstone_possibly_pruned {
                tracing::error!(
                    ?highest,
                    ?cache_entry,
                    ?tombstone_possibly_pruned,
                    "object_by_id cache is incoherent for {:?}",
                    object_id
                );
                panic!("object_by_id cache is incoherent for {:?}", object_id);
            }
        }

        // Fast path: the by-id cache answers directly (positively or negatively).
        if let Some(entry) = entry {
            let entry = entry.lock();
            match &*entry {
                LatestObjectCacheEntry::Object(latest_version, latest_object) => {
                    self.metrics.record_cache_hit(request_type, "object_by_id");
                    return CacheResult::Hit((*latest_version, latest_object.clone()));
                }
                LatestObjectCacheEntry::NonExistent => {
                    self.metrics
                        .record_cache_negative_hit(request_type, "object_by_id");
                    return CacheResult::NegativeHit;
                }
            }
        } else {
            self.metrics.record_cache_miss(request_type, "object_by_id");
        }

        // Slow path: check dirty then committed version maps for the highest version.
        Self::with_locked_cache_entries(
            &self.dirty.objects,
            &self.cached.object_cache,
            object_id,
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, request_type, "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, request_type, "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
790
791 fn get_object_by_id_cache_only(
792 &self,
793 request_type: &'static str,
794 object_id: &ObjectID,
795 ) -> CacheResult<(SequenceNumber, Object)> {
796 match self.get_object_entry_by_id_cache_only(request_type, object_id) {
797 CacheResult::Hit((version, entry)) => match entry {
798 ObjectEntry::Object(object) => CacheResult::Hit((version, object)),
799 ObjectEntry::Deleted | ObjectEntry::Wrapped => CacheResult::NegativeHit,
800 },
801 CacheResult::NegativeHit => CacheResult::NegativeHit,
802 CacheResult::Miss => CacheResult::Miss,
803 }
804 }
805
    /// Cache-only lookup of a marker value at an exact version for
    /// (epoch, object); checks dirty then committed, never the db.
    fn get_marker_value_cache_only(
        &self,
        object_key: FullObjectKey,
        epoch_id: EpochId,
    ) -> CacheResult<MarkerValue> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_key.id()),
            |dirty_entry, cached_entry| {
                // Each macro early-returns on Hit/NegativeHit.
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "uncommitted",
                    dirty_entry,
                    object_key.version()
                );
                check_cache_entry_by_version!(
                    self,
                    "marker_by_version",
                    "committed",
                    cached_entry,
                    object_key.version()
                );
                CacheResult::Miss
            },
        )
    }
834
    /// Cache-only lookup of the highest-versioned marker for (epoch, object);
    /// checks dirty then committed, never the db.
    fn get_latest_marker_value_cache_only(
        &self,
        object_id: FullObjectID,
        epoch_id: EpochId,
    ) -> CacheResult<(SequenceNumber, MarkerValue)> {
        Self::with_locked_cache_entries(
            &self.dirty.markers,
            &self.cached.marker_cache,
            &(epoch_id, object_id),
            |dirty_entry, cached_entry| {
                check_cache_entry_by_latest!(self, "marker_latest", "uncommitted", dirty_entry);
                check_cache_entry_by_latest!(self, "marker_latest", "committed", cached_entry);
                CacheResult::Miss
            },
        )
    }
851
    /// Returns the latest live version of `id`, checking caches first and
    /// falling back to the store on a miss; the db answer is written back to
    /// the by-id cache (positively or as NonExistent).
    fn get_object_impl(&self, request_type: &'static str, id: &ObjectID) -> Option<Object> {
        // The ticket must be taken BEFORE the cache read: it is presented again
        // when back-filling below, so a write that raced with our db read
        // invalidates the stale fill (see MonotonicCache / Ticket).
        let ticket = self.object_by_id_cache.get_ticket_for_read(id);
        match self.get_object_entry_by_id_cache_only(request_type, id) {
            CacheResult::Hit((_, entry)) => match entry {
                ObjectEntry::Object(object) => Some(object),
                // Latest version is a tombstone: object is not live.
                ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
            },
            CacheResult::NegativeHit => None,
            CacheResult::Miss => {
                let obj = self
                    .store
                    .get_latest_object_or_tombstone(*id)
                    .expect("db error");
                match obj {
                    Some((key, obj)) => {
                        // Back-fill the by-id cache with the db's answer
                        // (key.1 is the version from the store's ObjectKey).
                        self.cache_latest_object_by_id(
                            id,
                            LatestObjectCacheEntry::Object(key.1, obj.clone().into()),
                            ticket,
                        );
                        match obj {
                            ObjectOrTombstone::Object(object) => Some(object),
                            ObjectOrTombstone::Tombstone(_) => None,
                        }
                    }
                    None => {
                        // Cache the negative result so repeated misses skip the db.
                        self.cache_object_not_found(id, ticket);
                        None
                    }
                }
            }
        }
    }
885
    /// Records a db-level cache request metric and returns the store for the
    /// actual read (keeps call sites to one expression).
    fn record_db_get(&self, request_type: &'static str) -> &AuthorityStore {
        self.metrics.record_cache_request(request_type, "db");
        &self.store
    }
890
    /// Multi-get variant of record_db_get: records `count` db requests, then
    /// returns the store for the actual read.
    fn record_db_multi_get(&self, request_type: &'static str, count: usize) -> &AuthorityStore {
        self.metrics
            .record_cache_multi_request(request_type, "db", count);
        &self.store
    }
896
    /// Writes all outputs of a finalized transaction into the dirty set:
    /// tombstones, markers, objects, effects, events, and finally the
    /// executed-effects digest (which unblocks waiters).
    #[instrument(level = "debug", skip_all)]
    fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
        let tx_digest = *tx_outputs.transaction.digest();
        trace!(?tx_digest, "writing transaction outputs to cache");

        // The outputs are now being recorded as finalized, so any fastpath
        // copy of them is no longer needed.
        self.dirty.fastpath_transaction_outputs.remove(&tx_digest);

        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            unchanged_loaded_runtime_objects,
            ..
        } = &*tx_outputs;

        // Record tombstones for deleted and wrapped objects.
        for ObjectKey(id, version) in deleted.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Deleted);
        }

        for ObjectKey(id, version) in wrapped.iter() {
            self.write_object_entry(id, *version, ObjectEntry::Wrapped);
        }

        for (object_key, marker_value) in markers.iter() {
            self.write_marker_value(epoch_id, *object_key, *marker_value);
        }

        // Written objects go in two passes: child objects first, then
        // non-child objects. Presumably so a reader that observes a parent can
        // also observe its children — NOTE(review): confirm the intent.
        for (object_id, object) in written.iter() {
            if object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
            }
        }
        for (object_id, object) in written.iter() {
            if !object.is_child_object() {
                self.write_object_entry(object_id, object.version(), object.clone().into());
                if object.is_package() {
                    // Packages also populate the dedicated package cache.
                    debug!("caching package: {:?}", object.compute_object_reference());
                    self.packages
                        .insert(*object_id, PackageObject::new(object.clone()));
                }
            }
        }

        let tx_digest = *transaction.digest();
        debug!(
            ?tx_digest,
            "Writing transaction output objects to cache: {:?}",
            written
                .values()
                .map(|o| (o.id(), o.version()))
                .collect::<Vec<_>>(),
        );
        let effects_digest = effects.digest();

        self.metrics.record_cache_write("transaction_block");
        self.dirty
            .pending_transaction_writes
            .insert(tx_digest, tx_outputs.clone());

        self.metrics.record_cache_write("transaction_effects");
        self.dirty
            .transaction_effects
            .insert(effects_digest, effects.clone());

        self.metrics.record_cache_write("transaction_events");
        self.dirty
            .transaction_events
            .insert(tx_digest, events.clone());

        self.metrics
            .record_cache_write("unchanged_loaded_runtime_objects");
        self.dirty
            .unchanged_loaded_runtime_objects
            .insert(tx_digest, unchanged_loaded_runtime_objects.clone());

        // Written after the other dirty maps, so by the time waiters are
        // notified below the rest of the outputs are already visible.
        self.metrics.record_cache_write("executed_effects_digests");
        self.dirty
            .executed_effects_digests
            .insert(tx_digest, effects_digest);

        // Wake tasks blocked on this transaction's executed effects.
        self.executed_effects_digests_notify_read
            .notify(&tx_digest, &effects_digest);

        self.metrics
            .pending_notify_read
            .set(self.executed_effects_digests_notify_read.num_pending() as i64);

        // Update the pending count (inserts - commits) and re-evaluate backpressure.
        let prev = self
            .dirty
            .total_transaction_inserts
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let pending_count = (prev + 1).saturating_sub(
            self.dirty
                .total_transaction_commits
                .load(std::sync::atomic::Ordering::Relaxed),
        );

        self.set_backpressure(pending_count);
    }
1013
1014 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1015 let _metrics_guard = mysten_metrics::monitored_scope("WritebackCache::build_db_batch");
1016 let mut all_outputs = Vec::with_capacity(digests.len());
1017 for tx in digests {
1018 let Some(outputs) = self
1019 .dirty
1020 .pending_transaction_writes
1021 .get(tx)
1022 .map(|o| o.clone())
1023 else {
1024 warn!("Attempt to commit unknown transaction {:?}", tx);
1030 continue;
1031 };
1032 all_outputs.push(outputs);
1033 }
1034
1035 let batch = self
1036 .store
1037 .build_db_batch(epoch, &all_outputs)
1038 .expect("db error");
1039 (all_outputs, batch)
1040 }
1041
    /// Commits a previously built batch: writes it to the db, then moves each
    /// transaction's state from the dirty set to the committed caches and
    /// re-evaluates backpressure.
    #[instrument(level = "debug", skip_all)]
    fn commit_transaction_outputs(
        &self,
        epoch: EpochId,
        (all_outputs, db_batch): Batch,
        digests: &[TransactionDigest],
    ) {
        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs");
        fail_point!("writeback-cache-commit");
        trace!(?digests);

        // The batch is durably written BEFORE anything is evicted from the
        // dirty set, so readers always find the data in dirty or in the db.
        db_batch.write().expect("db error");

        let _metrics_guard =
            mysten_metrics::monitored_scope("WritebackCache::commit_transaction_outputs::flush");
        for outputs in all_outputs.iter() {
            let tx_digest = outputs.transaction.digest();
            // Every transaction in the batch must still be pending.
            assert!(
                self.dirty
                    .pending_transaction_writes
                    .remove(tx_digest)
                    .is_some()
            );
            self.flush_transactions_from_dirty_to_cached(epoch, *tx_digest, outputs);
        }

        // Advance the commit counter by the batch size and recompute the
        // pending (inserts - commits) count for backpressure.
        let num_outputs = all_outputs.len() as u64;
        let num_commits = self
            .dirty
            .total_transaction_commits
            .fetch_add(num_outputs, std::sync::atomic::Ordering::Relaxed)
            + num_outputs;

        let pending_count = self
            .dirty
            .total_transaction_inserts
            .load(std::sync::atomic::Ordering::Relaxed)
            .saturating_sub(num_commits);

        self.set_backpressure(pending_count);
    }
1088
1089 fn approximate_pending_transaction_count(&self) -> u64 {
1090 let num_commits = self
1091 .dirty
1092 .total_transaction_commits
1093 .load(std::sync::atomic::Ordering::Relaxed);
1094
1095 self.dirty
1096 .total_transaction_inserts
1097 .load(std::sync::atomic::Ordering::Relaxed)
1098 .saturating_sub(num_commits)
1099 }
1100
1101 fn set_backpressure(&self, pending_count: u64) {
1102 let backpressure = pending_count > self.backpressure_threshold;
1103 let backpressure_changed = self.backpressure_manager.set_backpressure(backpressure);
1104 if backpressure_changed {
1105 self.metrics.backpressure_toggles.inc();
1106 }
1107 self.metrics
1108 .backpressure_status
1109 .set(if backpressure { 1 } else { 0 });
1110 }
1111
    /// Moves one committed transaction's state from the dirty set into the
    /// committed caches. Point-cache inserts happen BEFORE the corresponding
    /// dirty removals, so a concurrent reader always finds the data in at
    /// least one of the two layers.
    fn flush_transactions_from_dirty_to_cached(
        &self,
        epoch: EpochId,
        tx_digest: TransactionDigest,
        outputs: &TransactionOutputs,
    ) {
        let TransactionOutputs {
            transaction,
            effects,
            markers,
            written,
            deleted,
            wrapped,
            events,
            ..
        } = outputs;

        let effects_digest = effects.digest();

        // Step 1: populate the committed point caches. Ticket::Write marks
        // these as writer-side inserts; .ok() because a rejected insert only
        // means the cache skipped the update.
        self.cached
            .transactions
            .insert(
                &tx_digest,
                PointCacheItem::Some(transaction.clone()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_effects
            .insert(
                &effects_digest,
                PointCacheItem::Some(effects.clone().into()),
                Ticket::Write,
            )
            .ok();
        self.cached
            .executed_effects_digests
            .insert(
                &tx_digest,
                PointCacheItem::Some(effects_digest),
                Ticket::Write,
            )
            .ok();
        self.cached
            .transaction_events
            .insert(
                &tx_digest,
                PointCacheItem::Some(events.clone().into()),
                Ticket::Write,
            )
            .ok();

        // Step 2: remove the dirty entries; each must exist, since the caller
        // just removed this tx from pending_transaction_writes.
        self.dirty
            .transaction_effects
            .remove(&effects_digest)
            .expect("effects must exist");

        self.dirty
            .transaction_events
            .remove(&tx_digest)
            .expect("events must exist");

        self.dirty
            .unchanged_loaded_runtime_objects
            .remove(&tx_digest)
            .expect("unchanged_loaded_runtime_objects must exist");

        self.dirty
            .executed_effects_digests
            .remove(&tx_digest)
            .expect("executed effects must exist");

        // Step 3: migrate versioned entries (markers, objects, tombstones)
        // one (key, version) at a time into the committed version maps.
        for (object_key, marker_value) in markers.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.markers,
                &self.cached.marker_cache,
                (epoch, object_key.id()),
                object_key.version(),
                marker_value,
            );
        }

        for (object_id, object) in written.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                object.version(),
                &ObjectEntry::Object(object.clone()),
            );
        }

        for ObjectKey(object_id, version) in deleted.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Deleted,
            );
        }

        for ObjectKey(object_id, version) in wrapped.iter() {
            Self::move_version_from_dirty_to_cache(
                &self.dirty.objects,
                &self.cached.object_cache,
                *object_id,
                *version,
                &ObjectEntry::Wrapped,
            );
        }
    }
1229
    /// Moves a single (key, version, value) from the dirty map into the
    /// committed cache, pruning the committed window and dropping empty
    /// per-key dirty maps.
    fn move_version_from_dirty_to_cache<K, V>(
        dirty: &DashMap<K, CachedVersionMap<V>>,
        cache: &MokaCache<K, Arc<Mutex<CachedVersionMap<V>>>>,
        key: K,
        version: SequenceNumber,
        value: &V,
    ) where
        K: Eq + std::hash::Hash + Clone + Send + Sync + Copy + 'static,
        V: Send + Sync + Clone + Eq + std::fmt::Debug + 'static,
    {
        // Only a small window of recently committed versions is retained per key.
        static MAX_VERSIONS: usize = 3;

        // Take the dirty entry (holding the dashmap shard lock) before locking
        // the cache entry, and insert into the cache BEFORE removing from
        // dirty — the value is never absent from both maps at once.
        let dirty_entry = dirty.entry(key);
        let cache_entry = cache.entry(key).or_default();
        let mut cache_map = cache_entry.value().lock();

        cache_map.insert(version, value.clone());
        cache_map.truncate_to(MAX_VERSIONS);

        // The caller only flushes keys it previously wrote, so the dirty entry
        // must be occupied.
        let DashMapEntry::Occupied(mut occupied_dirty_entry) = dirty_entry else {
            panic!("dirty map must exist");
        };

        // pop_oldest presumably removes the lowest version; the assert below
        // checks it is exactly the version/value being committed.
        let removed = occupied_dirty_entry.get_mut().pop_oldest(&version);

        assert_eq!(removed.as_ref(), Some(value), "dirty version must exist");

        // Drop empty per-key maps so the dirty dashmap does not accumulate keys
        // (check_cache_entry_by_latest relies on empty maps being removed).
        if occupied_dirty_entry.get().is_empty() {
            occupied_dirty_entry.remove();
        }
    }
1268
1269 fn cache_latest_object_by_id(
1271 &self,
1272 object_id: &ObjectID,
1273 object: LatestObjectCacheEntry,
1274 ticket: Ticket,
1275 ) {
1276 trace!("caching object by id: {:?} {:?}", object_id, object);
1277 if self
1278 .object_by_id_cache
1279 .insert(object_id, object, ticket)
1280 .is_ok()
1281 {
1282 self.metrics.record_cache_write("object_by_id");
1283 } else {
1284 trace!("discarded cache write due to expired ticket");
1285 self.metrics.record_ticket_expiry();
1286 }
1287 }
1288
    // Memoizes the non-existence of `object_id` so later lookups can return a
    // negative hit without consulting the db.
    fn cache_object_not_found(&self, object_id: &ObjectID, ticket: Ticket) {
        self.cache_latest_object_by_id(object_id, LatestObjectCacheEntry::NonExistent, ticket);
    }
1292
    /// Discards all uncommitted (dirty) state at an epoch boundary and clears
    /// the per-epoch lock and marker state.
    ///
    /// Per-object cache entries for every object touched by a still-pending
    /// transaction are invalidated before the dirty maps are dropped, so no
    /// reverted output can be served from cache afterwards.
    fn clear_state_end_of_epoch_impl(&self, execution_guard: &ExecutionLockWriteGuard<'_>) {
        info!("clearing state at end of epoch");

        for r in self.dirty.pending_transaction_writes.iter() {
            let outputs = r.value();
            // Only owned-object (single-writer) transactions may still be
            // pending here; shared-input transactions at epoch end are a bug.
            if !outputs
                .transaction
                .transaction_data()
                .shared_input_objects()
                .is_empty()
            {
                debug_fatal!("transaction must be single writer");
            }
            info!(
                "clearing state for transaction {:?}",
                outputs.transaction.digest()
            );
            // Invalidate cached views of everything this transaction wrote,
            // since its outputs are being thrown away.
            for (object_id, object) in outputs.written.iter() {
                if object.is_package() {
                    info!("removing non-finalized package from cache: {:?}", object_id);
                    self.packages.invalidate(object_id);
                }
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }

            // Deleted and wrapped outputs also left tombstone entries behind.
            for ObjectKey(object_id, _) in outputs.deleted.iter().chain(outputs.wrapped.iter()) {
                self.object_by_id_cache.invalidate(object_id);
                self.cached.object_cache.invalidate(object_id);
            }
        }

        // Drop all dirty maps wholesale, after the per-object invalidation.
        self.dirty.clear();

        info!("clearing old transaction locks");
        self.object_locks.clear();
        info!("clearing object per epoch marker table");
        self.store
            .clear_object_per_epoch_marker_table(execution_guard)
            .expect("db error");
    }
1336
1337 fn bulk_insert_genesis_objects_impl(&self, objects: &[Object]) {
1338 self.store
1339 .bulk_insert_genesis_objects(objects)
1340 .expect("db error");
1341 for obj in objects {
1342 self.cached.object_cache.invalidate(&obj.id());
1343 self.object_by_id_cache.invalidate(&obj.id());
1344 }
1345 }
1346
1347 fn insert_genesis_object_impl(&self, object: Object) {
1348 self.object_by_id_cache.invalidate(&object.id());
1349 self.cached.object_cache.invalidate(&object.id());
1350 self.store.insert_genesis_object(object).expect("db error");
1351 }
1352
1353 pub fn clear_caches_and_assert_empty(&self) {
1354 info!("clearing caches");
1355 self.cached.clear_and_assert_empty();
1356 self.object_by_id_cache.invalidate_all();
1357 assert!(&self.object_by_id_cache.is_empty());
1358 self.packages.invalidate_all();
1359 assert_empty(&self.packages);
1360 }
1361}
1362
// Marker impl: ExecutionCacheAPI presumably only aggregates supertraits that
// WritebackCache implements separately — NOTE(review): confirm the trait has
// no required methods of its own.
impl ExecutionCacheAPI for WritebackCache {}
1364
1365impl ExecutionCacheCommit for WritebackCache {
1366 fn build_db_batch(&self, epoch: EpochId, digests: &[TransactionDigest]) -> Batch {
1367 self.build_db_batch(epoch, digests)
1368 }
1369
1370 fn commit_transaction_outputs(
1371 &self,
1372 epoch: EpochId,
1373 batch: Batch,
1374 digests: &[TransactionDigest],
1375 ) {
1376 WritebackCache::commit_transaction_outputs(self, epoch, batch, digests)
1377 }
1378
1379 fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) {
1380 self.store.persist_transaction(tx).expect("db error");
1381 }
1382
1383 fn approximate_pending_transaction_count(&self) -> u64 {
1384 WritebackCache::approximate_pending_transaction_count(self)
1385 }
1386}
1387
1388impl ObjectCacheRead for WritebackCache {
1389 fn get_package_object(&self, package_id: &ObjectID) -> SuiResult<Option<PackageObject>> {
1390 self.metrics
1391 .record_cache_request("package", "package_cache");
1392 if let Some(p) = self.packages.get(package_id) {
1393 if cfg!(debug_assertions) {
1394 let canonical_package = self
1395 .dirty
1396 .objects
1397 .get(package_id)
1398 .and_then(|v| match v.get_highest().map(|v| v.1.clone()) {
1399 Some(ObjectEntry::Object(object)) => Some(object),
1400 _ => None,
1401 })
1402 .or_else(|| self.store.get_object(package_id));
1403
1404 if let Some(canonical_package) = canonical_package {
1405 assert_eq!(
1406 canonical_package.digest(),
1407 p.object().digest(),
1408 "Package object cache is inconsistent for package {:?}",
1409 package_id
1410 );
1411 }
1412 }
1413 self.metrics.record_cache_hit("package", "package_cache");
1414 return Ok(Some(p));
1415 } else {
1416 self.metrics.record_cache_miss("package", "package_cache");
1417 }
1418
1419 if let Some(p) = self.get_object_impl("package", package_id) {
1423 if p.is_package() {
1424 let p = PackageObject::new(p);
1425 tracing::trace!(
1426 "caching package: {:?}",
1427 p.object().compute_object_reference()
1428 );
1429 self.metrics.record_cache_write("package");
1430 self.packages.insert(*package_id, p.clone());
1431 Ok(Some(p))
1432 } else {
1433 Err(SuiErrorKind::UserInputError {
1434 error: UserInputError::MoveObjectAsPackage {
1435 object_id: *package_id,
1436 },
1437 }
1438 .into())
1439 }
1440 } else {
1441 Ok(None)
1442 }
1443 }
1444
1445 fn force_reload_system_packages(&self, _system_package_ids: &[ObjectID]) {
1446 }
1449
1450 fn get_object(&self, id: &ObjectID) -> Option<Object> {
1453 self.get_object_impl("object_latest", id)
1454 }
1455
1456 fn get_object_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> Option<Object> {
1457 match self.get_object_by_key_cache_only(object_id, version) {
1458 CacheResult::Hit(object) => Some(object),
1459 CacheResult::NegativeHit => None,
1460 CacheResult::Miss => self
1461 .record_db_get("object_by_version")
1462 .get_object_by_key(object_id, version),
1463 }
1464 }
1465
1466 fn multi_get_objects_by_key(&self, object_keys: &[ObjectKey]) -> Vec<Option<Object>> {
1467 do_fallback_lookup(
1468 object_keys,
1469 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1470 CacheResult::Hit(maybe_object) => CacheResult::Hit(Some(maybe_object)),
1471 CacheResult::NegativeHit => CacheResult::NegativeHit,
1472 CacheResult::Miss => CacheResult::Miss,
1473 },
1474 |remaining| {
1475 self.record_db_multi_get("object_by_version", remaining.len())
1476 .multi_get_objects_by_key(remaining)
1477 .expect("db error")
1478 },
1479 )
1480 }
1481
1482 fn object_exists_by_key(&self, object_id: &ObjectID, version: SequenceNumber) -> bool {
1483 match self.get_object_by_key_cache_only(object_id, version) {
1484 CacheResult::Hit(_) => true,
1485 CacheResult::NegativeHit => false,
1486 CacheResult::Miss => self
1487 .record_db_get("object_by_version")
1488 .object_exists_by_key(object_id, version)
1489 .expect("db error"),
1490 }
1491 }
1492
1493 fn multi_object_exists_by_key(&self, object_keys: &[ObjectKey]) -> Vec<bool> {
1494 do_fallback_lookup(
1495 object_keys,
1496 |key| match self.get_object_by_key_cache_only(&key.0, key.1) {
1497 CacheResult::Hit(_) => CacheResult::Hit(true),
1498 CacheResult::NegativeHit => CacheResult::Hit(false),
1499 CacheResult::Miss => CacheResult::Miss,
1500 },
1501 |remaining| {
1502 self.record_db_multi_get("object_by_version", remaining.len())
1503 .multi_object_exists_by_key(remaining)
1504 .expect("db error")
1505 },
1506 )
1507 }
1508
1509 fn get_latest_object_ref_or_tombstone(&self, object_id: ObjectID) -> Option<ObjectRef> {
1510 match self.get_object_entry_by_id_cache_only("latest_objref_or_tombstone", &object_id) {
1511 CacheResult::Hit((version, entry)) => Some(match entry {
1512 ObjectEntry::Object(object) => object.compute_object_reference(),
1513 ObjectEntry::Deleted => (object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED),
1514 ObjectEntry::Wrapped => (object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED),
1515 }),
1516 CacheResult::NegativeHit => None,
1517 CacheResult::Miss => self
1518 .record_db_get("latest_objref_or_tombstone")
1519 .get_latest_object_ref_or_tombstone(object_id)
1520 .expect("db error"),
1521 }
1522 }
1523
1524 fn get_latest_object_or_tombstone(
1525 &self,
1526 object_id: ObjectID,
1527 ) -> Option<(ObjectKey, ObjectOrTombstone)> {
1528 match self.get_object_entry_by_id_cache_only("latest_object_or_tombstone", &object_id) {
1529 CacheResult::Hit((version, entry)) => {
1530 let key = ObjectKey(object_id, version);
1531 Some(match entry {
1532 ObjectEntry::Object(object) => (key, object.into()),
1533 ObjectEntry::Deleted => (
1534 key,
1535 ObjectOrTombstone::Tombstone((
1536 object_id,
1537 version,
1538 ObjectDigest::OBJECT_DIGEST_DELETED,
1539 )),
1540 ),
1541 ObjectEntry::Wrapped => (
1542 key,
1543 ObjectOrTombstone::Tombstone((
1544 object_id,
1545 version,
1546 ObjectDigest::OBJECT_DIGEST_WRAPPED,
1547 )),
1548 ),
1549 })
1550 }
1551 CacheResult::NegativeHit => None,
1552 CacheResult::Miss => self
1553 .record_db_get("latest_object_or_tombstone")
1554 .get_latest_object_or_tombstone(object_id)
1555 .expect("db error"),
1556 }
1557 }
1558
1559 fn multi_input_objects_available_cache_only(&self, keys: &[InputKey]) -> Vec<bool> {
1560 keys.iter()
1561 .map(|key| {
1562 if key.is_cancelled() {
1563 true
1564 } else {
1565 match key {
1566 InputKey::VersionedObject { id, version } => {
1567 matches!(
1568 self.get_object_by_key_cache_only(&id.id(), *version),
1569 CacheResult::Hit(_)
1570 )
1571 }
1572 InputKey::Package { id } => self.packages.contains_key(id),
1573 }
1574 }
1575 })
1576 .collect()
1577 }
1578
1579 #[instrument(level = "trace", skip_all, fields(object_id, version_bound))]
1580 fn find_object_lt_or_eq_version(
1581 &self,
1582 object_id: ObjectID,
1583 version_bound: SequenceNumber,
1584 ) -> Option<Object> {
1585 macro_rules! check_cache_entry {
1586 ($level: expr, $objects: expr) => {
1587 self.metrics
1588 .record_cache_request("object_lt_or_eq_version", $level);
1589 if let Some(objects) = $objects {
1590 if let Some((_, object)) = objects
1591 .all_versions_lt_or_eq_descending(&version_bound)
1592 .next()
1593 {
1594 if let ObjectEntry::Object(object) = object {
1595 self.metrics
1596 .record_cache_hit("object_lt_or_eq_version", $level);
1597 return Some(object.clone());
1598 } else {
1599 self.metrics
1601 .record_cache_negative_hit("object_lt_or_eq_version", $level);
1602 return None;
1603 }
1604 } else {
1605 self.metrics
1606 .record_cache_miss("object_lt_or_eq_version", $level);
1607 }
1608 }
1609 };
1610 }
1611
1612 self.metrics
1614 .record_cache_request("object_lt_or_eq_version", "object_by_id");
1615 let latest_cache_entry = self.object_by_id_cache.get(&object_id);
1616 if let Some(latest) = &latest_cache_entry {
1617 let latest = latest.lock();
1618 match &*latest {
1619 LatestObjectCacheEntry::Object(latest_version, object) => {
1620 if *latest_version <= version_bound {
1621 if let ObjectEntry::Object(object) = object {
1622 self.metrics
1623 .record_cache_hit("object_lt_or_eq_version", "object_by_id");
1624 return Some(object.clone());
1625 } else {
1626 self.metrics.record_cache_negative_hit(
1628 "object_lt_or_eq_version",
1629 "object_by_id",
1630 );
1631 return None;
1632 }
1633 }
1634 }
1636 LatestObjectCacheEntry::NonExistent => {
1638 self.metrics
1639 .record_cache_negative_hit("object_lt_or_eq_version", "object_by_id");
1640 return None;
1641 }
1642 }
1643 }
1644 self.metrics
1645 .record_cache_miss("object_lt_or_eq_version", "object_by_id");
1646
1647 Self::with_locked_cache_entries(
1648 &self.dirty.objects,
1649 &self.cached.object_cache,
1650 &object_id,
1651 |dirty_entry, cached_entry| {
1652 check_cache_entry!("committed", dirty_entry);
1653 check_cache_entry!("uncommitted", cached_entry);
1654
1655 let latest: Option<(SequenceNumber, ObjectEntry)> =
1676 if let Some(dirty_set) = dirty_entry {
1677 dirty_set
1678 .get_highest()
1679 .cloned()
1680 .tap_none(|| panic!("dirty set cannot be empty"))
1681 } else {
1682 self.record_db_get("object_lt_or_eq_version_latest")
1684 .get_latest_object_or_tombstone(object_id)
1685 .expect("db error")
1686 .map(|(ObjectKey(_, version), obj_or_tombstone)| {
1687 (version, ObjectEntry::from(obj_or_tombstone))
1688 })
1689 };
1690
1691 if let Some((obj_version, obj_entry)) = latest {
1692 self.cache_latest_object_by_id(
1700 &object_id,
1701 LatestObjectCacheEntry::Object(obj_version, obj_entry.clone()),
1702 self.object_by_id_cache.get_ticket_for_read(&object_id),
1705 );
1706
1707 if obj_version <= version_bound {
1708 match obj_entry {
1709 ObjectEntry::Object(object) => Some(object),
1710 ObjectEntry::Deleted | ObjectEntry::Wrapped => None,
1711 }
1712 } else {
1713 self.record_db_get("object_lt_or_eq_version_scan")
1717 .find_object_lt_or_eq_version(object_id, version_bound)
1718 .expect("db error")
1719 }
1720
1721 } else if let Some(latest_cache_entry) = latest_cache_entry {
1727 assert!(!latest_cache_entry.lock().is_alive());
1729 None
1730 } else {
1731 let highest = cached_entry.and_then(|c| c.get_highest());
1733 assert!(highest.is_none() || highest.unwrap().1.is_tombstone());
1734 self.cache_object_not_found(
1735 &object_id,
1736 self.object_by_id_cache.get_ticket_for_read(&object_id),
1738 );
1739 None
1740 }
1741 },
1742 )
1743 }
1744
1745 fn get_sui_system_state_object_unsafe(&self) -> SuiResult<SuiSystemState> {
1746 get_sui_system_state(self)
1747 }
1748
1749 fn get_bridge_object_unsafe(&self) -> SuiResult<Bridge> {
1750 get_bridge(self)
1751 }
1752
1753 fn get_marker_value(
1754 &self,
1755 object_key: FullObjectKey,
1756 epoch_id: EpochId,
1757 ) -> Option<MarkerValue> {
1758 match self.get_marker_value_cache_only(object_key, epoch_id) {
1759 CacheResult::Hit(marker) => Some(marker),
1760 CacheResult::NegativeHit => None,
1761 CacheResult::Miss => self
1762 .record_db_get("marker_by_version")
1763 .get_marker_value(object_key, epoch_id)
1764 .expect("db error"),
1765 }
1766 }
1767
1768 fn get_latest_marker(
1769 &self,
1770 object_id: FullObjectID,
1771 epoch_id: EpochId,
1772 ) -> Option<(SequenceNumber, MarkerValue)> {
1773 match self.get_latest_marker_value_cache_only(object_id, epoch_id) {
1774 CacheResult::Hit((v, marker)) => Some((v, marker)),
1775 CacheResult::NegativeHit => {
1776 panic!("cannot have negative hit when getting latest marker")
1777 }
1778 CacheResult::Miss => self
1779 .record_db_get("marker_latest")
1780 .get_latest_marker(object_id, epoch_id)
1781 .expect("db error"),
1782 }
1783 }
1784
1785 fn get_lock(&self, obj_ref: ObjectRef, epoch_store: &AuthorityPerEpochStore) -> SuiLockResult {
1786 let cur_epoch = epoch_store.epoch();
1787 match self.get_object_by_id_cache_only("lock", &obj_ref.0) {
1788 CacheResult::Hit((_, obj)) => {
1789 let actual_objref = obj.compute_object_reference();
1790 if obj_ref != actual_objref {
1791 Ok(ObjectLockStatus::LockedAtDifferentVersion {
1792 locked_ref: actual_objref,
1793 })
1794 } else {
1795 Ok(
1797 match self
1798 .object_locks
1799 .get_transaction_lock(&obj_ref, epoch_store)?
1800 {
1801 Some(tx_digest) => ObjectLockStatus::LockedToTx {
1802 locked_by_tx: LockDetailsDeprecated {
1803 epoch: cur_epoch,
1804 tx_digest,
1805 },
1806 },
1807 None => ObjectLockStatus::Initialized,
1808 },
1809 )
1810 }
1811 }
1812 CacheResult::NegativeHit => {
1813 Err(SuiError::from(UserInputError::ObjectNotFound {
1814 object_id: obj_ref.0,
1815 version: None,
1818 }))
1819 }
1820 CacheResult::Miss => self.record_db_get("lock").get_lock(obj_ref, epoch_store),
1821 }
1822 }
1823
1824 fn _get_live_objref(&self, object_id: ObjectID) -> SuiResult<ObjectRef> {
1825 let obj = self.get_object_impl("live_objref", &object_id).ok_or(
1826 UserInputError::ObjectNotFound {
1827 object_id,
1828 version: None,
1829 },
1830 )?;
1831 Ok(obj.compute_object_reference())
1832 }
1833
1834 fn check_owned_objects_are_live(&self, owned_object_refs: &[ObjectRef]) -> SuiResult {
1835 do_fallback_lookup_fallible(
1836 owned_object_refs,
1837 |obj_ref| match self.get_object_by_id_cache_only("object_is_live", &obj_ref.0) {
1838 CacheResult::Hit((version, obj)) => {
1839 if obj.compute_object_reference() != *obj_ref {
1840 Err(UserInputError::ObjectVersionUnavailableForConsumption {
1841 provided_obj_ref: *obj_ref,
1842 current_version: version,
1843 }
1844 .into())
1845 } else {
1846 Ok(CacheResult::Hit(()))
1847 }
1848 }
1849 CacheResult::NegativeHit => Err(UserInputError::ObjectNotFound {
1850 object_id: obj_ref.0,
1851 version: None,
1852 }
1853 .into()),
1854 CacheResult::Miss => Ok(CacheResult::Miss),
1855 },
1856 |remaining| {
1857 self.record_db_multi_get("object_is_live", remaining.len())
1858 .check_owned_objects_are_live(remaining)?;
1859 Ok(vec![(); remaining.len()])
1860 },
1861 )?;
1862 Ok(())
1863 }
1864
1865 fn get_highest_pruned_checkpoint(&self) -> Option<CheckpointSequenceNumber> {
1866 self.store
1867 .perpetual_tables
1868 .get_highest_pruned_checkpoint()
1869 .expect("db error")
1870 }
1871
1872 fn notify_read_input_objects<'a>(
1873 &'a self,
1874 input_and_receiving_keys: &'a [InputKey],
1875 receiving_keys: &'a HashSet<InputKey>,
1876 epoch: EpochId,
1877 ) -> BoxFuture<'a, ()> {
1878 self.object_notify_read
1879 .read(
1880 "notify_read_input_objects",
1881 input_and_receiving_keys,
1882 move |keys| {
1883 self.multi_input_objects_available(keys, receiving_keys, epoch)
1884 .into_iter()
1885 .map(|available| if available { Some(()) } else { None })
1886 .collect::<Vec<_>>()
1887 },
1888 )
1889 .map(|_| ())
1890 .boxed()
1891 }
1892}
1893
impl TransactionCacheRead for WritebackCache {
    /// Looks up transactions by digest: pending (dirty) writes first, then the
    /// committed point cache, then the db. Tickets are taken before probing so
    /// db misses can be memoized as negative entries without racing writers.
    fn multi_get_transaction_blocks(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<Arc<VerifiedTransaction>>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transactions.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_block", "uncommitted");
                if let Some(tx) = self.dirty.pending_transaction_writes.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_block", "uncommitted");
                    return CacheResult::Hit(Some(tx.transaction.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_block", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_block", "committed");

                match self
                    .cached
                    .transactions
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(tx)) => {
                        self.metrics
                            .record_cache_hit("transaction_block", "committed");
                        CacheResult::Hit(Some(tx))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_block", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results: Vec<_> = self
                    .record_db_multi_get("transaction_block", remaining.len())
                    .multi_get_transaction_blocks(&remaining_digests)
                    .expect("db error")
                    .into_iter()
                    .map(|o| o.map(Arc::new))
                    .collect();
                // Memoize db misses as negative cache entries, guarded by the
                // ticket taken before the cache probe.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached.transactions.insert(digest, None, *ticket).ok();
                    }
                }
                results
            },
        )
    }

    /// Maps transaction digests to executed-effects digests, with the same
    /// dirty -> committed -> db lookup and negative-memoization pattern as
    /// `multi_get_transaction_blocks`.
    fn multi_get_executed_effects_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEffectsDigest>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| {
                (
                    *d,
                    self.cached.executed_effects_digests.get_ticket_for_read(d),
                )
            })
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("executed_effects_digests", "uncommitted");
                if let Some(digest) = self.dirty.executed_effects_digests.get(digest) {
                    self.metrics
                        .record_cache_hit("executed_effects_digests", "uncommitted");
                    return CacheResult::Hit(Some(*digest));
                }
                self.metrics
                    .record_cache_miss("executed_effects_digests", "uncommitted");

                self.metrics
                    .record_cache_request("executed_effects_digests", "committed");
                match self
                    .cached
                    .executed_effects_digests
                    .get(digest)
                    .map(|l| *l.lock())
                {
                    Some(PointCacheItem::Some(digest)) => {
                        self.metrics
                            .record_cache_hit("executed_effects_digests", "committed");
                        CacheResult::Hit(Some(digest))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("executed_effects_digests", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("executed_effects_digests", remaining.len())
                    .multi_get_executed_effects_digests(&remaining_digests)
                    .expect("db error");
                // Memoize db misses as negative entries under the pre-read ticket.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .executed_effects_digests
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

    /// Fetches effects by effects digest: dirty -> committed cache -> db.
    fn multi_get_effects(
        &self,
        digests: &[TransactionEffectsDigest],
    ) -> Vec<Option<TransactionEffects>> {
        let digests_and_tickets: Vec<_> = digests
            .iter()
            .map(|d| (*d, self.cached.transaction_effects.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_effects", "uncommitted");
                if let Some(effects) = self.dirty.transaction_effects.get(digest) {
                    self.metrics
                        .record_cache_hit("transaction_effects", "uncommitted");
                    return CacheResult::Hit(Some(effects.clone()));
                }
                self.metrics
                    .record_cache_miss("transaction_effects", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_effects", "committed");
                match self
                    .cached
                    .transaction_effects
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(effects)) => {
                        self.metrics
                            .record_cache_hit("transaction_effects", "committed");
                        // The cache holds an Arc; callers get an owned clone.
                        CacheResult::Hit(Some((*effects).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_effects", "committed");
                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .record_db_multi_get("transaction_effects", remaining.len())
                    .multi_get_effects(remaining_digests.iter())
                    .expect("db error");
                // Memoize db misses as negative entries under the pre-read ticket.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_effects
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

    /// Resolves once all given transactions have executed effects digests.
    fn notify_read_executed_effects_digests<'a>(
        &'a self,
        task_name: &'static str,
        digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<TransactionEffectsDigest>> {
        self.executed_effects_digests_notify_read
            .read(task_name, digests, |digests| {
                self.multi_get_executed_effects_digests(digests)
            })
            .boxed()
    }

    /// Fetches events by transaction digest; empty event lists are normalized
    /// to `None` via `map_events`.
    fn multi_get_events(
        &self,
        event_digests: &[TransactionDigest],
    ) -> Vec<Option<TransactionEvents>> {
        // Empty event sets are treated as absent.
        fn map_events(events: TransactionEvents) -> Option<TransactionEvents> {
            if events.data.is_empty() {
                None
            } else {
                Some(events)
            }
        }

        let digests_and_tickets: Vec<_> = event_digests
            .iter()
            .map(|d| (*d, self.cached.transaction_events.get_ticket_for_read(d)))
            .collect();
        do_fallback_lookup(
            &digests_and_tickets,
            |(digest, _)| {
                self.metrics
                    .record_cache_request("transaction_events", "uncommitted");
                if let Some(events) = self.dirty.transaction_events.get(digest).map(|e| e.clone()) {
                    self.metrics
                        .record_cache_hit("transaction_events", "uncommitted");

                    return CacheResult::Hit(map_events(events));
                }
                self.metrics
                    .record_cache_miss("transaction_events", "uncommitted");

                self.metrics
                    .record_cache_request("transaction_events", "committed");
                match self
                    .cached
                    .transaction_events
                    .get(digest)
                    .map(|l| l.lock().clone())
                {
                    Some(PointCacheItem::Some(events)) => {
                        self.metrics
                            .record_cache_hit("transaction_events", "committed");
                        CacheResult::Hit(map_events((*events).clone()))
                    }
                    Some(PointCacheItem::None) => CacheResult::NegativeHit,
                    None => {
                        self.metrics
                            .record_cache_miss("transaction_events", "committed");

                        CacheResult::Miss
                    }
                }
            },
            |remaining| {
                let remaining_digests: Vec<_> = remaining.iter().map(|(d, _)| *d).collect();
                let results = self
                    .store
                    .multi_get_events(&remaining_digests)
                    .expect("db error");
                // Memoize db misses as negative entries under the pre-read ticket.
                for ((digest, ticket), result) in remaining.iter().zip(results.iter()) {
                    if result.is_none() {
                        self.cached
                            .transaction_events
                            .insert(digest, None, *ticket)
                            .ok();
                    }
                }
                results
            },
        )
    }

    /// Dirty state first, then db, for the runtime-loaded-object list.
    fn get_unchanged_loaded_runtime_objects(
        &self,
        digest: &TransactionDigest,
    ) -> Option<Vec<ObjectKey>> {
        self.dirty
            .unchanged_loaded_runtime_objects
            .get(digest)
            .map(|b| b.clone())
            .or_else(|| {
                self.store
                    .get_unchanged_loaded_runtime_objects(digest)
                    .expect("db error")
            })
    }

    // Fastpath outputs live only in the dirty set (never persisted here).
    fn get_mysticeti_fastpath_outputs(
        &self,
        tx_digest: &TransactionDigest,
    ) -> Option<Arc<TransactionOutputs>> {
        self.dirty.fastpath_transaction_outputs.get(tx_digest)
    }

    /// Resolves once fastpath outputs exist for every given digest.
    fn notify_read_fastpath_transaction_outputs<'a>(
        &'a self,
        tx_digests: &'a [TransactionDigest],
    ) -> BoxFuture<'a, Vec<Arc<TransactionOutputs>>> {
        self.fastpath_transaction_outputs_notify_read
            .read(
                "notify_read_fastpath_transaction_outputs",
                tx_digests,
                |tx_digests| {
                    tx_digests
                        .iter()
                        .map(|tx_digest| self.get_mysticeti_fastpath_outputs(tx_digest))
                        .collect()
                },
            )
            .boxed()
    }

    // Takes (consumes) the accumulator events from a pending transaction's
    // outputs, if that transaction is still in the dirty set.
    fn take_accumulator_events(&self, digest: &TransactionDigest) -> Option<Vec<AccumulatorEvent>> {
        self.dirty
            .pending_transaction_writes
            .get(digest)
            .map(|transaction_output| transaction_output.take_accumulator_events())
    }
}
2215
2216impl ExecutionCacheWrite for WritebackCache {
2217 fn acquire_transaction_locks(
2218 &self,
2219 epoch_store: &AuthorityPerEpochStore,
2220 owned_input_objects: &[ObjectRef],
2221 tx_digest: TransactionDigest,
2222 signed_transaction: Option<VerifiedSignedTransaction>,
2223 ) -> SuiResult {
2224 self.object_locks.acquire_transaction_locks(
2225 self,
2226 epoch_store,
2227 owned_input_objects,
2228 tx_digest,
2229 signed_transaction,
2230 )
2231 }
2232
2233 fn write_transaction_outputs(&self, epoch_id: EpochId, tx_outputs: Arc<TransactionOutputs>) {
2234 WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs);
2235 }
2236
2237 fn write_fastpath_transaction_outputs(&self, tx_outputs: Arc<TransactionOutputs>) {
2238 let tx_digest = *tx_outputs.transaction.digest();
2239 debug!(
2240 ?tx_digest,
2241 "writing mysticeti fastpath certified transaction outputs"
2242 );
2243 self.dirty
2244 .fastpath_transaction_outputs
2245 .insert(tx_digest, tx_outputs.clone());
2246 self.fastpath_transaction_outputs_notify_read
2247 .notify(&tx_digest, &tx_outputs);
2248 }
2249
2250 #[cfg(test)]
2251 fn write_object_entry_for_test(&self, object: Object) {
2252 self.write_object_entry(&object.id(), object.version(), object.into());
2253 }
2254}
2255
// Macro-generated trait impls for WritebackCache — presumably simple
// passthroughs; see the `implement_passthrough_traits!` definition for the
// exact trait list (NOTE(review): confirm).
implement_passthrough_traits!(WritebackCache);
2257
impl GlobalStateHashStore for WritebackCache {
    /// Finds the object ref immediately preceding `version`, gathering one
    /// candidate each from the dirty set, the committed cache, and the db,
    /// and returning the highest-versioned of them.
    fn get_object_ref_prior_to_key_deprecated(
        &self,
        object_id: &ObjectID,
        version: SequenceNumber,
    ) -> SuiResult<Option<ObjectRef>> {
        let mut candidates = Vec::new();

        // Converts the entry just below `version` (if any) into an ObjectRef,
        // synthesizing tombstone digests for deleted/wrapped entries.
        let check_versions =
            |versions: &CachedVersionMap<ObjectEntry>| match versions.get_prior_to(&version) {
                Some((version, object_entry)) => match object_entry {
                    ObjectEntry::Object(object) => {
                        assert_eq!(object.version(), version);
                        Some(object.compute_object_reference())
                    }
                    ObjectEntry::Deleted => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_DELETED))
                    }
                    ObjectEntry::Wrapped => {
                        Some((*object_id, version, ObjectDigest::OBJECT_DIGEST_WRAPPED))
                    }
                },
                None => None,
            };

        if let Some(objects) = self.dirty.objects.get(object_id)
            && let Some(prior) = check_versions(&objects)
        {
            candidates.push(prior);
        }

        if let Some(objects) = self.cached.object_cache.get(object_id)
            && let Some(prior) = check_versions(&objects.lock())
        {
            candidates.push(prior);
        }

        if let Some(prior) = self
            .store
            .get_object_ref_prior_to_key_deprecated(object_id, version)?
        {
            candidates.push(prior);
        }

        // The winner is the candidate with the highest version.
        candidates.sort_by_key(|(_, version, _)| *version);
        Ok(candidates.pop())
    }

    fn get_root_state_hash_for_epoch(
        &self,
        epoch: EpochId,
    ) -> SuiResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
        self.store.get_root_state_hash_for_epoch(epoch)
    }

    fn get_root_state_hash_for_highest_epoch(
        &self,
    ) -> SuiResult<Option<(EpochId, (CheckpointSequenceNumber, GlobalStateHash))>> {
        self.store.get_root_state_hash_for_highest_epoch()
    }

    fn insert_state_hash_for_epoch(
        &self,
        epoch: EpochId,
        checkpoint_seq_num: &CheckpointSequenceNumber,
        acc: &GlobalStateHash,
    ) -> SuiResult {
        self.store
            .insert_state_hash_for_epoch(epoch, checkpoint_seq_num, acc)
    }

    /// Iterating the live set is only valid once all dirty state has been
    /// flushed; otherwise the store view would miss uncommitted writes.
    fn iter_live_object_set(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        assert!(
            self.dirty.is_empty(),
            "cannot iterate live object set with dirty data"
        );
        self.store.iter_live_object_set(include_wrapped_tombstone)
    }

    /// Test-only variant overlaying the dirty set on the store's live set:
    /// dirty writes shadow stored objects; deletions (and wrapped entries,
    /// when tombstones are excluded) remove them.
    fn iter_cached_live_object_set_for_testing(
        &self,
        include_wrapped_tombstone: bool,
    ) -> Box<dyn Iterator<Item = LiveObject> + '_> {
        let iter = self.dirty.objects.iter();
        let mut dirty_objects = BTreeMap::new();

        // Start from the committed live set...
        for obj in self.store.iter_live_object_set(include_wrapped_tombstone) {
            dirty_objects.insert(obj.object_id(), obj);
        }

        // ...then apply each object's highest dirty entry on top.
        for entry in iter {
            let id = *entry.key();
            let value = entry.value();
            match value.get_highest().unwrap() {
                (_, ObjectEntry::Object(object)) => {
                    dirty_objects.insert(id, LiveObject::Normal(object.clone()));
                }
                (version, ObjectEntry::Wrapped) => {
                    if include_wrapped_tombstone {
                        dirty_objects.insert(id, LiveObject::Wrapped(ObjectKey(id, *version)));
                    } else {
                        dirty_objects.remove(&id);
                    }
                }
                (_, ObjectEntry::Deleted) => {
                    dirty_objects.remove(&id);
                }
            }
        }

        Box::new(dirty_objects.into_values())
    }
}
2388
2389impl StateSyncAPI for WritebackCache {
2395 fn insert_transaction_and_effects(
2396 &self,
2397 transaction: &VerifiedTransaction,
2398 transaction_effects: &TransactionEffects,
2399 ) {
2400 self.store
2401 .insert_transaction_and_effects(transaction, transaction_effects)
2402 .expect("db error");
2403 self.cached
2404 .transactions
2405 .insert(
2406 transaction.digest(),
2407 PointCacheItem::Some(Arc::new(transaction.clone())),
2408 Ticket::Write,
2409 )
2410 .ok();
2411 self.cached
2412 .transaction_effects
2413 .insert(
2414 &transaction_effects.digest(),
2415 PointCacheItem::Some(Arc::new(transaction_effects.clone())),
2416 Ticket::Write,
2417 )
2418 .ok();
2419 }
2420
2421 fn multi_insert_transaction_and_effects(
2422 &self,
2423 transactions_and_effects: &[VerifiedExecutionData],
2424 ) {
2425 self.store
2426 .multi_insert_transaction_and_effects(transactions_and_effects.iter())
2427 .expect("db error");
2428 for VerifiedExecutionData {
2429 transaction,
2430 effects,
2431 } in transactions_and_effects
2432 {
2433 self.cached
2434 .transactions
2435 .insert(
2436 transaction.digest(),
2437 PointCacheItem::Some(Arc::new(transaction.clone())),
2438 Ticket::Write,
2439 )
2440 .ok();
2441 self.cached
2442 .transaction_effects
2443 .insert(
2444 &effects.digest(),
2445 PointCacheItem::Some(Arc::new(effects.clone())),
2446 Ticket::Write,
2447 )
2448 .ok();
2449 }
2450 }
2451}