sui_core/
transaction_input_loader.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    authority::{
6        authority_per_epoch_store::CertLockGuard, shared_object_version_manager::AssignedVersions,
7    },
8    execution_cache::ObjectCacheRead,
9};
10use itertools::izip;
11use std::collections::BTreeMap;
12use std::sync::Arc;
13use sui_types::{
14    base_types::{EpochId, FullObjectID, ObjectRef, TransactionDigest},
15    error::{SuiError, SuiResult, UserInputError},
16    storage::{FullObjectKey, ObjectKey},
17    transaction::{
18        InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
19        ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
20    },
21};
22use tracing::instrument;
23
24pub(crate) struct TransactionInputLoader {
25    cache: Arc<dyn ObjectCacheRead>,
26}
27
28impl TransactionInputLoader {
29    pub fn new(cache: Arc<dyn ObjectCacheRead>) -> Self {
30        Self { cache }
31    }
32}
33
34impl TransactionInputLoader {
35    /// Read the inputs for a transaction that the validator was asked to sign.
36    ///
37    /// tx_digest is provided so that the inputs can be cached with the tx_digest and returned with
38    /// a single hash map lookup when notify_read_objects_for_execution is called later.
39    /// TODO: implement this caching
40    #[instrument(level = "trace", skip_all)]
41    pub fn read_objects_for_signing(
42        &self,
43        _tx_digest_for_caching: Option<&TransactionDigest>,
44        input_object_kinds: &[InputObjectKind],
45        receiving_objects: &[ObjectRef],
46        epoch_id: EpochId,
47    ) -> SuiResult<(InputObjects, ReceivingObjects)> {
48        // Length of input_object_kinds have been checked via validity_check() for ProgrammableTransaction.
49        let mut input_results = vec![None; input_object_kinds.len()];
50        let mut object_refs = Vec::with_capacity(input_object_kinds.len());
51        let mut fetch_indices = Vec::with_capacity(input_object_kinds.len());
52
53        for (i, kind) in input_object_kinds.iter().enumerate() {
54            match kind {
55                // Packages are loaded one at a time via the cache
56                InputObjectKind::MovePackage(id) => {
57                    let Some(package) = self.cache.get_package_object(id)?.map(|o| o.into()) else {
58                        return Err(SuiError::from(kind.object_not_found_error()));
59                    };
60                    input_results[i] = Some(ObjectReadResult {
61                        input_object_kind: *kind,
62                        object: ObjectReadResultKind::Object(package),
63                    });
64                }
65                InputObjectKind::SharedMoveObject { .. } => {
66                    let input_full_id = kind.full_object_id();
67
68                    // Load the most current version from the cache.
69                    match self.cache.get_object(&kind.object_id()) {
70                        // If full ID matches, we're done.
71                        // (Full ID may not match if object was transferred in or out of
72                        // consensus. We have to double-check this because cache is keyed
73                        // on ObjectID and not FullObjectID.)
74                        Some(object) if object.full_id() == input_full_id => {
75                            input_results[i] = Some(ObjectReadResult::new(*kind, object.into()))
76                        }
77                        _ => {
78                            // If the full ID doesn't match, check if the object's consensus
79                            // stream was ended.
80                            if let Some((version, digest)) = self
81                                .cache
82                                .get_last_consensus_stream_end_info(input_full_id, epoch_id)
83                            {
84                                input_results[i] = Some(ObjectReadResult {
85                                    input_object_kind: *kind,
86                                    object: ObjectReadResultKind::ObjectConsensusStreamEnded(
87                                        version, digest,
88                                    ),
89                                });
90                            } else {
91                                return Err(SuiError::from(kind.object_not_found_error()));
92                            }
93                        }
94                    }
95                }
96                InputObjectKind::ImmOrOwnedMoveObject(objref) => {
97                    object_refs.push(*objref);
98                    fetch_indices.push(i);
99                }
100            }
101        }
102
103        let objects = self
104            .cache
105            .multi_get_objects_with_more_accurate_error_return(&object_refs)?;
106        assert_eq!(objects.len(), object_refs.len());
107        for (index, object) in fetch_indices.into_iter().zip(objects.into_iter()) {
108            input_results[index] = Some(ObjectReadResult {
109                input_object_kind: input_object_kinds[index],
110                object: ObjectReadResultKind::Object(object),
111            });
112        }
113
114        let receiving_results =
115            self.read_receiving_objects_for_signing(receiving_objects, epoch_id)?;
116
117        Ok((
118            input_results
119                .into_iter()
120                .map(Option::unwrap)
121                .collect::<Vec<_>>()
122                .into(),
123            receiving_results,
124        ))
125    }
126
127    /// Read the inputs for a transaction that is ready to be executed.
128    ///
129    /// epoch_store is used to resolve the versions of any shared input objects.
130    ///
131    /// This function panics if any inputs are not available, as ExecutionScheduler should already
132    /// have verified that the transaction is ready to be executed.
133    ///
134    /// The tx_digest is provided here to support the following optimization (not yet implemented):
135    /// All the owned input objects will likely have been loaded during transaction signing, and
136    /// can be stored as a group with the transaction_digest as the key, allowing the lookup to
137    /// proceed with only a single hash map lookup. (additional lookups may be necessary for shared
138    /// inputs, since the versions are not known at signing time). Receiving objects could be
139    /// cached, but only with appropriate invalidation logic for when an object is received by a
140    /// different tx first.
141    #[instrument(level = "trace", skip_all)]
142    pub fn read_objects_for_execution(
143        &self,
144        tx_key: &TransactionKey,
145        _tx_lock: &CertLockGuard, // see below for why this is needed
146        input_object_kinds: &[InputObjectKind],
147        assigned_shared_object_versions: &AssignedVersions,
148        epoch_id: EpochId,
149    ) -> SuiResult<InputObjects> {
150        let assigned_shared_versions: BTreeMap<_, _> = assigned_shared_object_versions
151            .iter()
152            .map(|((id, initial_shared_version), version)| {
153                ((*id, *initial_shared_version), *version)
154            })
155            .collect();
156
157        let mut results = vec![None; input_object_kinds.len()];
158        let mut object_keys = Vec::with_capacity(input_object_kinds.len());
159        let mut fetches = Vec::with_capacity(input_object_kinds.len());
160
161        for (i, input) in input_object_kinds.iter().enumerate() {
162            match input {
163                InputObjectKind::MovePackage(id) => {
164                    let package = self.cache.get_package_object(id)?.unwrap_or_else(|| {
165                        panic!("Executable transaction {tx_key:?} depends on non-existent package {id:?}")
166                    });
167
168                    results[i] = Some(ObjectReadResult {
169                        input_object_kind: *input,
170                        object: ObjectReadResultKind::Object(package.into()),
171                    });
172                    continue;
173                }
174                InputObjectKind::ImmOrOwnedMoveObject(objref) => {
175                    object_keys.push(objref.into());
176                    fetches.push((i, input));
177                }
178                InputObjectKind::SharedMoveObject {
179                    id,
180                    initial_shared_version,
181                    ..
182                } => {
183                    // If we find a set of assigned versions but one object's version assignments
184                    // are missing from the set, it indicates a serious inconsistency:
185                    let version = assigned_shared_versions.get(&(*id, *initial_shared_version)).unwrap_or_else(|| {
186                        panic!("Shared object version should have been assigned. key: {tx_key:?}, obj id: {id:?}")
187                    });
188                    if version.is_cancelled() {
189                        // Do not need to fetch shared object for cancelled transaction.
190                        results[i] = Some(ObjectReadResult {
191                            input_object_kind: *input,
192                            object: ObjectReadResultKind::CancelledTransactionSharedObject(
193                                *version,
194                            ),
195                        })
196                    } else {
197                        object_keys.push(ObjectKey(*id, *version));
198                        fetches.push((i, input));
199                    }
200                }
201            }
202        }
203
204        let objects = self.cache.multi_get_objects_by_key(&object_keys);
205
206        assert!(objects.len() == object_keys.len() && objects.len() == fetches.len());
207
208        for (object, key, (index, input)) in izip!(
209            objects.into_iter(),
210            object_keys.into_iter(),
211            fetches.into_iter()
212        ) {
213            results[index] = Some(match (object, input) {
214                (Some(obj), InputObjectKind::SharedMoveObject { .. })
215                    if obj.full_id() == input.full_object_id() =>
216                {
217                    ObjectReadResult {
218                        input_object_kind: *input,
219                        object: obj.into(),
220                    }
221                }
222                (_, InputObjectKind::SharedMoveObject { .. }) => {
223                    assert!(key.1.is_valid());
224                    // If the full ID on a shared input doesn't match, check if the object
225                    // was removed from consensus by a concurrently certified tx.
226                    let version = key.1;
227                    if let Some(dependency) = self.cache.get_consensus_stream_end_tx_digest(
228                        FullObjectKey::new(input.full_object_id(), version),
229                        epoch_id,
230                    ) {
231                        ObjectReadResult {
232                            input_object_kind: *input,
233                            object: ObjectReadResultKind::ObjectConsensusStreamEnded(
234                                version, dependency,
235                            ),
236                        }
237                    } else {
238                        panic!(
239                            "All dependencies of tx {tx_key:?} should have been executed now, but Shared Object id: {:?}, version: {version} is absent in epoch {epoch_id}",
240                            input.full_object_id()
241                        );
242                    }
243                }
244                (Some(obj), input_object_kind) => ObjectReadResult {
245                    input_object_kind: *input_object_kind,
246                    object: obj.into(),
247                },
248                _ => panic!(
249                    "All dependencies of tx {tx_key:?} should have been executed now, but obj {key:?} is absent"
250                ),
251            });
252        }
253
254        Ok(results
255            .into_iter()
256            .map(Option::unwrap)
257            .collect::<Vec<_>>()
258            .into())
259    }
260}
261
262// private methods
263impl TransactionInputLoader {
264    fn read_receiving_objects_for_signing(
265        &self,
266        receiving_objects: &[ObjectRef],
267        epoch_id: EpochId,
268    ) -> SuiResult<ReceivingObjects> {
269        let mut receiving_results = Vec::with_capacity(receiving_objects.len());
270        for objref in receiving_objects {
271            // Note: the digest is checked later in check_transaction_input
272            let (object_id, version, _) = objref;
273
274            // TODO: Add support for receiving consensus objects. For now this assumes fastpath.
275            if self.cache.have_received_object_at_version(
276                FullObjectKey::new(FullObjectID::new(*object_id, None), *version),
277                epoch_id,
278            ) {
279                receiving_results.push(ReceivingObjectReadResult::new(
280                    *objref,
281                    ReceivingObjectReadResultKind::PreviouslyReceivedObject,
282                ));
283                continue;
284            }
285
286            let Some(object) = self.cache.get_object(object_id) else {
287                return Err(UserInputError::ObjectNotFound {
288                    object_id: *object_id,
289                    version: Some(*version),
290                }
291                .into());
292            };
293
294            receiving_results.push(ReceivingObjectReadResult::new(*objref, object.into()));
295        }
296        Ok(receiving_results.into())
297    }
298}