sui_core/execution_cache/object_locks.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
5use dashmap::DashMap;
6use dashmap::mapref::entry::Entry as DashMapEntry;
7use mysten_common::*;
8use sui_types::base_types::{ObjectID, ObjectRef};
9use sui_types::digests::TransactionDigest;
10use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
11use sui_types::object::Object;
12use sui_types::storage::ObjectStore;
13use sui_types::transaction::VerifiedSignedTransaction;
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17
18type RefCount = usize;
19
20pub(super) struct ObjectLocks {
21    // When acquire transaction locks, lock entries are briefly inserted into this map. The map
22    // exists to provide atomic test-and-set operations on the locks. After all locks have been inserted
23    // into the map, they are written to the db, and then all locks are removed from the map.
24    //
25    // After a transaction has been executed, newly created objects are available to be locked.
26    // But, because of crash recovery, we cannot rule out that a lock may already exist in the db for
27    // those objects. Therefore we do a db read for each object we are locking.
28    //
29    // TODO: find a strategy to allow us to avoid db reads for each object.
30    locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
31}
32
33impl ObjectLocks {
34    pub fn new() -> Self {
35        Self {
36            locked_transactions: DashMap::new(),
37        }
38    }
39
40    pub(crate) fn get_transaction_lock(
41        &self,
42        obj_ref: &ObjectRef,
43        epoch_store: &AuthorityPerEpochStore,
44    ) -> SuiResult<Option<LockDetails>> {
45        // We don't consult the in-memory state here. We are only interested in state that
46        // has been committed to the db. This is because in memory state is reverted
47        // if the transaction is not successfully locked.
48        epoch_store.tables()?.get_locked_transaction(obj_ref)
49    }
50
51    /// Attempts to atomically test-and-set a transaction lock on an object.
52    /// If the lock is already set to a conflicting transaction, an error is returned.
53    /// If the lock is not set, or is already set to the same transaction, the lock is
54    /// set.
55    pub(crate) fn try_set_transaction_lock(
56        &self,
57        obj_ref: &ObjectRef,
58        new_lock: LockDetails,
59        epoch_store: &AuthorityPerEpochStore,
60    ) -> SuiResult {
61        // entry holds a lock on the dashmap shard, so this function operates atomicly
62        let entry = self.locked_transactions.entry(*obj_ref);
63
64        // TODO: currently, the common case for this code is that we will miss the cache
65        // and read from the db. It is difficult to implement negative caching, since we
66        // may have restarted, in which case there could be locks in the db that we do
67        // not have in the cache. We may want to explore strategies for proving there
68        // cannot be a lock in the db that we do not know about. Two possibilities are:
69        //
70        // 1. Read all locks into memory at startup (and keep them there). The lifetime
71        //    of locks is relatively short in the common case, so this might be feasible.
72        // 2. Find some strategy to distinguish between the cases where we are re-executing
73        //    old transactions after restarting vs executing transactions that we have never
74        //    seen before. The output objects of novel transactions cannot previously have
75        //    been locked on this validator.
76        //
77        // Solving this is not terribly important as it is not in the execution path, and
78        // hence only improves the latency of transaction signing, not transaction execution
79        let prev_lock = match entry {
80            DashMapEntry::Vacant(vacant) => {
81                let tables = epoch_store.tables()?;
82                if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
83                    trace!("read lock from db: {:?}", lock_details);
84                    vacant.insert((1, lock_details));
85                    lock_details
86                } else {
87                    trace!("set lock: {:?}", new_lock);
88                    vacant.insert((1, new_lock));
89                    new_lock
90                }
91            }
92            DashMapEntry::Occupied(mut occupied) => {
93                occupied.get_mut().0 += 1;
94                occupied.get().1
95            }
96        };
97
98        if prev_lock != new_lock {
99            debug!(
100                "lock conflict detected for {:?}: {:?} != {:?}",
101                obj_ref, prev_lock, new_lock
102            );
103            Err(SuiErrorKind::ObjectLockConflict {
104                obj_ref: *obj_ref,
105                pending_transaction: prev_lock,
106            }
107            .into())
108        } else {
109            Ok(())
110        }
111    }
112
113    pub(crate) fn clear(&self) {
114        info!("clearing old transaction locks");
115        self.locked_transactions.clear();
116    }
117
118    fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> SuiResult {
119        debug_assert_eq!(obj_ref.0, live_object.id());
120        if obj_ref.1 != live_object.version() {
121            debug!(
122                "object version unavailable for consumption: {:?} (current: {})",
123                obj_ref,
124                live_object.version()
125            );
126            return Err(SuiErrorKind::UserInputError {
127                error: UserInputError::ObjectVersionUnavailableForConsumption {
128                    provided_obj_ref: *obj_ref,
129                    current_version: live_object.version(),
130                },
131            }
132            .into());
133        }
134
135        let live_digest = live_object.digest();
136        if obj_ref.2 != live_digest {
137            return Err(SuiErrorKind::UserInputError {
138                error: UserInputError::InvalidObjectDigest {
139                    object_id: obj_ref.0,
140                    expected_digest: live_digest,
141                },
142            }
143            .into());
144        }
145
146        Ok(())
147    }
148
149    fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
150        for (obj_ref, lock) in locks {
151            let entry = self.locked_transactions.entry(*obj_ref);
152            let mut occupied = match entry {
153                DashMapEntry::Vacant(_) => {
154                    debug_fatal!("lock must exist for object: {:?}", obj_ref);
155                    continue;
156                }
157                DashMapEntry::Occupied(occupied) => occupied,
158            };
159
160            if occupied.get().1 == *lock {
161                occupied.get_mut().0 -= 1;
162                if occupied.get().0 == 0 {
163                    trace!("clearing lock: {:?}", lock);
164                    occupied.remove();
165                }
166            } else {
167                // this is impossible because the only case in which we overwrite a
168                // lock is when the lock is from a previous epoch. but we are holding
169                // execution_lock, so the epoch cannot have changed.
170                panic!("lock was changed since we set it");
171            }
172        }
173    }
174
175    fn multi_get_objects_must_exist(
176        cache: &WritebackCache,
177        object_ids: &[ObjectID],
178    ) -> SuiResult<Vec<Object>> {
179        let objects = cache.multi_get_objects(object_ids);
180        let mut result = Vec::with_capacity(objects.len());
181        for (i, object) in objects.into_iter().enumerate() {
182            if let Some(object) = object {
183                result.push(object);
184            } else {
185                return Err(SuiErrorKind::UserInputError {
186                    error: UserInputError::ObjectNotFound {
187                        object_id: object_ids[i],
188                        version: None,
189                    },
190                }
191                .into());
192            }
193        }
194        Ok(result)
195    }
196
197    #[instrument(level = "debug", skip_all)]
198    pub(crate) fn acquire_transaction_locks(
199        &self,
200        cache: &WritebackCache,
201        epoch_store: &AuthorityPerEpochStore,
202        owned_input_objects: &[ObjectRef],
203        tx_digest: TransactionDigest,
204        signed_transaction: Option<VerifiedSignedTransaction>,
205    ) -> SuiResult {
206        let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
207        let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
208
209        // Only live objects can be locked
210        for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
211            Self::verify_live_object(obj_ref, live_object)?;
212        }
213
214        let mut locks_to_write: Vec<(_, LockDetails)> =
215            Vec::with_capacity(owned_input_objects.len());
216
217        // Sort the objects before locking. This is not required by the protocol (since it's okay to
218        // reject any equivocating tx). However, this does prevent a confusing error on the client.
219        // Consider the case:
220        //   TX1: [o1, o2];
221        //   TX2: [o2, o1];
222        // If two threads race to acquire these locks, they might both acquire the first object, then
223        // error when trying to acquire the second. The error returned to the client would say that there
224        // is a conflicting tx on that object, but in fact neither object was locked and the tx was never
225        // signed. If one client then retries, they will succeed (counterintuitively).
226        let owned_input_objects = {
227            let mut o = owned_input_objects.to_vec();
228            o.sort_by_key(|o| o.0);
229            o
230        };
231
232        // Note that this function does not have to operate atomically. If there are two racing threads,
233        // then they are either trying to lock the same transaction (in which case both will succeed),
234        // or they are trying to lock the same object in two different transactions, in which case
235        // the sender has equivocated, and we are under no obligation to help them form a cert.
236        for obj_ref in owned_input_objects.iter() {
237            match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
238                Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
239                Err(e) => {
240                    // revert all pending writes and return error
241                    // Note that reverting is not required for liveness, since a well formed and un-equivocating
242                    // txn cannot fail to acquire locks.
243                    // However, reverting is easy enough to do in this implementation that we do it anyway.
244                    self.clear_cached_locks(&locks_to_write);
245                    return Err(e);
246                }
247            }
248        }
249
250        // commit all writes to DB
251        epoch_store
252            .tables()?
253            .write_transaction_locks(signed_transaction, locks_to_write.iter().cloned())?;
254
255        // remove pending locks from unbounded storage
256        self.clear_cached_locks(&locks_to_write);
257
258        Ok(())
259    }
260}
261
#[cfg(test)]
mod tests {
    use crate::execution_cache::{
        ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
    };

