sui_surfer/
surfer_task.rs1use std::{collections::HashMap, sync::Arc};
5
6use rand::{Rng, SeedableRng, rngs::StdRng};
7use sui_core::authority::authority_store_tables::LiveObject;
8use sui_types::{
9 base_types::{ObjectRef, SuiAddress},
10 object::Owner,
11};
12use test_cluster::TestCluster;
13use tokio::sync::{RwLock, watch};
14
15use crate::{
16 surf_strategy::SurfStrategy,
17 surfer_state::{ImmObjects, OwnedObjects, SharedObjects, SurfStatistics, SurferState},
18};
19
20pub struct SurferTask {
21 pub state: SurferState,
22 pub surf_strategy: SurfStrategy,
23 pub exit_rcv: watch::Receiver<()>,
24}
25
26impl SurferTask {
27 pub async fn create_surfer_tasks(
28 cluster: Arc<TestCluster>,
29 seed: u64,
30 exit_rcv: watch::Receiver<()>,
31 skip_accounts: usize,
32 surf_strategy: SurfStrategy,
33 ) -> Vec<SurferTask> {
34 let mut rng = StdRng::seed_from_u64(seed);
35 let immutable_objects: ImmObjects = Arc::new(RwLock::new(HashMap::new()));
36 let shared_objects: SharedObjects = Arc::new(RwLock::new(HashMap::new()));
37
38 let mut accounts: HashMap<SuiAddress, (Option<ObjectRef>, OwnedObjects)> = cluster
39 .get_addresses()
40 .iter()
41 .skip(skip_accounts)
42 .map(|address| (*address, (None, HashMap::new())))
43 .collect();
44 let node = cluster
45 .swarm
46 .all_nodes()
47 .flat_map(|node| node.get_node_handle())
48 .next()
49 .unwrap();
50 let all_live_objects: Vec<_> = node.with(|node| {
51 node.state()
52 .get_global_state_hash_store()
53 .iter_cached_live_object_set_for_testing(false)
54 .collect()
55 });
56 for obj in all_live_objects {
57 match obj {
58 LiveObject::Normal(obj) => {
59 if let Some(struct_tag) = obj.struct_tag() {
60 let obj_ref = obj.compute_object_reference();
61 match obj.owner {
62 Owner::Immutable => {
63 immutable_objects
64 .write()
65 .await
66 .entry(struct_tag)
67 .or_default()
68 .push(obj_ref);
69 }
70 Owner::Shared {
71 initial_shared_version,
72 }
73 | Owner::ConsensusAddressOwner {
75 start_version: initial_shared_version,
76 ..
77 } => {
78 shared_objects
79 .write()
80 .await
81 .entry(struct_tag)
82 .or_default()
83 .push((obj_ref.0, initial_shared_version));
84 }
85 Owner::AddressOwner(address) => {
86 if let Some((gas_object, owned_objects)) =
87 accounts.get_mut(&address)
88 {
89 if obj.is_gas_coin() && gas_object.is_none() {
90 gas_object.replace(obj_ref);
91 } else {
92 owned_objects
93 .entry(struct_tag)
94 .or_default()
95 .insert(obj_ref);
96 }
97 }
98 }
99 Owner::ObjectOwner(_) => (),
100 }
101 }
102 }
103 LiveObject::Wrapped(_) => unreachable!("Explicitly skipped wrapped objects"),
104 }
105 }
106 let entry_functions = Arc::new(RwLock::new(vec![]));
107 accounts
108 .into_iter()
109 .enumerate()
110 .map(|(id, (address, (gas_object, owned_objects)))| {
111 let seed = rng.r#gen::<u64>();
112 let state_rng = StdRng::seed_from_u64(seed);
113 let state = SurferState::new(
114 id,
115 cluster.clone(),
116 state_rng,
117 address,
118 gas_object.unwrap(),
119 owned_objects,
120 immutable_objects.clone(),
121 shared_objects.clone(),
122 entry_functions.clone(),
123 );
124 SurferTask {
125 state,
126 surf_strategy: surf_strategy.clone(),
127 exit_rcv: exit_rcv.clone(),
128 }
129 })
130 .collect()
131 }
132
133 pub async fn surf(mut self) -> SurfStatistics {
134 loop {
135 let entry_functions = self.state.entry_functions.read().await.clone();
136
137 tokio::select! {
138 _ = self.surf_strategy
139 .surf_for_a_while(&mut self.state, entry_functions) => {
140 continue;
141 }
142
143 _ = self.exit_rcv.changed() => {
144 return self.state.stats;
145 }
146 }
147 }
148 }
149}