sui_core/execution_cache/
object_locks.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
5use dashmap::DashMap;
6use dashmap::mapref::entry::Entry as DashMapEntry;
7use mysten_common::*;
8use sui_types::base_types::{ObjectID, ObjectRef};
9use sui_types::digests::TransactionDigest;
10use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
11use sui_types::object::Object;
12use sui_types::storage::ObjectStore;
13use sui_types::transaction::VerifiedSignedTransaction;
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17
18type RefCount = usize;
19
20pub(super) struct ObjectLocks {
21    // When acquire transaction locks, lock entries are briefly inserted into this map. The map
22    // exists to provide atomic test-and-set operations on the locks. After all locks have been inserted
23    // into the map, they are written to the db, and then all locks are removed from the map.
24    //
25    // After a transaction has been executed, newly created objects are available to be locked.
26    // But, because of crash recovery, we cannot rule out that a lock may already exist in the db for
27    // those objects. Therefore we do a db read for each object we are locking.
28    //
29    // TODO: find a strategy to allow us to avoid db reads for each object.
30    locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
31}
32
33impl ObjectLocks {
34    pub fn new() -> Self {
35        Self {
36            locked_transactions: DashMap::new(),
37        }
38    }
39
40    pub(crate) fn get_transaction_lock(
41        &self,
42        obj_ref: &ObjectRef,
43        epoch_store: &AuthorityPerEpochStore,
44    ) -> SuiResult<Option<LockDetails>> {
45        // We don't consult the in-memory state here. We are only interested in state that
46        // has been committed to the db. This is because in memory state is reverted
47        // if the transaction is not successfully locked.
48        epoch_store.tables()?.get_locked_transaction(obj_ref)
49    }
50
51    /// Attempts to atomically test-and-set a transaction lock on an object.
52    /// If the lock is already set to a conflicting transaction, an error is returned.
53    /// If the lock is not set, or is already set to the same transaction, the lock is
54    /// set.
55    pub(crate) fn try_set_transaction_lock(
56        &self,
57        obj_ref: &ObjectRef,
58        new_lock: LockDetails,
59        epoch_store: &AuthorityPerEpochStore,
60    ) -> SuiResult {
61        // entry holds a lock on the dashmap shard, so this function operates atomicly
62        let entry = self.locked_transactions.entry(*obj_ref);
63
64        // TODO: currently, the common case for this code is that we will miss the cache
65        // and read from the db. It is difficult to implement negative caching, since we
66        // may have restarted, in which case there could be locks in the db that we do
67        // not have in the cache. We may want to explore strategies for proving there
68        // cannot be a lock in the db that we do not know about. Two possibilities are:
69        //
70        // 1. Read all locks into memory at startup (and keep them there). The lifetime
71        //    of locks is relatively short in the common case, so this might be feasible.
72        // 2. Find some strategy to distinguish between the cases where we are re-executing
73        //    old transactions after restarting vs executing transactions that we have never
74        //    seen before. The output objects of novel transactions cannot previously have
75        //    been locked on this validator.
76        //
77        // Solving this is not terribly important as it is not in the execution path, and
78        // hence only improves the latency of transaction signing, not transaction execution
79        let prev_lock = match entry {
80            DashMapEntry::Vacant(vacant) => {
81                let tables = epoch_store.tables()?;
82                if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
83                    trace!("read lock from db: {:?}", lock_details);
84                    vacant.insert((1, lock_details));
85                    lock_details
86                } else {
87                    trace!("set lock: {:?}", new_lock);
88                    vacant.insert((1, new_lock));
89                    new_lock
90                }
91            }
92            DashMapEntry::Occupied(mut occupied) => {
93                occupied.get_mut().0 += 1;
94                occupied.get().1
95            }
96        };
97
98        if prev_lock != new_lock {
99            debug!(
100                "lock conflict detected for {:?}: {:?} != {:?}",
101                obj_ref, prev_lock, new_lock
102            );
103            Err(SuiErrorKind::ObjectLockConflict {
104                obj_ref: *obj_ref,
105                pending_transaction: prev_lock,
106            }
107            .into())
108        } else {
109            Ok(())
110        }
111    }
112
113    pub(crate) fn clear(&self) {
114        info!("clearing old transaction locks");
115        self.locked_transactions.clear();
116    }
117
118    fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> SuiResult {
119        debug_assert_eq!(obj_ref.0, live_object.id());
120        if obj_ref.1 != live_object.version() {
121            debug!(
122                "object version unavailable for consumption: {:?} (current: {})",
123                obj_ref,
124                live_object.version()
125            );
126            return Err(SuiErrorKind::UserInputError {
127                error: UserInputError::ObjectVersionUnavailableForConsumption {
128                    provided_obj_ref: *obj_ref,
129                    current_version: live_object.version(),
130                },
131            }
132            .into());
133        }
134
135        let live_digest = live_object.digest();
136        if obj_ref.2 != live_digest {
137            return Err(SuiErrorKind::UserInputError {
138                error: UserInputError::InvalidObjectDigest {
139                    object_id: obj_ref.0,
140                    expected_digest: live_digest,
141                },
142            }
143            .into());
144        }
145
146        Ok(())
147    }
148
149    pub(crate) fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
150        for (obj_ref, lock) in locks {
151            let entry = self.locked_transactions.entry(*obj_ref);
152            let mut occupied = match entry {
153                DashMapEntry::Vacant(_) => {
154                    debug_fatal!("lock must exist for object: {:?}", obj_ref);
155                    continue;
156                }
157                DashMapEntry::Occupied(occupied) => occupied,
158            };
159
160            if occupied.get().1 == *lock {
161                occupied.get_mut().0 -= 1;
162                if occupied.get().0 == 0 {
163                    trace!("clearing lock: {:?}", lock);
164                    occupied.remove();
165                }
166            } else {
167                // this is impossible because the only case in which we overwrite a
168                // lock is when the lock is from a previous epoch. but we are holding
169                // execution_lock, so the epoch cannot have changed.
170                panic!("lock was changed since we set it");
171            }
172        }
173    }
174
175    fn multi_get_objects_must_exist(
176        cache: &WritebackCache,
177        object_ids: &[ObjectID],
178    ) -> SuiResult<Vec<Object>> {
179        let objects = cache.multi_get_objects(object_ids);
180        let mut result = Vec::with_capacity(objects.len());
181        for (i, object) in objects.into_iter().enumerate() {
182            if let Some(object) = object {
183                result.push(object);
184            } else {
185                return Err(SuiErrorKind::UserInputError {
186                    error: UserInputError::ObjectNotFound {
187                        object_id: object_ids[i],
188                        version: None,
189                    },
190                }
191                .into());
192            }
193        }
194        Ok(result)
195    }
196
197    #[instrument(level = "debug", skip_all)]
198    pub(crate) fn acquire_transaction_locks(
199        &self,
200        cache: &WritebackCache,
201        epoch_store: &AuthorityPerEpochStore,
202        owned_input_objects: &[ObjectRef],
203        tx_digest: TransactionDigest,
204        signed_transaction: Option<VerifiedSignedTransaction>,
205    ) -> SuiResult {
206        // First validate all object versions match live objects
207        Self::validate_owned_object_versions(cache, owned_input_objects)?;
208
209        let mut locks_to_write: Vec<(_, LockDetails)> =
210            Vec::with_capacity(owned_input_objects.len());
211
212        // Sort the objects before locking. This is not required by the protocol (since it's okay to
213        // reject any equivocating tx). However, this does prevent a confusing error on the client.
214        // Consider the case:
215        //   TX1: [o1, o2];
216        //   TX2: [o2, o1];
217        // If two threads race to acquire these locks, they might both acquire the first object, then
218        // error when trying to acquire the second. The error returned to the client would say that there
219        // is a conflicting tx on that object, but in fact neither object was locked and the tx was never
220        // signed. If one client then retries, they will succeed (counterintuitively).
221        let owned_input_objects = {
222            let mut o = owned_input_objects.to_vec();
223            o.sort_by_key(|o| o.0);
224            o
225        };
226
227        // Note that this function does not have to operate atomically. If there are two racing threads,
228        // then they are either trying to lock the same transaction (in which case both will succeed),
229        // or they are trying to lock the same object in two different transactions, in which case
230        // the sender has equivocated, and we are under no obligation to help them form a cert.
231        for obj_ref in owned_input_objects.iter() {
232            match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
233                Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
234                Err(e) => {
235                    // revert all pending writes and return error
236                    // Note that reverting is not required for liveness, since a well formed and un-equivocating
237                    // txn cannot fail to acquire locks.
238                    // However, reverting is easy enough to do in this implementation that we do it anyway.
239                    self.clear_cached_locks(&locks_to_write);
240                    return Err(e);
241                }
242            }
243        }
244
245        // commit all writes to DB
246        epoch_store
247            .tables()?
248            .write_transaction_locks(signed_transaction, locks_to_write.iter().cloned())?;
249
250        // remove pending locks from unbounded storage
251        self.clear_cached_locks(&locks_to_write);
252
253        Ok(())
254    }
255
256    /// Validates owned object versions and digests without acquiring locks.
257    /// Used when preconsensus locking is disabled to validate objects before signing,
258    /// since actual locking happens post-consensus in that mode.
259    #[instrument(level = "debug", skip_all)]
260    pub(crate) fn validate_owned_object_versions(
261        cache: &WritebackCache,
262        owned_input_objects: &[ObjectRef],
263    ) -> SuiResult {
264        let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
265        let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
266
267        // Validate that all objects are live and versions/digests match
268        for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
269            Self::verify_live_object(obj_ref, live_object)?;
270        }
271
272        Ok(())
273    }
274}
275
#[cfg(test)]
mod tests {
    use crate::execution_cache::{
        ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
    };

