sui_core/execution_cache/
object_locks.rs1use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
5use dashmap::DashMap;
6use dashmap::mapref::entry::Entry as DashMapEntry;
7use mysten_common::*;
8use sui_types::base_types::{ObjectID, ObjectRef};
9use sui_types::digests::TransactionDigest;
10use sui_types::error::{SuiErrorKind, SuiResult, UserInputError};
11use sui_types::object::Object;
12use sui_types::storage::ObjectStore;
13use sui_types::transaction::VerifiedSignedTransaction;
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17
/// Counter stored next to each cached lock: how many references to the in-memory entry are
/// currently outstanding. The entry is removed when the count drops back to zero.
type RefCount = usize;
19
/// In-memory table of object locks that are in the process of being acquired.
pub(super) struct ObjectLocks {
    /// Keyed by object reference. Each value pairs a reference count with the lock details
    /// held for that object. Entries are inserted or re-referenced by
    /// `try_set_transaction_lock` and released (and removed at count zero) by
    /// `clear_cached_locks`.
    locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
}
32
33impl ObjectLocks {
34 pub fn new() -> Self {
35 Self {
36 locked_transactions: DashMap::new(),
37 }
38 }
39
40 pub(crate) fn get_transaction_lock(
41 &self,
42 obj_ref: &ObjectRef,
43 epoch_store: &AuthorityPerEpochStore,
44 ) -> SuiResult<Option<LockDetails>> {
45 epoch_store.tables()?.get_locked_transaction(obj_ref)
49 }
50
51 pub(crate) fn try_set_transaction_lock(
56 &self,
57 obj_ref: &ObjectRef,
58 new_lock: LockDetails,
59 epoch_store: &AuthorityPerEpochStore,
60 ) -> SuiResult {
61 let entry = self.locked_transactions.entry(*obj_ref);
63
64 let prev_lock = match entry {
80 DashMapEntry::Vacant(vacant) => {
81 let tables = epoch_store.tables()?;
82 if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
83 trace!("read lock from db: {:?}", lock_details);
84 vacant.insert((1, lock_details));
85 lock_details
86 } else {
87 trace!("set lock: {:?}", new_lock);
88 vacant.insert((1, new_lock));
89 new_lock
90 }
91 }
92 DashMapEntry::Occupied(mut occupied) => {
93 occupied.get_mut().0 += 1;
94 occupied.get().1
95 }
96 };
97
98 if prev_lock != new_lock {
99 debug!(
100 "lock conflict detected for {:?}: {:?} != {:?}",
101 obj_ref, prev_lock, new_lock
102 );
103 Err(SuiErrorKind::ObjectLockConflict {
104 obj_ref: *obj_ref,
105 pending_transaction: prev_lock,
106 }
107 .into())
108 } else {
109 Ok(())
110 }
111 }
112
113 pub(crate) fn clear(&self) {
114 info!("clearing old transaction locks");
115 self.locked_transactions.clear();
116 }
117
118 fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> SuiResult {
119 debug_assert_eq!(obj_ref.0, live_object.id());
120 if obj_ref.1 != live_object.version() {
121 debug!(
122 "object version unavailable for consumption: {:?} (current: {})",
123 obj_ref,
124 live_object.version()
125 );
126 return Err(SuiErrorKind::UserInputError {
127 error: UserInputError::ObjectVersionUnavailableForConsumption {
128 provided_obj_ref: *obj_ref,
129 current_version: live_object.version(),
130 },
131 }
132 .into());
133 }
134
135 let live_digest = live_object.digest();
136 if obj_ref.2 != live_digest {
137 return Err(SuiErrorKind::UserInputError {
138 error: UserInputError::InvalidObjectDigest {
139 object_id: obj_ref.0,
140 expected_digest: live_digest,
141 },
142 }
143 .into());
144 }
145
146 Ok(())
147 }
148
149 fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
150 for (obj_ref, lock) in locks {
151 let entry = self.locked_transactions.entry(*obj_ref);
152 let mut occupied = match entry {
153 DashMapEntry::Vacant(_) => {
154 debug_fatal!("lock must exist for object: {:?}", obj_ref);
155 continue;
156 }
157 DashMapEntry::Occupied(occupied) => occupied,
158 };
159
160 if occupied.get().1 == *lock {
161 occupied.get_mut().0 -= 1;
162 if occupied.get().0 == 0 {
163 trace!("clearing lock: {:?}", lock);
164 occupied.remove();
165 }
166 } else {
167 panic!("lock was changed since we set it");
171 }
172 }
173 }
174
175 fn multi_get_objects_must_exist(
176 cache: &WritebackCache,
177 object_ids: &[ObjectID],
178 ) -> SuiResult<Vec<Object>> {
179 let objects = cache.multi_get_objects(object_ids);
180 let mut result = Vec::with_capacity(objects.len());
181 for (i, object) in objects.into_iter().enumerate() {
182 if let Some(object) = object {
183 result.push(object);
184 } else {
185 return Err(SuiErrorKind::UserInputError {
186 error: UserInputError::ObjectNotFound {
187 object_id: object_ids[i],
188 version: None,
189 },
190 }
191 .into());
192 }
193 }
194 Ok(result)
195 }
196
197 #[instrument(level = "debug", skip_all)]
198 pub(crate) fn acquire_transaction_locks(
199 &self,
200 cache: &WritebackCache,
201 epoch_store: &AuthorityPerEpochStore,
202 owned_input_objects: &[ObjectRef],
203 tx_digest: TransactionDigest,
204 signed_transaction: Option<VerifiedSignedTransaction>,
205 ) -> SuiResult {
206 let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
207 let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
208
209 for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
211 Self::verify_live_object(obj_ref, live_object)?;
212 }
213
214 let mut locks_to_write: Vec<(_, LockDetails)> =
215 Vec::with_capacity(owned_input_objects.len());
216
217 let owned_input_objects = {
227 let mut o = owned_input_objects.to_vec();
228 o.sort_by_key(|o| o.0);
229 o
230 };
231
232 for obj_ref in owned_input_objects.iter() {
237 match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
238 Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
239 Err(e) => {
240 self.clear_cached_locks(&locks_to_write);
245 return Err(e);
246 }
247 }
248 }
249
250 epoch_store
252 .tables()?
253 .write_transaction_locks(signed_transaction, locks_to_write.iter().cloned())?;
254
255 self.clear_cached_locks(&locks_to_write);
257
258 Ok(())
259 }
260}
261
#[cfg(test)]
mod tests {
    use crate::execution_cache::{
        ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
    };

