sui_core/execution_cache/
object_locks.rs1use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
5use dashmap::DashMap;
6use dashmap::mapref::entry::Entry as DashMapEntry;
7use mysten_common::*;
8use sui_types::base_types::{ObjectID, ObjectRef};
9use sui_types::digests::TransactionDigest;
10use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
11use sui_types::object::Object;
12use sui_types::storage::ObjectStore;
13use sui_types::transaction::VerifiedSignedTransaction;
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17
18type RefCount = usize;
19
20pub(super) struct ObjectLocks {
21 locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
31}
32
33impl ObjectLocks {
34 pub fn new() -> Self {
35 Self {
36 locked_transactions: DashMap::new(),
37 }
38 }
39
40 pub(crate) fn get_transaction_lock(
41 &self,
42 obj_ref: &ObjectRef,
43 epoch_store: &AuthorityPerEpochStore,
44 ) -> SuiResult<Option<LockDetails>> {
45 epoch_store.tables()?.get_locked_transaction(obj_ref)
49 }
50
51 pub(crate) fn try_set_transaction_lock(
56 &self,
57 obj_ref: &ObjectRef,
58 new_lock: LockDetails,
59 epoch_store: &AuthorityPerEpochStore,
60 ) -> SuiResult {
61 let entry = self.locked_transactions.entry(*obj_ref);
63
64 let prev_lock = match entry {
80 DashMapEntry::Vacant(vacant) => {
81 let tables = epoch_store.tables()?;
82 if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
83 trace!("read lock from db: {:?}", lock_details);
84 vacant.insert((1, lock_details));
85 lock_details
86 } else {
87 trace!("set lock: {:?}", new_lock);
88 vacant.insert((1, new_lock));
89 new_lock
90 }
91 }
92 DashMapEntry::Occupied(mut occupied) => {
93 occupied.get_mut().0 += 1;
94 occupied.get().1
95 }
96 };
97
98 if prev_lock != new_lock {
99 debug!(
100 "lock conflict detected for {:?}: {:?} != {:?}",
101 obj_ref, prev_lock, new_lock
102 );
103 Err(SuiErrorKind::ObjectLockConflict {
104 obj_ref: *obj_ref,
105 pending_transaction: prev_lock,
106 }
107 .into())
108 } else {
109 Ok(())
110 }
111 }
112
113 pub(crate) fn clear(&self) {
114 info!("clearing old transaction locks");
115 self.locked_transactions.clear();
116 }
117
118 fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> SuiResult {
119 debug_assert_eq!(obj_ref.0, live_object.id());
120 if obj_ref.1 != live_object.version() {
121 debug!(
122 "object version unavailable for consumption: {:?} (current: {})",
123 obj_ref,
124 live_object.version()
125 );
126 return Err(SuiErrorKind::UserInputError {
127 error: UserInputError::ObjectVersionUnavailableForConsumption {
128 provided_obj_ref: *obj_ref,
129 current_version: live_object.version(),
130 },
131 }
132 .into());
133 }
134
135 let live_digest = live_object.digest();
136 if obj_ref.2 != live_digest {
137 return Err(SuiErrorKind::UserInputError {
138 error: UserInputError::InvalidObjectDigest {
139 object_id: obj_ref.0,
140 expected_digest: live_digest,
141 },
142 }
143 .into());
144 }
145
146 Ok(())
147 }
148
149 pub(crate) fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
150 for (obj_ref, lock) in locks {
151 let entry = self.locked_transactions.entry(*obj_ref);
152 let mut occupied = match entry {
153 DashMapEntry::Vacant(_) => {
154 debug_fatal!("lock must exist for object: {:?}", obj_ref);
155 continue;
156 }
157 DashMapEntry::Occupied(occupied) => occupied,
158 };
159
160 if occupied.get().1 == *lock {
161 occupied.get_mut().0 -= 1;
162 if occupied.get().0 == 0 {
163 trace!("clearing lock: {:?}", lock);
164 occupied.remove();
165 }
166 } else {
167 panic!("lock was changed since we set it");
171 }
172 }
173 }
174
175 fn multi_get_objects_must_exist(
176 cache: &WritebackCache,
177 object_ids: &[ObjectID],
178 ) -> SuiResult<Vec<Object>> {
179 let objects = cache.multi_get_objects(object_ids);
180 let mut result = Vec::with_capacity(objects.len());
181 for (i, object) in objects.into_iter().enumerate() {
182 if let Some(object) = object {
183 result.push(object);
184 } else {
185 return Err(SuiErrorKind::UserInputError {
186 error: UserInputError::ObjectNotFound {
187 object_id: object_ids[i],
188 version: None,
189 },
190 }
191 .into());
192 }
193 }
194 Ok(result)
195 }
196
197 #[instrument(level = "debug", skip_all)]
198 pub(crate) fn acquire_transaction_locks(
199 &self,
200 cache: &WritebackCache,
201 epoch_store: &AuthorityPerEpochStore,
202 owned_input_objects: &[ObjectRef],
203 tx_digest: TransactionDigest,
204 signed_transaction: Option<VerifiedSignedTransaction>,
205 ) -> SuiResult {
206 Self::validate_owned_object_versions(cache, owned_input_objects)?;
208
209 let mut locks_to_write: Vec<(_, LockDetails)> =
210 Vec::with_capacity(owned_input_objects.len());
211
212 let owned_input_objects = {
222 let mut o = owned_input_objects.to_vec();
223 o.sort_by_key(|o| o.0);
224 o
225 };
226
227 for obj_ref in owned_input_objects.iter() {
232 match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
233 Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
234 Err(e) => {
235 self.clear_cached_locks(&locks_to_write);
240 return Err(e);
241 }
242 }
243 }
244
245 epoch_store
247 .tables()?
248 .write_transaction_locks(signed_transaction, locks_to_write.iter().cloned())?;
249
250 self.clear_cached_locks(&locks_to_write);
252
253 Ok(())
254 }
255
256 #[instrument(level = "debug", skip_all)]
260 pub(crate) fn validate_owned_object_versions(
261 cache: &WritebackCache,
262 owned_input_objects: &[ObjectRef],
263 ) -> SuiResult {
264 let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
265 let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
266
267 for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
269 Self::verify_live_object(obj_ref, live_object)?;
270 }
271
272 Ok(())
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use crate::execution_cache::{
279 ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
280 };
281
282 #[tokio::test]
283 async fn test_transaction_locks_are_exclusive() {
284 telemetry_subscribers::init_for_testing();
285 Scenario::iterate(|mut s| async move {
286 s.with_created(&[1, 2, 3]);
287 s.do_tx().await;
288
289 s.with_mutated(&[1, 2, 3]);
290 s.do_tx().await;
291
292 let new1 = s.obj_ref(1);
293 let new2 = s.obj_ref(2);
294 let new3 = s.obj_ref(3);
295
296 s.with_mutated(&[1, 2, 3]); let outputs = s.take_outputs();
298
299 let tx1 = s.make_signed_transaction(&outputs.transaction);
300
301 s.cache
302 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], *tx1.digest(), Some(tx1))
303 .expect("locks should be available");
304
305 s.with_created(&[4, 5]);
308 let tx2 = s.take_outputs().transaction.clone();
309 let tx2 = s.make_signed_transaction(&tx2);
310
311 s.cache
313 .acquire_transaction_locks(
314 &s.epoch_store,
315 &[new1, new2],
316 *tx2.digest(),
317 Some(tx2.clone()),
318 )
319 .unwrap_err();
320
321 s.cache
323 .acquire_transaction_locks(
324 &s.epoch_store,
325 &[new3, new2],
326 *tx2.digest(),
327 Some(tx2.clone()),
328 )
329 .unwrap_err();
330
331 s.cache
333 .acquire_transaction_locks(
334 &s.epoch_store,
335 &[new3],
336 *tx2.digest(),
337 Some(tx2.clone()),
338 )
339 .expect("new3 should be unlocked");
340 })
341 .await;
342 }
343
344 #[tokio::test]
345 async fn test_transaction_locks_are_durable() {
346 telemetry_subscribers::init_for_testing();
347 Scenario::iterate(|mut s| async move {
348 s.with_created(&[1, 2]);
349 s.do_tx().await;
350
351 let old2 = s.obj_ref(2);
352
353 s.with_mutated(&[1, 2]);
354 s.do_tx().await;
355
356 let new1 = s.obj_ref(1);
357 let new2 = s.obj_ref(2);
358
359 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
361
362 let tx = s.make_signed_transaction(&outputs.transaction);
363
364 s.cache
366 .acquire_transaction_locks(
367 &s.epoch_store,
368 &[new1, old2],
369 *tx.digest(),
370 Some(tx.clone()),
371 )
372 .unwrap_err();
373
374 s.cache
377 .acquire_transaction_locks(
378 &s.epoch_store,
379 &[new1, new2],
380 *tx.digest(),
381 Some(tx.clone()),
382 )
383 .expect("new1 should be unlocked after revert");
384 })
385 .await;
386 }
387
388 #[tokio::test]
389 async fn test_acquire_transaction_locks_revert() {
390 telemetry_subscribers::init_for_testing();
391 Scenario::iterate(|mut s| async move {
392 s.with_created(&[1, 2]);
393 s.do_tx().await;
394
395 let old2 = s.obj_ref(2);
396
397 s.with_mutated(&[1, 2]);
398 s.do_tx().await;
399
400 let new1 = s.obj_ref(1);
401 let new2 = s.obj_ref(2);
402
403 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
405
406 let tx = s.make_signed_transaction(&outputs.transaction);
407
408 s.cache
410 .acquire_transaction_locks(
411 &s.epoch_store,
412 &[new1, old2],
413 *tx.digest(),
414 Some(tx.clone()),
415 )
416 .unwrap_err();
417
418 s.with_created(&[4, 5]);
421 let tx2 = s.take_outputs().transaction.clone();
422 let tx2 = s.make_signed_transaction(&tx2);
423
424 s.cache
427 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], *tx2.digest(), Some(tx2))
428 .expect("new1 should be unlocked after revert");
429 })
430 .await;
431 }
432
433 #[tokio::test]
434 async fn test_acquire_transaction_locks_is_sync() {
435 telemetry_subscribers::init_for_testing();
436 Scenario::iterate(|mut s| async move {
437 s.with_created(&[1, 2]);
438 s.do_tx().await;
439
440 let objects: Vec<_> = vec![s.object(1), s.object(2)]
441 .into_iter()
442 .map(|o| o.compute_object_reference())
443 .collect();
444
445 s.with_mutated(&[1, 2]);
446 let outputs = s.take_outputs();
447
448 let tx2 = s.make_signed_transaction(&outputs.transaction);
449 s.cache
452 .acquire_transaction_locks(
453 &s.epoch_store,
454 &objects,
455 *tx2.digest(),
456 Some(tx2.clone()),
457 )
458 .unwrap();
459 })
460 .await;
461 }
462}