1use indexmap::IndexSet;
5use move_binary_format::file_format::Visibility;
6use move_binary_format::normalized;
7use move_core_types::identifier::IdentStr;
8use move_core_types::language_storage::StructTag;
9use rand::rngs::StdRng;
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14use sui_json_rpc_types::{SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI};
15use sui_move_build::BuildConfig;
16use sui_protocol_config::{Chain, ProtocolConfig};
17use sui_types::base_types::{ConsensusObjectSequenceKey, ObjectID, ObjectRef, SuiAddress};
18use sui_types::object::{Object, Owner};
19use sui_types::storage::WriteKind;
20use sui_types::transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData};
21use sui_types::{Identifier, SUI_FRAMEWORK_ADDRESS};
22use test_cluster::TestCluster;
23use tokio::sync::RwLock;
24use tracing::{debug, error, info};
25
/// Normalized Move type whose identifiers are `normalized::ArcIdentifier`s.
type Type = normalized::Type<normalized::ArcIdentifier>;
27
/// A callable Move function found in a published package.
///
/// Values are built by `SurferState::discover_entry_functions`, which keeps only functions
/// that are public or `entry`, return nothing, and declare no type parameters.
#[derive(Debug, Clone)]
pub struct EntryFunction {
    /// ID of the package object the function was found in.
    pub package: ObjectID,
    /// Name of the module that declares the function.
    pub module: String,
    /// Name of the function.
    pub function: String,
    /// Parameter types, with a trailing reference to the Sui framework's
    /// `tx_context::TxContext` removed when the function declares one.
    pub parameters: Vec<Type>,
}
35
/// Counters describing the transactions a surfer has executed.
///
/// Each call to `record_transaction` bumps exactly one of the success/failure counters
/// and exactly one of the owned/shared counters.
#[derive(Debug, Default)]
pub struct SurfStatistics {
    /// Transactions recorded with `tx_succeeded == true`.
    pub num_successful_transactions: u64,
    /// Transactions recorded with `tx_succeeded == false`.
    pub num_failed_transactions: u64,
    /// Transactions recorded with `has_shared_object == false`.
    pub num_owned_obj_transactions: u64,
    /// Transactions recorded with `has_shared_object == true`.
    pub num_shared_obj_transactions: u64,
    /// Distinct `(package, module, function)` triples that have been recorded.
    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
}
44
45impl SurfStatistics {
46 pub fn record_transaction(
47 &mut self,
48 has_shared_object: bool,
49 tx_succeeded: bool,
50 package: ObjectID,
51 module: String,
52 function: String,
53 ) {
54 if tx_succeeded {
55 self.num_successful_transactions += 1;
56 } else {
57 self.num_failed_transactions += 1;
58 }
59 if has_shared_object {
60 self.num_shared_obj_transactions += 1;
61 } else {
62 self.num_owned_obj_transactions += 1;
63 }
64 self.unique_move_functions_called
65 .insert((package, module, function));
66 }
67
68 pub fn aggregate(stats: Vec<Self>) -> Self {
69 let mut result = Self::default();
70 for stat in stats {
71 result.num_successful_transactions += stat.num_successful_transactions;
72 result.num_failed_transactions += stat.num_failed_transactions;
73 result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
74 result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
75 result
76 .unique_move_functions_called
77 .extend(stat.unique_move_functions_called);
78 }
79 result
80 }
81
82 pub fn print_stats(&self) {
83 info!(
84 "Executed {} transactions, {} succeeded, {} failed",
85 self.num_successful_transactions + self.num_failed_transactions,
86 self.num_successful_transactions,
87 self.num_failed_transactions
88 );
89 info!(
90 "{} are owned object transactions, {} are shared object transactions",
91 self.num_owned_obj_transactions, self.num_shared_obj_transactions
92 );
93 info!(
94 "Unique move functions called: {}",
95 self.unique_move_functions_called.len()
96 );
97 }
98}
99
/// Object references grouped by the struct tag of the object; the set type is indexable,
/// so a reference can be removed by position.
pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;

/// Immutable object references grouped by struct tag, behind an `Arc<RwLock<..>>` so the
/// map can be shared between holders of the `Arc`.
pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;

