use parking_lot::Mutex;
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use sui_types::accumulator_event::AccumulatorEvent;
use sui_types::base_types::{FullObjectID, ObjectRef};
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents};
use sui_types::full_checkpoint_content::ObjectSet;
use sui_types::inner_temporary_store::{InnerTemporaryStore, WrittenObjects};
use sui_types::storage::{FullObjectKey, InputKey, MarkerValue, ObjectKey};
use sui_types::transaction::{TransactionData, TransactionDataAPI, VerifiedTransaction};
/// Everything produced by executing a single transaction, bundled together.
///
/// The field descriptions below describe how `build_transaction_outputs` populates each
/// field; `new_for_testing` leaves every derived field empty.
#[derive(Debug)]
pub struct TransactionOutputs {
/// The executed transaction, wrapped in an `Arc`.
pub transaction: Arc<VerifiedTransaction>,
/// The effects passed in alongside the transaction.
pub effects: TransactionEffects,
/// Events taken from the temporary store.
pub events: TransactionEvents,
/// Keys supplied by the caller of `build_transaction_outputs`.
/// NOTE(review): presumably computed with the free function of the same name — confirm.
pub unchanged_loaded_runtime_objects: Vec<ObjectKey>,
/// Accumulator events taken from the temporary store. Held behind a `Mutex` so that
/// `take_accumulator_events` can move them out through `&self`.
pub accumulator_events: Mutex<Vec<AccumulatorEvent>>,
/// `(key, marker)` pairs for received objects, tombstones, objects transferred to or from
/// consensus, consensus owner changes, and stream-ended mutably accessed consensus objects.
pub markers: Vec<(FullObjectKey, MarkerValue)>,
/// Keys built from `effects.wrapped()`.
pub wrapped: Vec<ObjectKey>,
/// Keys built from `effects.deleted()` followed by `effects.unwrapped_then_deleted()`.
pub deleted: Vec<ObjectKey>,
/// References of address-owned mutable inputs, followed by the received objects.
pub locks_to_delete: Vec<ObjectRef>,
/// References of the written objects that are address-owned.
pub new_locks_to_init: Vec<ObjectRef>,
/// The written objects taken from the temporary store.
pub written: WrittenObjects,
/// The result of `InnerTemporaryStore::get_output_keys` for the effects.
pub output_keys: Vec<InputKey>,
}
impl TransactionOutputs {
pub fn build_transaction_outputs(
transaction: VerifiedTransaction,
effects: TransactionEffects,
inner_temporary_store: InnerTemporaryStore,
unchanged_loaded_runtime_objects: Vec<ObjectKey>,
) -> TransactionOutputs {
let output_keys = inner_temporary_store.get_output_keys(&effects);
let InnerTemporaryStore {
input_objects,
stream_ended_consensus_objects,
mutable_inputs,
written,
events,
accumulator_events,
loaded_runtime_objects: _,
binary_config: _,
runtime_packages_loaded_from_db: _,
lamport_version,
} = inner_temporary_store;
let tx_digest = *transaction.digest();
let modified_at: HashSet<_> = effects.modified_at_versions().into_iter().collect();
let possible_to_receive = transaction.transaction_data().receiving_objects();
let received_objects = possible_to_receive
.into_iter()
.filter(|obj_ref| modified_at.contains(&(obj_ref.0, obj_ref.1)));
let markers: Vec<_> = {
let received = received_objects.clone().map(|objref| {
(
FullObjectKey::new(FullObjectID::new(objref.0, None), objref.1),
MarkerValue::Received,
)
});
let tombstones = effects
.all_tombstones()
.into_iter()
.map(|(object_id, version)| {
let consensus_key = input_objects
.get(&object_id)
.filter(|o| o.is_consensus())
.map(|o| FullObjectKey::new(o.full_id(), version));
if let Some(consensus_key) = consensus_key {
(consensus_key, MarkerValue::ConsensusStreamEnded(tx_digest))
} else {
(
FullObjectKey::new(FullObjectID::new(object_id, None), version),
MarkerValue::FastpathStreamEnded,
)
}
});
let fastpath_stream_ended =
effects
.transferred_to_consensus()
.into_iter()
.map(|(object_id, version, _)| {
(
FullObjectKey::new(FullObjectID::new(object_id, None), version),
MarkerValue::FastpathStreamEnded,
)
});
let consensus_stream_ended = effects
.transferred_from_consensus()
.into_iter()
.chain(effects.consensus_owner_changed())
.map(|(object_id, version, _)| {
let object = input_objects
.get(&object_id)
.expect("stream-ended object must be in input_objects");
(
FullObjectKey::new(object.full_id(), version),
MarkerValue::ConsensusStreamEnded(tx_digest),
)
});
let smeared_objects = effects.stream_ended_mutably_accessed_consensus_objects();
let consensus_smears = smeared_objects.into_iter().map(|object_id| {
let id = input_objects
.get(&object_id)
.map(|obj| obj.full_id())
.unwrap_or_else(|| {
let start_version = stream_ended_consensus_objects.get(&object_id)
.expect("stream-ended object must be in either input_objects or stream_ended_consensus_objects");
FullObjectID::new(object_id, Some(*start_version))
});
(
FullObjectKey::new(id, lamport_version),
MarkerValue::ConsensusStreamEnded(tx_digest),
)
});
received
.chain(tombstones)
.chain(fastpath_stream_ended)
.chain(consensus_stream_ended)
.chain(consensus_smears)
.collect()
};
let locks_to_delete: Vec<_> = mutable_inputs
.into_iter()
.filter_map(|(id, ((version, digest), owner))| {
owner.is_address_owned().then_some((id, version, digest))
})
.chain(received_objects)
.collect();
let new_locks_to_init: Vec<_> = written
.values()
.filter_map(|new_object| {
if new_object.is_address_owned() {
Some(new_object.compute_object_reference())
} else {
None
}
})
.collect();
let deleted = effects
.deleted()
.into_iter()
.chain(effects.unwrapped_then_deleted())
.map(ObjectKey::from)
.collect();
let wrapped = effects.wrapped().into_iter().map(ObjectKey::from).collect();
TransactionOutputs {
transaction: Arc::new(transaction),
effects,
events,
unchanged_loaded_runtime_objects,
accumulator_events: Mutex::new(accumulator_events),
markers,
wrapped,
deleted,
locks_to_delete,
new_locks_to_init,
written,
output_keys,
}
}
pub fn take_accumulator_events(&self) -> Vec<AccumulatorEvent> {
std::mem::take(&mut *self.accumulator_events.lock())
}
#[cfg(test)]
pub fn new_for_testing(transaction: VerifiedTransaction, effects: TransactionEffects) -> Self {
Self {
transaction: Arc::new(transaction),
effects,
events: TransactionEvents { data: vec![] },
unchanged_loaded_runtime_objects: vec![],
accumulator_events: Default::default(),
markers: vec![],
wrapped: vec![],
deleted: vec![],
locks_to_delete: vec![],
new_locks_to_init: vec![],
written: WrittenObjects::new(),
output_keys: vec![],
}
}
}
/// Returns the keys of the loaded runtime objects that the transaction left untouched.
///
/// Packages are skipped, and every object whose id appears in `effects.object_changes()` is
/// dropped. The remaining `(id, version)` pairs are returned as [`ObjectKey`]s in ascending id
/// order. If the same id is loaded more than once, the version seen last is kept.
///
/// `_transaction` is currently unused.
pub fn unchanged_loaded_runtime_objects(
    _transaction: &TransactionData,
    effects: &TransactionEffects,
    loaded_runtime_objects: &ObjectSet,
) -> Vec<ObjectKey> {
    // Keyed by id so that changed objects can be removed by id below.
    let mut candidates = BTreeMap::new();
    for object in loaded_runtime_objects.iter() {
        if !object.is_package() {
            candidates.insert(object.id(), object.version());
        }
    }

    for change in effects.object_changes() {
        candidates.remove(&change.id);
    }

    candidates
        .into_iter()
        .map(|(id, version)| ObjectKey(id, version))
        .collect()
}