use std::collections::{BTreeMap, BTreeSet, HashMap};
use sui_types::base_types::TransactionDigest;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::effects::{InputSharedObject, TransactionEffects};
use sui_types::storage::ObjectKey;
use tracing::trace;
pub struct CausalOrder {
not_seen: BTreeMap<TransactionDigest, TransactionDependencies>,
output: Vec<TransactionEffects>,
}
impl CausalOrder {
pub fn causal_sort(effects: Vec<TransactionEffects>) -> Vec<TransactionEffects> {
let mut this = Self::from_vec(effects);
while let Some(item) = this.pop_first() {
this.insert(item);
}
this.into_list()
}
fn from_vec(effects: Vec<TransactionEffects>) -> Self {
let rwlock_builder = RWLockDependencyBuilder::from_effects(&effects);
let dependencies: Vec<_> = effects
.into_iter()
.map(|e| TransactionDependencies::from_effects(e, &rwlock_builder))
.collect();
let output = Vec::with_capacity(dependencies.len() * 2);
let not_seen = dependencies.into_iter().map(|e| (e.digest, e)).collect();
Self { not_seen, output }
}
fn pop_first(&mut self) -> Option<TransactionDependencies> {
let key = *self.not_seen.keys().next()?;
Some(self.not_seen.remove(&key).unwrap())
}
fn insert(&mut self, transaction: TransactionDependencies) {
let initial_state = InsertState::new(transaction);
let mut states = vec![initial_state];
while let Some(state) = states.last_mut() {
if let Some(new_state) = state.process(self) {
states.push(new_state);
} else {
states.pop().expect("Should contain an element");
}
}
}
fn into_list(self) -> Vec<TransactionEffects> {
self.output
}
}
struct TransactionDependencies {
digest: TransactionDigest,
dependencies: BTreeSet<TransactionDigest>,
effects: TransactionEffects,
}
impl TransactionDependencies {
fn from_effects(effects: TransactionEffects, rwlock_builder: &RWLockDependencyBuilder) -> Self {
let mut dependencies: BTreeSet<_> = effects.dependencies().iter().cloned().collect();
rwlock_builder.add_dependencies_for(*effects.transaction_digest(), &mut dependencies);
Self {
digest: *effects.transaction_digest(),
dependencies,
effects,
}
}
}
struct RWLockDependencyBuilder {
read_version: HashMap<ObjectKey, Vec<TransactionDigest>>,
overwrite_versions: HashMap<TransactionDigest, Vec<ObjectKey>>,
}
impl RWLockDependencyBuilder {
pub fn from_effects(effects: &[TransactionEffects]) -> Self {
let mut read_version: HashMap<ObjectKey, Vec<TransactionDigest>> = Default::default();
let mut overwrite_versions: HashMap<TransactionDigest, Vec<ObjectKey>> = Default::default();
for effect in effects {
for kind in effect.input_shared_objects() {
match kind {
InputSharedObject::ReadOnly(obj_ref) => {
let obj_key = obj_ref.into();
read_version
.entry(obj_key)
.or_default()
.push(*effect.transaction_digest());
}
InputSharedObject::Mutate(obj_ref) => {
let obj_key = obj_ref.into();
overwrite_versions
.entry(*effect.transaction_digest())
.or_default()
.push(obj_key);
}
InputSharedObject::ReadConsensusStreamEnded(oid, version) => read_version
.entry(ObjectKey(oid, version))
.or_default()
.push(*effect.transaction_digest()),
InputSharedObject::MutateConsensusStreamEnded(oid, version) => {
overwrite_versions
.entry(*effect.transaction_digest())
.or_default()
.push(ObjectKey(oid, version))
}
InputSharedObject::Cancelled(..) => (), }
}
}
Self {
read_version,
overwrite_versions,
}
}
pub fn add_dependencies_for(
&self,
digest: TransactionDigest,
v: &mut BTreeSet<TransactionDigest>,
) {
let Some(overwrites) = self.overwrite_versions.get(&digest) else {
return;
};
for obj_ver in overwrites {
let Some(reads) = self.read_version.get(obj_ver) else {
continue;
};
for dep in reads {
trace!(
"Assuming additional dependency when constructing checkpoint {:?} -> {:?}",
digest,
*dep
);
v.insert(*dep);
}
}
}
}
struct InsertState {
dependencies: Vec<TransactionDigest>,
transaction: Option<TransactionDependencies>,
}
impl InsertState {
pub fn new(transaction: TransactionDependencies) -> Self {
Self {
dependencies: transaction.dependencies.iter().cloned().collect(),
transaction: Some(transaction),
}
}
pub fn process(&mut self, causal_order: &mut CausalOrder) -> Option<InsertState> {
while let Some(dep) = self.dependencies.pop() {
if let Some(dep_transaction) = causal_order.not_seen.remove(&dep) {
return Some(InsertState::new(dep_transaction));
}
}
let transaction = self
.transaction
.take()
.expect("Can't use InsertState after it is finished");
causal_order.output.push(transaction.effects);
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use sui_types::base_types::ObjectDigest;
use sui_types::base_types::{ObjectID, SequenceNumber};
use sui_types::effects::TransactionEffects;
#[test]
pub fn test_causal_order() {
let e1 = e(d(1), vec![d(2), d(3)]);
let e2 = e(d(2), vec![d(3), d(4)]);
let e3 = e(d(3), vec![]);
let e4 = e(d(4), vec![]);
let r = extract(CausalOrder::causal_sort(vec![
e1.clone(),
e2,
e3,
e4.clone(),
]));
assert_eq!(r, vec![3, 4, 2, 1]);
let r = extract(CausalOrder::causal_sort(vec![e1.clone(), e4.clone()]));
assert_eq!(r, vec![1, 4]);
let r = extract(CausalOrder::causal_sort(vec![e4, e1]));
assert_eq!(r, vec![1, 4]);
}
#[test]
pub fn test_causal_order_rw_locks() {
let mut e5 = e(d(5), vec![]);
let mut e2 = e(d(2), vec![]);
let mut e3 = e(d(3), vec![]);
let obj_digest = ObjectDigest::new(Default::default());
e5.unsafe_add_input_shared_object_for_testing(InputSharedObject::ReadOnly((
o(1),
SequenceNumber::from_u64(1),
obj_digest,
)));
e2.unsafe_add_input_shared_object_for_testing(InputSharedObject::ReadOnly((
o(1),
SequenceNumber::from_u64(1),
obj_digest,
)));
e3.unsafe_add_input_shared_object_for_testing(InputSharedObject::Mutate((
o(1),
SequenceNumber::from_u64(1),
obj_digest,
)));
let r = extract(CausalOrder::causal_sort(vec![e5, e2, e3]));
assert_eq!(r.len(), 3);
assert_eq!(*r.get(2).unwrap(), 3); assert!(r.contains(&5));
assert!(r.contains(&2));
}
fn extract(e: Vec<TransactionEffects>) -> Vec<u8> {
e.into_iter()
.map(|e| e.transaction_digest().inner()[0])
.collect()
}
fn d(i: u8) -> TransactionDigest {
let mut bytes: [u8; 32] = Default::default();
bytes[0] = i;
TransactionDigest::new(bytes)
}
fn o(i: u8) -> ObjectID {
let mut bytes: [u8; ObjectID::LENGTH] = Default::default();
bytes[0] = i;
ObjectID::new(bytes)
}
fn e(
transaction_digest: TransactionDigest,
dependencies: Vec<TransactionDigest>,
) -> TransactionEffects {
let mut effects = TransactionEffects::default();
*effects.transaction_digest_mut_for_testing() = transaction_digest;
*effects.dependencies_mut_for_testing() = dependencies;
effects
}
}