/// Consensus object sequence keys grouped by struct tag, behind an `Arc<RwLock<..>>` so the
/// map can be shared between holders of the `Arc`.
pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<ConsensusObjectSequenceKey>>>>;
107
/// Per-surfer state: identity, gas object, the object inventory it can draw call arguments
/// from, the known callable functions, and execution statistics.
pub struct SurferState {
    /// Pool passed to package normalization when entry functions are discovered.
    pub pool: Arc<RwLock<normalized::ArcPool>>,
    /// Identifier recorded as `surfer_id` on the tracing spans of this surfer's methods.
    pub id: usize,
    /// Test cluster used for the reference gas price, the wallet, fullnode object lookups
    /// and the protocol version.
    pub cluster: Arc<TestCluster>,
    /// Random number generator. Not used by the methods defined on this type in this file;
    /// presumably consumed by the code that drives the surfer (TODO confirm).
    pub rng: StdRng,

    /// Address that sends every transaction; only objects owned by this address are added
    /// to `owned_objects`.
    pub address: SuiAddress,
    /// Reference of the object used to pay gas; replaced whenever transaction effects
    /// report a change to the object with the same ID.
    pub gas_object: ObjectRef,
    /// References of objects owned by `address`, grouped by struct tag.
    pub owned_objects: OwnedObjects,
    /// References of immutable objects seen in transaction effects, grouped by struct tag.
    pub immutable_objects: ImmObjects,
    /// Keys of shared / consensus-address-owned objects seen in transaction effects,
    /// grouped by struct tag.
    pub shared_objects: SharedObjects,
    /// Callable functions discovered in packages seen in transaction effects.
    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,

