sui_surfer/
surfer_task.rs1use std::{collections::HashMap, sync::Arc};
5
6use rand::{Rng, SeedableRng, rngs::StdRng};
7use sui_core::authority::authority_store_tables::LiveObject;
8use sui_types::{
9 base_types::{ObjectRef, SuiAddress},
10 object::Owner,
11};
12use test_cluster::TestCluster;
13use tokio::sync::{RwLock, watch};
14
15use crate::{
16 surf_strategy::SurfStrategy,
17 surfer_state::{ImmObjects, OwnedObjects, SharedObjects, SurfStatistics, SurferState},
18};
19
20pub struct SurferTask {
21 pub state: SurferState,
22 pub surf_strategy: SurfStrategy,
23 pub exit_rcv: watch::Receiver<()>,
24}
25
26impl SurferTask {
27 pub async fn create_surfer_tasks(
28 cluster: Arc<TestCluster>,
29 seed: u64,
30 exit_rcv: watch::Receiver<()>,
31 skip_accounts: usize,
32 surf_strategy: SurfStrategy,
33 ) -> Vec<SurferTask> {
34 let mut rng = StdRng::seed_from_u64(seed);
35 let immutable_objects: ImmObjects = Arc::new(RwLock::new(HashMap::new()));
36 let shared_objects: SharedObjects = Arc::new(RwLock::new(HashMap::new()));
37
38 let mut accounts: HashMap<SuiAddress, (Option<ObjectRef>, OwnedObjects)> = cluster
39 .get_addresses()
40 .iter()
41 .skip(skip_accounts)
42 .map(|address| (*address, (None, HashMap::new())))
43 .collect();
44 let node = cluster
45 .swarm
46 .all_nodes()
47 .flat_map(|node| node.get_node_handle())
48 .next()
49 .unwrap();
50 let all_live_objects: Vec<_> = node.with(|node| {
51 node.state()
52 .get_global_state_hash_store()
53 .iter_cached_live_object_set_for_testing(false)
54 .collect()
55 });
56 for obj in all_live_objects {
57 match obj {
58 LiveObject::Normal(obj) => {
59 if let Some(struct_tag) = obj.struct_tag() {
60 let obj_ref = obj.compute_object_reference();
61 match obj.owner {
62 Owner::Immutable => {
63 immutable_objects
64 .write()
65 .await
66 .entry(struct_tag)
67 .or_default()
68 .push(obj_ref);
69 }
70 Owner::Shared {
72 initial_shared_version,
73 }
74 | Owner::ConsensusAddressOwner {
75 start_version: initial_shared_version,
76 ..
77 }
78 | Owner::Party {
79 start_version: initial_shared_version,
80 ..
81 } => {
82 shared_objects
83 .write()
84 .await
85 .entry(struct_tag)
86 .or_default()
87 .push((obj_ref.0, initial_shared_version));
88 }
89 Owner::AddressOwner(address) => {
90 if let Some((gas_object, owned_objects)) =
91 accounts.get_mut(&address)
92 {
93 if obj.is_gas_coin() && gas_object.is_none() {
94 gas_object.replace(obj_ref);
95 } else {
96 owned_objects
97 .entry(struct_tag)
98 .or_default()
99 .insert(obj_ref);
100 }
101 }
102 }
103 Owner::ObjectOwner(_) => (),
104 }
105 }
106 }
107 LiveObject::Wrapped(_) => unreachable!("Explicitly skipped wrapped objects"),
108 }
109 }
110 let entry_functions = Arc::new(RwLock::new(vec![]));
111 accounts
112 .into_iter()
113 .enumerate()
114 .map(|(id, (address, (gas_object, owned_objects)))| {
115 let seed = rng.r#gen::<u64>();
116 let state_rng = StdRng::seed_from_u64(seed);
117 let state = SurferState::new(
118 id,
119 cluster.clone(),
120 state_rng,
121 address,
122 gas_object.unwrap(),
123 owned_objects,
124 immutable_objects.clone(),
125 shared_objects.clone(),
126 entry_functions.clone(),
127 );
128 SurferTask {
129 state,
130 surf_strategy: surf_strategy.clone(),
131 exit_rcv: exit_rcv.clone(),
132 }
133 })
134 .collect()
135 }
136
137 pub async fn surf(mut self) -> SurfStatistics {
138 loop {
139 let entry_functions = self.state.entry_functions.read().await.clone();
140
141 tokio::select! {
142 _ = self.surf_strategy
143 .surf_for_a_while(&mut self.state, entry_functions) => {
144 continue;
145 }
146
147 _ = self.exit_rcv.changed() => {
148 return self.state.stats;
149 }
150 }
151 }
152 }
153}