sui_core/
transaction_outputs.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use parking_lot::Mutex;
5use std::collections::{BTreeMap, HashSet};
6use std::sync::Arc;
7use sui_types::accumulator_event::AccumulatorEvent;
8use sui_types::base_types::{FullObjectID, ObjectRef};
9use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
10use sui_types::full_checkpoint_content::ObjectSet;
11use sui_types::inner_temporary_store::{InnerTemporaryStore, WrittenObjects};
12use sui_types::storage::{FullObjectKey, InputKey, MarkerValue, ObjectKey};
13use sui_types::transaction::{TransactionData, TransactionDataAPI, VerifiedTransaction};
14
15/// TransactionOutputs
16#[derive(Debug)]
17pub struct TransactionOutputs {
18    pub transaction: Arc<VerifiedTransaction>,
19    pub effects: TransactionEffects,
20    pub events: TransactionEvents,
21    pub unchanged_loaded_runtime_objects: Vec<ObjectKey>,
22    pub accumulator_events: Mutex<Vec<AccumulatorEvent>>,
23
24    pub markers: Vec<(FullObjectKey, MarkerValue)>,
25    pub wrapped: Vec<ObjectKey>,
26    pub deleted: Vec<ObjectKey>,
27    pub locks_to_delete: Vec<ObjectRef>,
28    pub new_locks_to_init: Vec<ObjectRef>,
29    pub written: WrittenObjects,
30
31    // Temporarily needed to notify TxManager about the availability of objects.
32    // TODO: Remove this once we ship the new ExecutionScheduler.
33    pub output_keys: Vec<InputKey>,
34}
35
36impl TransactionOutputs {
37    // Convert InnerTemporaryStore + Effects into the exact set of updates to the store
38    pub fn build_transaction_outputs(
39        transaction: VerifiedTransaction,
40        effects: TransactionEffects,
41        inner_temporary_store: InnerTemporaryStore,
42        unchanged_loaded_runtime_objects: Vec<ObjectKey>,
43    ) -> TransactionOutputs {
44        let output_keys = inner_temporary_store.get_output_keys(&effects);
45
46        let InnerTemporaryStore {
47            input_objects,
48            stream_ended_consensus_objects,
49            mutable_inputs,
50            written,
51            events,
52            accumulator_events,
53            loaded_runtime_objects: _,
54            binary_config: _,
55            runtime_packages_loaded_from_db: _,
56            lamport_version,
57        } = inner_temporary_store;
58
59        let tx_digest = *transaction.digest();
60
61        // Get the actual set of objects that have been received -- any received
62        // object will show up in the modified-at set.
63        let modified_at: HashSet<_> = effects.modified_at_versions().into_iter().collect();
64        let possible_to_receive = transaction.transaction_data().receiving_objects();
65        let received_objects = possible_to_receive
66            .into_iter()
67            .filter(|obj_ref| modified_at.contains(&(obj_ref.0, obj_ref.1)));
68
69        // We record any received or deleted objects since they could be pruned, and smear object
70        // removals from consensus in the marker table. For deleted entries in the marker table we
71        // need to make sure we don't accidentally overwrite entries.
72        let markers: Vec<_> = {
73            let received = received_objects.clone().map(|objref| {
74                (
75                    // TODO: Add support for receiving consensus objects. For now this assumes fastpath.
76                    FullObjectKey::new(FullObjectID::new(objref.0, None), objref.1),
77                    MarkerValue::Received,
78                )
79            });
80
81            let tombstones = effects
82                .all_tombstones()
83                .into_iter()
84                .map(|(object_id, version)| {
85                    let consensus_key = input_objects
86                        .get(&object_id)
87                        .filter(|o| o.is_consensus())
88                        .map(|o| FullObjectKey::new(o.full_id(), version));
89                    if let Some(consensus_key) = consensus_key {
90                        (consensus_key, MarkerValue::ConsensusStreamEnded(tx_digest))
91                    } else {
92                        (
93                            FullObjectKey::new(FullObjectID::new(object_id, None), version),
94                            MarkerValue::FastpathStreamEnded,
95                        )
96                    }
97                });
98
99            let fastpath_stream_ended =
100                effects
101                    .transferred_to_consensus()
102                    .into_iter()
103                    .map(|(object_id, version, _)| {
104                        // Note: it's a bit of a misnomer to mark an object as `FastpathStreamEnded`
105                        // when it could have been transferred to consensus from `ObjectOwner`, as
106                        // its root owner may not have been a fastpath object. However, whether or
107                        // not it was technically in the fastpath at the version the marker is
108                        // written, it certainly is not in the fastpath *anymore*. This is needed
109                        // to produce the required behavior in `ObjectCacheRead::multi_input_objects_available`
110                        // when checking whether receiving objects are available.
111                        (
112                            FullObjectKey::new(FullObjectID::new(object_id, None), version),
113                            MarkerValue::FastpathStreamEnded,
114                        )
115                    });
116
117            let consensus_stream_ended = effects
118                .transferred_from_consensus()
119                .into_iter()
120                .chain(effects.consensus_owner_changed())
121                .map(|(object_id, version, _)| {
122                    let object = input_objects
123                        .get(&object_id)
124                        .expect("stream-ended object must be in input_objects");
125                    (
126                        FullObjectKey::new(object.full_id(), version),
127                        MarkerValue::ConsensusStreamEnded(tx_digest),
128                    )
129                });
130
131            // We "smear" removed consensus objects in the marker table to allow for proper
132            // sequencing of transactions that are submitted after the consensus stream ends.
133            // This means writing duplicate copies of the `ConsensusStreamEnded` marker for
134            // every output version that was scheduled to be created.
135            // NB: that we do _not_ smear objects that were taken immutably in the transaction
136            // (because these are not assigned output versions).
137            let smeared_objects = effects.stream_ended_mutably_accessed_consensus_objects();
138            let consensus_smears = smeared_objects.into_iter().map(|object_id| {
139                let id = input_objects
140                    .get(&object_id)
141                    .map(|obj| obj.full_id())
142                    .unwrap_or_else(|| {
143                        let start_version = stream_ended_consensus_objects.get(&object_id)
144                            .expect("stream-ended object must be in either input_objects or stream_ended_consensus_objects");
145                        FullObjectID::new(object_id, Some(*start_version))
146                    });
147                (
148                    FullObjectKey::new(id, lamport_version),
149                    MarkerValue::ConsensusStreamEnded(tx_digest),
150                )
151            });
152
153            received
154                .chain(tombstones)
155                .chain(fastpath_stream_ended)
156                .chain(consensus_stream_ended)
157                .chain(consensus_smears)
158                .collect()
159        };
160
161        let locks_to_delete: Vec<_> = mutable_inputs
162            .into_iter()
163            .filter_map(|(id, ((version, digest), owner))| {
164                owner.is_address_owned().then_some((id, version, digest))
165            })
166            .chain(received_objects)
167            .collect();
168
169        let new_locks_to_init: Vec<_> = written
170            .values()
171            .filter_map(|new_object| {
172                if new_object.is_address_owned() {
173                    Some(new_object.compute_object_reference())
174                } else {
175                    None
176                }
177            })
178            .collect();
179
180        let deleted = effects
181            .deleted()
182            .into_iter()
183            .chain(effects.unwrapped_then_deleted())
184            .map(ObjectKey::from)
185            .collect();
186
187        let wrapped = effects.wrapped().into_iter().map(ObjectKey::from).collect();
188
189        TransactionOutputs {
190            transaction: Arc::new(transaction),
191            effects,
192            events,
193            unchanged_loaded_runtime_objects,
194            accumulator_events: Mutex::new(accumulator_events),
195            markers,
196            wrapped,
197            deleted,
198            locks_to_delete,
199            new_locks_to_init,
200            written,
201            output_keys,
202        }
203    }
204
205    pub fn take_accumulator_events(&self) -> Vec<AccumulatorEvent> {
206        std::mem::take(&mut *self.accumulator_events.lock())
207    }
208
209    #[cfg(test)]
210    pub fn new_for_testing(transaction: VerifiedTransaction, effects: TransactionEffects) -> Self {
211        Self {
212            transaction: Arc::new(transaction),
213            effects,
214            events: TransactionEvents { data: vec![] },
215            unchanged_loaded_runtime_objects: vec![],
216            accumulator_events: Default::default(),
217            markers: vec![],
218            wrapped: vec![],
219            deleted: vec![],
220            locks_to_delete: vec![],
221            new_locks_to_init: vec![],
222            written: WrittenObjects::new(),
223            output_keys: vec![],
224        }
225    }
226}
227
228pub fn unchanged_loaded_runtime_objects(
229    _transaction: &TransactionData,
230    effects: &TransactionEffects,
231    loaded_runtime_objects: &ObjectSet,
232) -> Vec<ObjectKey> {
233    let mut unchanged_loaded_runtime_objects: BTreeMap<_, _> = loaded_runtime_objects
234        .iter()
235        // Don't include loaded packages (which are used for doing UID tracking inside the VM)
236        .filter(|o| !o.is_package())
237        .map(|o| (o.id(), o.version()))
238        .collect();
239
240    // Remove any object that is referenced in the changed objects effects set since it would be
241    // redundent to include it again.
242    for change in effects.object_changes() {
243        unchanged_loaded_runtime_objects.remove(&change.id);
244    }
245
246    unchanged_loaded_runtime_objects
247        .into_iter()
248        .map(|(id, v)| ObjectKey(id, v))
249        .collect()
250}