    /// Statistics of the transactions executed through this state.
    pub stats: SurfStatistics,
}
123
124impl SurferState {
125 pub fn new(
126 id: usize,
127 cluster: Arc<TestCluster>,
128 rng: StdRng,
129 address: SuiAddress,
130 gas_object: ObjectRef,
131 owned_objects: OwnedObjects,
132 immutable_objects: ImmObjects,
133 shared_objects: SharedObjects,
134 entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
135 ) -> Self {
136 Self {
137 pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
138 id,
139 cluster,
140 rng,
141 address,
142 gas_object,
143 owned_objects,
144 immutable_objects,
145 shared_objects,
146 entry_functions,
147 stats: Default::default(),
148 }
149 }
150
151 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
152 pub async fn execute_move_transaction(
153 &mut self,
154 package: ObjectID,
155 module: String,
156 function: String,
157 args: Vec<CallArg>,
158 ) {
159 let rgp = self.cluster.get_reference_gas_price().await;
160 let use_shared_object = args
161 .iter()
162 .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
163 let tx_data = TransactionData::new_move_call(
164 self.address,
165 package,
166 Identifier::new(module.as_str()).unwrap(),
167 Identifier::new(function.as_str()).unwrap(),
168 vec![],
169 self.gas_object,
170 args,
171 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
172 rgp,
173 )
174 .unwrap();
175 let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
176 let response = loop {
177 debug!("Executing transaction {:?}", tx.digest());
178 match self
179 .cluster
180 .wallet
181 .execute_transaction_may_fail(tx.clone())
182 .await
183 {
184 Ok(effects) => break effects,
185 Err(e) => {
186 error!("Error executing transaction {:?}: {e:?}", tx.digest());
187 tokio::time::sleep(Duration::from_secs(1)).await;
188 }
189 }
190 };
191 debug!(
192 "Successfully executed transaction {:?} with response {:?}",
193 tx, response
194 );
195 let effects = response.effects.unwrap();
196 info!(
197 "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
198 self.address,
199 module,
200 function,
201 effects.status()
202 );
203 self.stats.record_transaction(
204 use_shared_object,
205 effects.status().is_ok(),
206 package,
207 module,
208 function,
209 );
210 self.process_tx_effects(&effects).await;
211 }
212
213 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
214 async fn process_tx_effects(&mut self, effects: &SuiTransactionBlockEffects) {
215 for (owned_ref, write_kind) in effects.all_changed_objects() {
216 if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
217 continue;
221 }
222 let obj_ref = owned_ref.reference.to_object_ref();
223 let object = self
224 .cluster
225 .get_object_from_fullnode_store(&obj_ref.0)
226 .await
227 .unwrap();
228 if object.is_package() {
229 self.discover_entry_functions(object).await;
230 continue;
231 }
232 let struct_tag = object.struct_tag().unwrap();
233 match owned_ref.owner {
234 Owner::Immutable => {
235 self.immutable_objects
236 .write()
237 .await
238 .entry(struct_tag)
239 .or_default()
240 .push(obj_ref);
241 }
242 Owner::AddressOwner(address) => {
243 if address == self.address {
244 self.owned_objects
245 .entry(struct_tag)
246 .or_default()
247 .insert(obj_ref);
248 }
249 }
250 Owner::ObjectOwner(_) => (),
251 Owner::Shared {
252 initial_shared_version,
253 }
254 | Owner::ConsensusAddressOwner {
256 start_version: initial_shared_version,
257 ..
258 } => {
259 if write_kind != WriteKind::Mutate {
260 self.shared_objects
261 .write()
262 .await
263 .entry(struct_tag)
264 .or_default()
265 .push((obj_ref.0, initial_shared_version));
266 }
267 }
270 }
271 if obj_ref.0 == self.gas_object.0 {
272 self.gas_object = obj_ref;
273 }
274 }
275 }
276
277 async fn discover_entry_functions(&self, package: Object) {
278 let package_id = package.id();
279 let move_package = package.into_inner().data.try_into_package().unwrap();
280 let proto_version = self.cluster.highest_protocol_version();
281 let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
282 let binary_config = config.binary_config(None);
283 let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
284 let entry_functions: Vec<_> = move_package
285 .normalize(pool, &binary_config, false)
286 .unwrap()
287 .into_iter()
288 .flat_map(|(module_name, module)| {
289 module
290 .functions
291 .into_iter()
292 .filter_map(|(func_name, func)| {
293 if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
295 return None;
296 }
297 if !func.return_.is_empty() {
299 return None;
300 }
301 if !func.type_parameters.is_empty() {
303 return None;
304 }
305 let mut parameters = (*func.parameters).clone();
306 if let Some(last_param) = parameters.last().as_ref()
307 && is_type_tx_context(last_param)
308 {
309 parameters.pop();
310 }
311 Some(EntryFunction {
312 package: package_id,
313 module: module_name.clone(),
314 function: func_name.to_string(),
315 parameters: parameters
316 .into_iter()
317 .map(|rc_ty| (*rc_ty).clone())
318 .collect(),
319 })
320 })
321 .collect::<Vec<_>>()
322 })
323 .collect();
324 info!(
325 "Number of entry functions discovered: {:?}",
326 entry_functions.len()
327 );
328 debug!("Entry functions: {:?}", entry_functions);
329 self.entry_functions.write().await.extend(entry_functions);
330 }
331
332 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
333 pub async fn publish_package(&mut self, path: &Path) {
334 let rgp = self.cluster.get_reference_gas_price().await;
335 let package = BuildConfig::new_for_testing().build(path).unwrap();
336 let modules = package.get_package_bytes(false);
337 let tx_data = TransactionData::new_module(
338 self.address,
339 self.gas_object,
340 modules,
341 package.dependency_ids.published.values().cloned().collect(),
342 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
343 rgp,
344 );
345 let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
346 let response = loop {
347 match self
348 .cluster
349 .wallet
350 .execute_transaction_may_fail(tx.clone())
351 .await
352 {
353 Ok(response) => {
354 break response;
355 }
356 Err(err) => {
357 error!("Failed to publish package: {:?}", err);
358 tokio::time::sleep(Duration::from_secs(1)).await;
359 }
360 }
361 };
362 info!("Successfully published package in {:?}", path);
363 self.process_tx_effects(&response.effects.unwrap()).await;
364 }
365
366 pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
367 self.owned_objects
368 .get(type_tag)
369 .map(|objects| objects.len())
370 .unwrap_or(0)
371 }
372
373 pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
374 self.immutable_objects
375 .read()
376 .await
377 .get(type_tag)
378 .map(|objects| objects.len())
379 .unwrap_or(0)
380 }
381
382 pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
383 self.shared_objects
384 .read()
385 .await
386 .get(type_tag)
387 .map(|objects| objects.len())
388 .unwrap_or(0)
389 }
390
391 pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
392 self.owned_objects
393 .get_mut(type_tag)
394 .unwrap()
395 .swap_remove_index(n)
396 .unwrap()
397 }
398
399 pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
400 self.immutable_objects.read().await.get(type_tag).unwrap()[n]
401 }
402
403 pub async fn choose_nth_shared_object(
404 &self,
405 type_tag: &StructTag,
406 n: usize,
407 ) -> ConsensusObjectSequenceKey {
408 self.shared_objects.read().await.get(type_tag).unwrap()[n]
409 }
410}
411
412fn is_type_tx_context(ty: &Type) -> bool {
413 match ty {
414 Type::Reference(_, inner) => match inner.as_ref() {
415 Type::Datatype(dt) => {
416 dt.module.address == SUI_FRAMEWORK_ADDRESS
417 && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
418 && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
419 && dt.type_arguments.is_empty()
420 }
421 _ => false,
422 },
423 _ => false,
424 }
425}