    /// Once a transaction holds locks on some objects, a different transaction cannot lock
    /// any of them, while objects it does not hold remain lockable.
    #[tokio::test]
    async fn test_transaction_locks_are_exclusive() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2, 3]);
            s.do_tx().await;

            s.with_mutated(&[1, 2, 3]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);
            let new3 = s.obj_ref(3);

            s.with_mutated(&[1, 2, 3]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx1 = s.make_signed_transaction(&outputs.transaction);

            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], *tx1.digest(), Some(tx1))
                .expect("locks should be available");

            // this tx doesn't use the actual objects in question, but we just need something
            // to insert into the table.
            s.with_created(&[4, 5]);
            let tx2 = s.take_outputs().transaction.clone();
            let tx2 = s.make_signed_transaction(&tx2);

            // both locks are held by tx1, so this should fail
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[new1, new2],
                    *tx2.digest(),
                    Some(tx2.clone()),
                )
                .unwrap_err();

            // new3 is lockable, but new2 is not, so this should fail
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[new3, new2],
                    *tx2.digest(),
                    Some(tx2.clone()),
                )
                .unwrap_err();

            // the failed attempt above left new3 unlocked
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[new3],
                    *tx2.digest(),
                    Some(tx2.clone()),
                )
                .expect("new3 should be unlocked");
        })
        .await;
    }

    /// A request naming a stale object version is rejected. It must not prevent the same
    /// transaction from later locking the current versions.
    #[tokio::test]
    async fn test_transaction_locks_are_durable() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let old2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx = s.make_signed_transaction(&outputs.transaction);

            // fails because we are referring to an old object
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[new1, old2],
                    *tx.digest(),
                    Some(tx.clone()),
                )
                .unwrap_err();

            // succeeds: the call above was rejected by live-object validation before any
            // lock was taken, and re-locking objects for the same transaction is allowed
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[new1, new2],
                    *tx.digest(),
                    Some(tx.clone()),
                )
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// A request rejected for naming a stale object version leaves no lock behind. A
    /// different transaction can still lock the current versions.
    #[tokio::test]
    async fn test_acquire_transaction_locks_revert() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let old2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx = s.make_signed_transaction(&outputs.transaction);

            // fails because we are referring to an old object
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[new1, old2],
                    *tx.digest(),
                    Some(tx.clone()),
                )
                .unwrap_err();

            // this tx doesn't use the actual objects in question, but we just need something
            // to insert into the table.
            s.with_created(&[4, 5]);
            let tx2 = s.take_outputs().transaction.clone();
            let tx2 = s.make_signed_transaction(&tx2);

            // succeeds for a different transaction: the failed call above took no lock on
            // new1 (it was rejected before any lock was acquired), so nothing is held
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], *tx2.digest(), Some(tx2))
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// Locks can be acquired on freshly created objects without awaiting.
    #[tokio::test]
    async fn test_acquire_transaction_locks_is_sync() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let objects: Vec<_> = vec![s.object(1), s.object(2)]
                .into_iter()
                .map(|o| o.compute_object_reference())
                .collect();

            s.with_mutated(&[1, 2]);
            let outputs = s.take_outputs();

            let tx2 = s.make_signed_transaction(&outputs.transaction);
            // this call site has no `.await`, so the test only compiles while
            // acquire_transaction_locks remains synchronous
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &objects,
                    *tx2.digest(),
                    Some(tx2.clone()),
                )
                .unwrap();
        })
        .await;
    }
}