    /// A lock held by one transaction cannot be taken by another, and a rejected request must
    /// not leave behind locks on the objects it did manage to lock.
    #[tokio::test]
    async fn test_transaction_locks_are_exclusive() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut scenario| async move {
            scenario.with_created(&[1, 2, 3]);
            scenario.do_tx().await;

            scenario.with_mutated(&[1, 2, 3]);
            scenario.do_tx().await;

            let new1 = scenario.obj_ref(1);
            let new2 = scenario.obj_ref(2);
            let new3 = scenario.obj_ref(3);

            // Build, but never execute, a transaction that mutates all three objects.
            scenario.with_mutated(&[1, 2, 3]);
            let pending = scenario.take_outputs();
            let tx1 = scenario.make_signed_transaction(&pending.transaction);

            let tx1_digest = *tx1.digest();
            scenario
                .cache
                .acquire_transaction_locks(&scenario.epoch_store, &[new1, new2], tx1_digest, Some(tx1))
                .expect("locks should be available");

            // tx2 does not touch objects 1-3; it only serves as a different signed transaction
            // to contend for tx1's locks and to be stored alongside them.
            scenario.with_created(&[4, 5]);
            let unsigned_tx2 = scenario.take_outputs().transaction.clone();
            let tx2 = scenario.make_signed_transaction(&unsigned_tx2);
            let tx2_digest = *tx2.digest();

            // Both objects are already locked by tx1.
            scenario
                .cache
                .acquire_transaction_locks(
                    &scenario.epoch_store,
                    &[new1, new2],
                    tx2_digest,
                    Some(tx2.clone()),
                )
                .unwrap_err();

            // new3 is free but new2 is still held by tx1, so the request as a whole fails.
            scenario
                .cache
                .acquire_transaction_locks(
                    &scenario.epoch_store,
                    &[new3, new2],
                    tx2_digest,
                    Some(tx2.clone()),
                )
                .unwrap_err();

            // The failed request above must not have left new3 locked.
            scenario
                .cache
                .acquire_transaction_locks(
                    &scenario.epoch_store,
                    &[new3],
                    tx2_digest,
                    Some(tx2.clone()),
                )
                .expect("new3 should be unlocked");
        })
        .await;
    }

    /// A request that references a stale object version fails, and the same transaction can
    /// afterwards lock the current versions.
    #[tokio::test]
    async fn test_transaction_locks_are_durable() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut scenario| async move {
            scenario.with_created(&[1, 2]);
            scenario.do_tx().await;

            let old2 = scenario.obj_ref(2);

            scenario.with_mutated(&[1, 2]);
            scenario.do_tx().await;

            let new1 = scenario.obj_ref(1);
            let new2 = scenario.obj_ref(2);

            // Build, but never execute, a transaction that mutates both objects.
            scenario.with_mutated(&[1, 2]);
            let pending = scenario.take_outputs();
            let tx = scenario.make_signed_transaction(&pending.transaction);
            let tx_digest = *tx.digest();

            // Rejected: old2 is no longer the live version of object 2.
            scenario
                .cache
                .acquire_transaction_locks(
                    &scenario.epoch_store,
                    &[new1, old2],
                    tx_digest,
                    Some(tx.clone()),
                )
                .unwrap_err();

            // The failed request above must not have left new1 locked.
            scenario
                .cache
                .acquire_transaction_locks(
                    &scenario.epoch_store,
                    &[new1, new2],
                    tx_digest,
                    Some(tx.clone()),
                )
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// After a request fails on a stale object version, a *different* transaction can still
    /// lock the current versions, so the failed request left nothing locked.
    #[tokio::test]
    async fn test_acquire_transaction_locks_revert() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut scenario| async move {
            scenario.with_created(&[1, 2]);
            scenario.do_tx().await;

            let old2 = scenario.obj_ref(2);

            scenario.with_mutated(&[1, 2]);
            scenario.do_tx().await;

            let new1 = scenario.obj_ref(1);
            let new2 = scenario.obj_ref(2);

            // Build, but never execute, a transaction that mutates both objects.
            scenario.with_mutated(&[1, 2]);
            let pending = scenario.take_outputs();
            let tx = scenario.make_signed_transaction(&pending.transaction);
            let tx_digest = *tx.digest();

            // Rejected: old2 is no longer the live version of object 2.
            scenario
                .cache
                .acquire_transaction_locks(
                    &scenario.epoch_store,
                    &[new1, old2],
                    tx_digest,
                    Some(tx.clone()),
                )
                .unwrap_err();

            // tx2 does not touch objects 1-2; it only serves as a different signed transaction
            // to be stored alongside the locks.
            scenario.with_created(&[4, 5]);
            let unsigned_tx2 = scenario.take_outputs().transaction.clone();
            let tx2 = scenario.make_signed_transaction(&unsigned_tx2);
            let tx2_digest = *tx2.digest();

            // The failed request above must not have left new1 locked to tx.
            scenario
                .cache
                .acquire_transaction_locks(&scenario.epoch_store, &[new1, new2], tx2_digest, Some(tx2))
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// `acquire_transaction_locks` is called here without `.await`, so this only compiles while
    /// the method stays synchronous in the configuration this test is built under.
    #[tokio::test]
    async fn test_acquire_transaction_locks_is_sync() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut scenario| async move {
            scenario.with_created(&[1, 2]);
            scenario.do_tx().await;

            let first = scenario.object(1);
            let second = scenario.object(2);
            let objects = vec![
                first.compute_object_reference(),
                second.compute_object_reference(),
            ];

            scenario.with_mutated(&[1, 2]);
            let pending = scenario.take_outputs();
            let tx2 = scenario.make_signed_transaction(&pending.transaction);

            // No `.await` here: calling `.unwrap()` directly on the result is what asserts that
            // the call is synchronous.
            let tx2_digest = *tx2.digest();
            scenario
                .cache
                .acquire_transaction_locks(
                    &scenario.epoch_store,
                    &objects,
                    tx2_digest,
                    Some(tx2.clone()),
                )
                .unwrap();
        })
        .await;
    }
}