1use crate::{
5 authority::{
6 authority_per_epoch_store::CertLockGuard, shared_object_version_manager::AssignedVersions,
7 },
8 execution_cache::ObjectCacheRead,
9};
10use itertools::izip;
11use std::collections::BTreeMap;
12use std::sync::Arc;
13use sui_types::{
14 base_types::{EpochId, FullObjectID, ObjectRef, TransactionDigest},
15 error::{SuiError, SuiResult, UserInputError},
16 storage::{FullObjectKey, ObjectKey},
17 transaction::{
18 InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
19 ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
20 },
21};
22use tracing::instrument;
23
24pub(crate) struct TransactionInputLoader {
25 cache: Arc<dyn ObjectCacheRead>,
26}
27
28impl TransactionInputLoader {
29 pub fn new(cache: Arc<dyn ObjectCacheRead>) -> Self {
30 Self { cache }
31 }
32}
33
34impl TransactionInputLoader {
35 #[instrument(level = "trace", skip_all)]
41 pub fn read_objects_for_signing(
42 &self,
43 _tx_digest_for_caching: Option<&TransactionDigest>,
44 input_object_kinds: &[InputObjectKind],
45 receiving_objects: &[ObjectRef],
46 epoch_id: EpochId,
47 ) -> SuiResult<(InputObjects, ReceivingObjects)> {
48 let mut input_results = vec![None; input_object_kinds.len()];
50 let mut object_refs = Vec::with_capacity(input_object_kinds.len());
51 let mut fetch_indices = Vec::with_capacity(input_object_kinds.len());
52
53 for (i, kind) in input_object_kinds.iter().enumerate() {
54 match kind {
55 InputObjectKind::MovePackage(id) => {
57 let Some(package) = self.cache.get_package_object(id)?.map(|o| o.into()) else {
58 return Err(SuiError::from(kind.object_not_found_error()));
59 };
60 input_results[i] = Some(ObjectReadResult {
61 input_object_kind: *kind,
62 object: ObjectReadResultKind::Object(package),
63 });
64 }
65 InputObjectKind::SharedMoveObject { .. } => {
66 let input_full_id = kind.full_object_id();
67
68 match self.cache.get_object(&kind.object_id()) {
70 Some(object) if object.full_id() == input_full_id => {
75 input_results[i] = Some(ObjectReadResult::new(*kind, object.into()))
76 }
77 _ => {
78 if let Some((version, digest)) = self
81 .cache
82 .get_last_consensus_stream_end_info(input_full_id, epoch_id)
83 {
84 input_results[i] = Some(ObjectReadResult {
85 input_object_kind: *kind,
86 object: ObjectReadResultKind::ObjectConsensusStreamEnded(
87 version, digest,
88 ),
89 });
90 } else {
91 return Err(SuiError::from(kind.object_not_found_error()));
92 }
93 }
94 }
95 }
96 InputObjectKind::ImmOrOwnedMoveObject(objref) => {
97 object_refs.push(*objref);
98 fetch_indices.push(i);
99 }
100 }
101 }
102
103 let objects = self
104 .cache
105 .multi_get_objects_with_more_accurate_error_return(&object_refs)?;
106 assert_eq!(objects.len(), object_refs.len());
107 for (index, object) in fetch_indices.into_iter().zip(objects.into_iter()) {
108 input_results[index] = Some(ObjectReadResult {
109 input_object_kind: input_object_kinds[index],
110 object: ObjectReadResultKind::Object(object),
111 });
112 }
113
114 let receiving_results =
115 self.read_receiving_objects_for_signing(receiving_objects, epoch_id)?;
116
117 Ok((
118 input_results
119 .into_iter()
120 .map(Option::unwrap)
121 .collect::<Vec<_>>()
122 .into(),
123 receiving_results,
124 ))
125 }
126
127 #[instrument(level = "trace", skip_all)]
142 pub fn read_objects_for_execution(
143 &self,
144 tx_key: &TransactionKey,
145 _tx_lock: &CertLockGuard, input_object_kinds: &[InputObjectKind],
147 assigned_shared_object_versions: &AssignedVersions,
148 epoch_id: EpochId,
149 ) -> SuiResult<InputObjects> {
150 let assigned_shared_versions: BTreeMap<_, _> = assigned_shared_object_versions
151 .iter()
152 .map(|((id, initial_shared_version), version)| {
153 ((*id, *initial_shared_version), *version)
154 })
155 .collect();
156
157 let mut results = vec![None; input_object_kinds.len()];
158 let mut object_keys = Vec::with_capacity(input_object_kinds.len());
159 let mut fetches = Vec::with_capacity(input_object_kinds.len());
160
161 for (i, input) in input_object_kinds.iter().enumerate() {
162 match input {
163 InputObjectKind::MovePackage(id) => {
164 let package = self.cache.get_package_object(id)?.unwrap_or_else(|| {
165 panic!("Executable transaction {tx_key:?} depends on non-existent package {id:?}")
166 });
167
168 results[i] = Some(ObjectReadResult {
169 input_object_kind: *input,
170 object: ObjectReadResultKind::Object(package.into()),
171 });
172 continue;
173 }
174 InputObjectKind::ImmOrOwnedMoveObject(objref) => {
175 object_keys.push(objref.into());
176 fetches.push((i, input));
177 }
178 InputObjectKind::SharedMoveObject {
179 id,
180 initial_shared_version,
181 ..
182 } => {
183 let version = assigned_shared_versions.get(&(*id, *initial_shared_version)).unwrap_or_else(|| {
186 panic!("Shared object version should have been assigned. key: {tx_key:?}, obj id: {id:?}")
187 });
188 if version.is_cancelled() {
189 results[i] = Some(ObjectReadResult {
191 input_object_kind: *input,
192 object: ObjectReadResultKind::CancelledTransactionSharedObject(
193 *version,
194 ),
195 })
196 } else {
197 object_keys.push(ObjectKey(*id, *version));
198 fetches.push((i, input));
199 }
200 }
201 }
202 }
203
204 let objects = self.cache.multi_get_objects_by_key(&object_keys);
205
206 assert!(objects.len() == object_keys.len() && objects.len() == fetches.len());
207
208 for (object, key, (index, input)) in izip!(
209 objects.into_iter(),
210 object_keys.into_iter(),
211 fetches.into_iter()
212 ) {
213 results[index] = Some(match (object, input) {
214 (Some(obj), InputObjectKind::SharedMoveObject { .. })
215 if obj.full_id() == input.full_object_id() =>
216 {
217 ObjectReadResult {
218 input_object_kind: *input,
219 object: obj.into(),
220 }
221 }
222 (_, InputObjectKind::SharedMoveObject { .. }) => {
223 assert!(key.1.is_valid());
224 let version = key.1;
227 if let Some(dependency) = self.cache.get_consensus_stream_end_tx_digest(
228 FullObjectKey::new(input.full_object_id(), version),
229 epoch_id,
230 ) {
231 ObjectReadResult {
232 input_object_kind: *input,
233 object: ObjectReadResultKind::ObjectConsensusStreamEnded(
234 version, dependency,
235 ),
236 }
237 } else {
238 panic!(
239 "All dependencies of tx {tx_key:?} should have been executed now, but Shared Object id: {:?}, version: {version} is absent in epoch {epoch_id}",
240 input.full_object_id()
241 );
242 }
243 }
244 (Some(obj), input_object_kind) => ObjectReadResult {
245 input_object_kind: *input_object_kind,
246 object: obj.into(),
247 },
248 _ => panic!(
249 "All dependencies of tx {tx_key:?} should have been executed now, but obj {key:?} is absent"
250 ),
251 });
252 }
253
254 Ok(results
255 .into_iter()
256 .map(Option::unwrap)
257 .collect::<Vec<_>>()
258 .into())
259 }
260}
261
262impl TransactionInputLoader {
264 fn read_receiving_objects_for_signing(
265 &self,
266 receiving_objects: &[ObjectRef],
267 epoch_id: EpochId,
268 ) -> SuiResult<ReceivingObjects> {
269 let mut receiving_results = Vec::with_capacity(receiving_objects.len());
270 for objref in receiving_objects {
271 let (object_id, version, _) = objref;
273
274 if self.cache.have_received_object_at_version(
276 FullObjectKey::new(FullObjectID::new(*object_id, None), *version),
277 epoch_id,
278 ) {
279 receiving_results.push(ReceivingObjectReadResult::new(
280 *objref,
281 ReceivingObjectReadResultKind::PreviouslyReceivedObject,
282 ));
283 continue;
284 }
285
286 let Some(object) = self.cache.get_object(object_id) else {
287 return Err(UserInputError::ObjectNotFound {
288 object_id: *object_id,
289 version: Some(*version),
290 }
291 .into());
292 };
293
294 receiving_results.push(ReceivingObjectReadResult::new(*objref, object.into()));
295 }
296 Ok(receiving_results.into())
297 }
298}