sui_core/transaction_input_loader.rs
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertLockGuard},
execution_cache::ObjectCacheRead,
};
use itertools::izip;
use mysten_common::fatal;
use once_cell::unsync::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use sui_types::{
base_types::{EpochId, FullObjectID, ObjectRef, TransactionDigest},
error::{SuiError, SuiResult, UserInputError},
storage::{FullObjectKey, ObjectKey},
transaction::{
InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
},
};
use tracing::instrument;
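
/// Loads the objects a transaction refers to (Move packages, owned and immutable objects,
/// shared objects, and receiving objects) from the object cache, both when a transaction is
/// signed and when it is executed.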
pub(crate) struct TransactionInputLoader {
cache: Arc<dyn ObjectCacheRead>,
}
impl TransactionInputLoader {
pub fn new(cache: Arc<dyn ObjectCacheRead>) -> Self {
Self { cache }
}
}
impl TransactionInputLoader {
/// Read the inputs for a transaction that the validator was asked to sign.
///
/// tx_digest is provided so that the inputs can be cached under the tx_digest and returned with
/// a single hash map lookup when read_objects_for_execution is called later.
/// TODO: implement this caching
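///
/// A usage sketch (illustrative only, not compiled; `loader`, `tx_digest`, `input_kinds`,
/// `receiving_refs`, and `epoch_id` are placeholder names assumed to be in scope and already
/// validity-checked):
/// ```ignore
/// let (input_objects, receiving_objects) = loader.read_objects_for_signing(
///     Some(&tx_digest),  // would enable the per-digest caching described above
///     &input_kinds,      // &[InputObjectKind] from the transaction data
///     &receiving_refs,   // &[ObjectRef] of receiving inputs
///     epoch_id,
/// )?;
/// ```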
#[instrument(level = "trace", skip_all)]
pub fn read_objects_for_signing(
&self,
_tx_digest_for_caching: Option<&TransactionDigest>,
input_object_kinds: &[InputObjectKind],
receiving_objects: &[ObjectRef],
epoch_id: EpochId,
) -> SuiResult<(InputObjects, ReceivingObjects)> {
// The length of input_object_kinds has already been checked via validity_check() for ProgrammableTransaction.
let mut input_results = vec![None; input_object_kinds.len()];
let mut object_refs = Vec::with_capacity(input_object_kinds.len());
let mut fetch_indices = Vec::with_capacity(input_object_kinds.len());
for (i, kind) in input_object_kinds.iter().enumerate() {
match kind {
// Packages are loaded one at a time via the cache
InputObjectKind::MovePackage(id) => {
let Some(package) = self.cache.get_package_object(id)?.map(|o| o.into()) else {
return Err(SuiError::from(kind.object_not_found_error()));
};
input_results[i] = Some(ObjectReadResult {
input_object_kind: *kind,
object: ObjectReadResultKind::Object(package),
});
}
InputObjectKind::SharedMoveObject { .. } => {
let input_full_id = kind.full_object_id();
// Load the most current version from the cache.
match self.cache.get_object(&kind.object_id()) {
// If full ID matches, we're done.
// (Full ID may not match if object was transferred in or out of
// consensus. We have to double-check this because cache is keyed
// on ObjectID and not FullObjectID.)
Some(object) if object.full_id() == input_full_id => {
input_results[i] = Some(ObjectReadResult::new(*kind, object.into()))
}
_ => {
// If the full ID doesn't match, check if the object's consensus
// stream was ended.
if let Some((version, digest)) = self
.cache
.get_last_consensus_stream_end_info(input_full_id, epoch_id)
{
input_results[i] = Some(ObjectReadResult {
input_object_kind: *kind,
object: ObjectReadResultKind::ObjectConsensusStreamEnded(
version, digest,
),
});
} else {
return Err(SuiError::from(kind.object_not_found_error()));
}
}
}
}
InputObjectKind::ImmOrOwnedMoveObject(objref) => {
object_refs.push(*objref);
fetch_indices.push(i);
}
}
}
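// Batch-load the owned and immutable inputs collected above. A missing or mismatched object
// is surfaced as an error rather than a panic, since these inputs come from an as-yet
// unvalidated user transaction.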
let objects = self
.cache
.multi_get_objects_with_more_accurate_error_return(&object_refs)?;
assert_eq!(objects.len(), object_refs.len());
for (index, object) in fetch_indices.into_iter().zip(objects.into_iter()) {
input_results[index] = Some(ObjectReadResult {
input_object_kind: input_object_kinds[index],
object: ObjectReadResultKind::Object(object),
});
}
let receiving_results =
self.read_receiving_objects_for_signing(receiving_objects, epoch_id)?;
Ok((
input_results
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>()
.into(),
receiving_results,
))
}
/// Read the inputs for a transaction that is ready to be executed.
///
/// epoch_store is used to resolve the versions of any shared input objects.
///
/// This function panics if any inputs are not available, as TransactionManager should already
/// have verified that the transaction is ready to be executed.
///
/// The tx_key is provided here to support the following optimization (not yet implemented):
/// all the owned input objects will likely have been loaded during transaction signing, and
/// can be stored as a group keyed by the transaction digest, allowing the lookup to
/// proceed with only a single hash map lookup. (Additional lookups may be necessary for shared
/// inputs, since their versions are not known at signing time.) Receiving objects could be
/// cached, but only with appropriate invalidation logic for when an object is received by a
/// different tx first.
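///
/// A usage sketch (illustrative only, not compiled; `loader`, `epoch_store`, `tx_key`,
/// `tx_lock`, `input_kinds`, and `epoch_id` are placeholder names assumed to be in scope,
/// with shared-object versions already assigned for this transaction):
/// ```ignore
/// let input_objects = loader.read_objects_for_execution(
///     &epoch_store,
///     &tx_key,
///     &tx_lock,     // CertLockGuard held for the certificate being executed
///     &input_kinds, // &[InputObjectKind] from the certified transaction
///     epoch_id,
/// )?;
/// ```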
#[instrument(level = "trace", skip_all)]
pub fn read_objects_for_execution(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx_key: &TransactionKey,
_tx_lock: &CertLockGuard, // see below for why this is needed
input_object_kinds: &[InputObjectKind],
epoch_id: EpochId,
) -> SuiResult<InputObjects> {
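// The shared-object version assignments for this transaction are looked up lazily, the
// first time a shared input is encountered below.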
let assigned_shared_versions_cell: OnceCell<Option<HashMap<_, _>>> = OnceCell::new();
let mut results = vec![None; input_object_kinds.len()];
let mut object_keys = Vec::with_capacity(input_object_kinds.len());
let mut fetches = Vec::with_capacity(input_object_kinds.len());
for (i, input) in input_object_kinds.iter().enumerate() {
match input {
InputObjectKind::MovePackage(id) => {
let package = self.cache.get_package_object(id)?.unwrap_or_else(|| {
panic!("Executable transaction {tx_key:?} depends on non-existent package {id:?}")
});
results[i] = Some(ObjectReadResult {
input_object_kind: *input,
object: ObjectReadResultKind::Object(package.into()),
});
continue;
}
InputObjectKind::ImmOrOwnedMoveObject(objref) => {
object_keys.push(objref.into());
fetches.push((i, input));
}
InputObjectKind::SharedMoveObject {
id,
initial_shared_version,
..
} => {
let assigned_shared_versions = assigned_shared_versions_cell
.get_or_init(|| {
epoch_store
.get_assigned_shared_object_versions(tx_key)
.map(|versions| versions.into_iter().collect())
})
.as_ref()
.unwrap_or_else(|| {
// Important to hold the _tx_lock here - otherwise it would be possible
// for a concurrent execution of the same tx to enter this point after
// the first execution has finished and the assigned shared versions
// have been deleted.
fatal!(
"Failed to get assigned shared versions for transaction {tx_key:?}"
);
});
// If we find a set of assigned versions but this object's version assignment
// is missing from that set, it indicates a serious inconsistency:
let version = assigned_shared_versions.get(&(*id, *initial_shared_version)).unwrap_or_else(|| {
panic!("Shared object version should have been assigned. key: {tx_key:?}, obj id: {id:?}")
});
if version.is_cancelled() {
// No need to fetch the shared object for a cancelled transaction.
results[i] = Some(ObjectReadResult {
input_object_kind: *input,
object: ObjectReadResultKind::CancelledTransactionSharedObject(
*version,
),
})
} else {
object_keys.push(ObjectKey(*id, *version));
fetches.push((i, input));
}
}
}
}
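// Batch-load the collected inputs by exact (id, version) key. At execution time every
// dependency must already exist, so a miss below is fatal unless the object's consensus
// stream was ended by a concurrently certified transaction.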
let objects = self.cache.multi_get_objects_by_key(&object_keys);
assert!(objects.len() == object_keys.len() && objects.len() == fetches.len());
for (object, key, (index, input)) in izip!(
objects.into_iter(),
object_keys.into_iter(),
fetches.into_iter()
) {
results[index] = Some(match (object, input) {
(Some(obj), InputObjectKind::SharedMoveObject { .. }) if obj.full_id() == input.full_object_id() => ObjectReadResult {
input_object_kind: *input,
object: obj.into(),
},
(_, InputObjectKind::SharedMoveObject { .. }) => {
assert!(key.1.is_valid());
// If the full ID on a shared input doesn't match, check if the object
// was removed from consensus by a concurrently certified tx.
let version = key.1;
if let Some(dependency) = self.cache.get_consensus_stream_end_tx_digest(
FullObjectKey::new(input.full_object_id(), version),
epoch_id,
) {
ObjectReadResult {
input_object_kind: *input,
object: ObjectReadResultKind::ObjectConsensusStreamEnded(version, dependency),
}
} else {
panic!("All dependencies of tx {tx_key:?} should have been executed now, but Shared Object id: {:?}, version: {version} is absent in epoch {epoch_id}", input.full_object_id());
}
},
(Some(obj), input_object_kind) => ObjectReadResult {
input_object_kind: *input_object_kind,
object: obj.into(),
},
_ => panic!("All dependencies of tx {tx_key:?} should have been executed now, but obj {key:?} is absent"),
});
}
Ok(results
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>()
.into())
}
}
// private methods
impl TransactionInputLoader {
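/// Resolves the receiving-object references for signing. An object that has already been
/// received at the given version is marked as PreviouslyReceivedObject; otherwise the current
/// version of the object is loaded from the cache, and a missing object is reported as a
/// user input error.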
fn read_receiving_objects_for_signing(
&self,
receiving_objects: &[ObjectRef],
epoch_id: EpochId,
) -> SuiResult<ReceivingObjects> {
let mut receiving_results = Vec::with_capacity(receiving_objects.len());
for objref in receiving_objects {
// Note: the digest is checked later in check_transaction_input
let (object_id, version, _) = objref;
// TODO: Add support for receiving ConsensusV2 objects. For now this assumes fastpath.
if self.cache.have_received_object_at_version(
FullObjectKey::new(FullObjectID::new(*object_id, None), *version),
epoch_id,
) {
receiving_results.push(ReceivingObjectReadResult::new(
*objref,
ReceivingObjectReadResultKind::PreviouslyReceivedObject,
));
continue;
}
let Some(object) = self.cache.get_object(object_id) else {
return Err(UserInputError::ObjectNotFound {
object_id: *object_id,
version: Some(*version),
}
.into());
};
receiving_results.push(ReceivingObjectReadResult::new(*objref, object.into()));
}
Ok(receiving_results.into())
}
}