    /// Objects locked by one transaction cannot be locked by a different transaction, while
    /// objects that were never locked remain available.
    #[tokio::test]
    async fn test_transaction_locks_are_exclusive() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            // Create three objects, then mutate them once.
            s.with_created(&[1, 2, 3]);
            s.do_tx().await;

            s.with_mutated(&[1, 2, 3]);
            s.do_tx().await;

            let ref1 = s.obj_ref(1);
            let ref2 = s.obj_ref(2);
            let ref3 = s.obj_ref(3);

            // Form a transaction but never execute it.
            s.with_mutated(&[1, 2, 3]);
            let pending = s.take_outputs();

            let first_tx = s.make_signed_transaction(&pending.transaction);
            let first_digest = *first_tx.digest();

            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[ref1, ref2],
                    first_digest,
                    Some(first_tx),
                )
                .expect("locks should be available");

            // A second, unrelated transaction.
            s.with_created(&[4, 5]);
            let second_unsigned = s.take_outputs().transaction.clone();
            let second_tx = s.make_signed_transaction(&second_unsigned);
            let second_digest = *second_tx.digest();

            // Both objects are already locked by the first transaction.
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[ref1, ref2],
                    second_digest,
                    Some(second_tx.clone()),
                )
                .unwrap_err();

            // Object 3 is free, but object 2 is not, so the whole request fails.
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[ref3, ref2],
                    second_digest,
                    Some(second_tx.clone()),
                )
                .unwrap_err();

            // The failed attempt above must not have left object 3 locked.
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[ref3],
                    second_digest,
                    Some(second_tx),
                )
                .expect("new3 should be unlocked");
        })
        .await;
    }

    /// A failed acquisition must not prevent the same transaction from locking the correct
    /// object references afterwards.
    #[tokio::test]
    async fn test_transaction_locks_are_durable() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            // Captured before the next mutation, so presumably a stale reference afterwards.
            let stale2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let ref1 = s.obj_ref(1);
            let ref2 = s.obj_ref(2);

            // Form a transaction but never execute it.
            s.with_mutated(&[1, 2]);
            let pending = s.take_outputs();

            let tx = s.make_signed_transaction(&pending.transaction);
            let digest = *tx.digest();

            // Expected to fail because of the old reference to object 2.
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[ref1, stale2],
                    digest,
                    Some(tx.clone()),
                )
                .unwrap_err();

            // Object 1 must still be lockable after the failure above.
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[ref1, ref2], digest, Some(tx))
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// After a failed acquisition, a different transaction can lock the objects the failed
    /// attempt touched.
    #[tokio::test]
    async fn test_acquire_transaction_locks_revert() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            // Captured before the next mutation, so presumably a stale reference afterwards.
            let stale2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let ref1 = s.obj_ref(1);
            let ref2 = s.obj_ref(2);

            // Form a transaction but never execute it.
            s.with_mutated(&[1, 2]);
            let pending = s.take_outputs();

            let failing_tx = s.make_signed_transaction(&pending.transaction);
            let failing_digest = *failing_tx.digest();

            // Expected to fail because of the old reference to object 2.
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[ref1, stale2],
                    failing_digest,
                    Some(failing_tx.clone()),
                )
                .unwrap_err();

            // A second, unrelated transaction.
            s.with_created(&[4, 5]);
            let other_unsigned = s.take_outputs().transaction.clone();
            let other_tx = s.make_signed_transaction(&other_unsigned);
            let other_digest = *other_tx.digest();

            // The failed attempt must not have left object 1 locked for the first transaction.
            s.cache
                .acquire_transaction_locks(
                    &s.epoch_store,
                    &[ref1, ref2],
                    other_digest,
                    Some(other_tx),
                )
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// Locks can be acquired on freshly created objects using references computed directly
    /// from the objects.
    #[tokio::test]
    async fn test_acquire_transaction_locks_is_sync() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let first = s.object(1);
            let second = s.object(2);
            let objects = vec![
                first.compute_object_reference(),
                second.compute_object_reference(),
            ];

            s.with_mutated(&[1, 2]);
            let pending = s.take_outputs();

            let tx = s.make_signed_transaction(&pending.transaction);
            let digest = *tx.digest();
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &objects, digest, Some(tx))
                .unwrap();
        })
        .await;
    }
}