1use indexmap::IndexSet;
5use move_binary_format::file_format::Visibility;
6use move_binary_format::normalized;
7use move_core_types::identifier::IdentStr;
8use move_core_types::language_storage::StructTag;
9use mysten_common::fatal;
10use rand::rngs::StdRng;
11use std::collections::{HashMap, HashSet};
12use std::path::Path;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use sui_move_build::BuildConfig;
16use sui_protocol_config::{Chain, ProtocolConfig};
17use sui_types::base_types::{ConsensusObjectSequenceKey, ObjectID, ObjectRef, SuiAddress};
18use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
19use sui_types::object::{Object, Owner};
20use sui_types::storage::WriteKind;
21use sui_types::transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData};
22use sui_types::{Identifier, SUI_FRAMEWORK_ADDRESS};
23use test_cluster::TestCluster;
24use tokio::sync::RwLock;
25use tracing::{debug, error, info};
26
/// Normalized Move type whose identifiers are `normalized::ArcIdentifier`s.
type Type = normalized::Type<normalized::ArcIdentifier>;
28
/// A Move function that can be targeted by a generated transaction.
///
/// Instances are produced by `SurferState::discover_entry_functions` when a
/// package object shows up in transaction effects.
#[derive(Debug, Clone)]
pub struct EntryFunction {
    /// ID of the package that defines the function.
    pub package: ObjectID,
    /// Name of the module inside `package`.
    pub module: String,
    /// Name of the function inside `module`.
    pub function: String,
    /// Parameter types of the function. When populated by
    /// `discover_entry_functions`, a trailing reference to `TxContext` has
    /// already been removed.
    pub parameters: Vec<Type>,
}
36
/// Counters describing the transactions executed by one or more surfers.
#[derive(Debug, Default)]
pub struct SurfStatistics {
    /// Transactions recorded with `tx_succeeded == true`.
    pub num_successful_transactions: u64,
    /// Transactions recorded with `tx_succeeded == false`.
    pub num_failed_transactions: u64,
    /// Transactions recorded with `has_shared_object == false`.
    pub num_owned_obj_transactions: u64,
    /// Transactions recorded with `has_shared_object == true`.
    pub num_shared_obj_transactions: u64,
    /// Distinct `(package, module, function)` triples that were recorded.
    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
}
45
46impl SurfStatistics {
47 pub fn record_transaction(
48 &mut self,
49 has_shared_object: bool,
50 tx_succeeded: bool,
51 package: ObjectID,
52 module: String,
53 function: String,
54 ) {
55 if tx_succeeded {
56 self.num_successful_transactions += 1;
57 } else {
58 self.num_failed_transactions += 1;
59 }
60 if has_shared_object {
61 self.num_shared_obj_transactions += 1;
62 } else {
63 self.num_owned_obj_transactions += 1;
64 }
65 self.unique_move_functions_called
66 .insert((package, module, function));
67 }
68
69 pub fn aggregate(stats: Vec<Self>) -> Self {
70 let mut result = Self::default();
71 for stat in stats {
72 result.num_successful_transactions += stat.num_successful_transactions;
73 result.num_failed_transactions += stat.num_failed_transactions;
74 result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
75 result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
76 result
77 .unique_move_functions_called
78 .extend(stat.unique_move_functions_called);
79 }
80 result
81 }
82
83 pub fn print_stats(&self) {
84 info!(
85 "Executed {} transactions, {} succeeded, {} failed",
86 self.num_successful_transactions + self.num_failed_transactions,
87 self.num_successful_transactions,
88 self.num_failed_transactions
89 );
90 info!(
91 "{} are owned object transactions, {} are shared object transactions",
92 self.num_owned_obj_transactions, self.num_shared_obj_transactions
93 );
94 info!(
95 "Unique move functions called: {}",
96 self.unique_move_functions_called.len()
97 );
98 }
99}
100
/// Object references grouped by the struct tag of the object, kept in an
/// `IndexSet` so they can be addressed and removed by index.
pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;

/// Lock-protected, reference-counted map from struct tag to the references of
/// immutable objects of that type.
pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;

/// Lock-protected, reference-counted map from struct tag to the
/// `ConsensusObjectSequenceKey`s of objects of that type. Within this file the
/// keys are built as `(object id, initial shared / start version)`.
pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<ConsensusObjectSequenceKey>>>>;
108
/// Per-surfer state: the account it acts as, the objects it knows about and
/// the statistics of the transactions it has executed.
pub struct SurferState {
    /// String pool handed to package normalization in
    /// `discover_entry_functions`.
    pub pool: Arc<RwLock<normalized::ArcPool>>,
    /// Identifier of this surfer; recorded as `surfer_id` on tracing spans.
    pub id: usize,
    /// Cluster used to query the gas price, sign and execute transactions and
    /// read objects back from the fullnode store.
    pub cluster: Arc<TestCluster>,
    /// Random number generator. It is not used by the methods in this file;
    /// presumably callers use it to pick functions and arguments — TODO confirm.
    pub rng: StdRng,

    /// Address used as the sender of every transaction built here.
    pub address: SuiAddress,
    /// Reference of the gas object; refreshed whenever transaction effects
    /// report a change to the object with the same ID.
    pub gas_object: ObjectRef,
    /// Address-owned objects whose owner equals `address`, grouped by type.
    pub owned_objects: OwnedObjects,
    /// Immutable objects seen in transaction effects, grouped by type.
    pub immutable_objects: ImmObjects,
    /// Shared / consensus objects seen in transaction effects, grouped by type.
    pub shared_objects: SharedObjects,
    /// Callable functions discovered from published packages.
    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,

    /// Counters for the transactions executed by this surfer.
    pub stats: SurfStatistics,
}
124
125impl SurferState {
126 pub fn new(
127 id: usize,
128 cluster: Arc<TestCluster>,
129 rng: StdRng,
130 address: SuiAddress,
131 gas_object: ObjectRef,
132 owned_objects: OwnedObjects,
133 immutable_objects: ImmObjects,
134 shared_objects: SharedObjects,
135 entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
136 ) -> Self {
137 Self {
138 pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
139 id,
140 cluster,
141 rng,
142 address,
143 gas_object,
144 owned_objects,
145 immutable_objects,
146 shared_objects,
147 entry_functions,
148 stats: Default::default(),
149 }
150 }
151
152 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
153 pub async fn execute_move_transaction(
154 &mut self,
155 package: ObjectID,
156 module: String,
157 function: String,
158 args: Vec<CallArg>,
159 ) {
160 let rgp = self.cluster.get_reference_gas_price().await;
161 let use_shared_object = args
162 .iter()
163 .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
164 let tx_data = TransactionData::new_move_call(
165 self.address,
166 package,
167 Identifier::new(module.as_str()).unwrap(),
168 Identifier::new(function.as_str()).unwrap(),
169 vec![],
170 self.gas_object,
171 args,
172 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
173 rgp,
174 )
175 .unwrap();
176 let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
177 let response = loop {
178 debug!("Executing transaction {:?}", tx.digest());
179 match self
180 .cluster
181 .wallet
182 .execute_transaction_may_fail(tx.clone())
183 .await
184 {
185 Ok(effects) => break effects,
186 Err(e) => {
187 error!("Error executing transaction {:?}: {e:?}", tx.digest());
188 tokio::time::sleep(Duration::from_secs(1)).await;
189 }
190 }
191 };
192 debug!(
193 "Successfully executed transaction {:?} with response {:?}",
194 tx, response
195 );
196 let effects = response.effects;
197 info!(
198 "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
199 self.address,
200 module,
201 function,
202 effects.status()
203 );
204 self.stats.record_transaction(
205 use_shared_object,
206 effects.status().is_ok(),
207 package,
208 module,
209 function,
210 );
211 self.process_tx_effects(&effects).await;
212 }
213
214 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
215 async fn process_tx_effects(&mut self, effects: &TransactionEffects) {
216 for (obj_ref, owner, write_kind) in effects.all_changed_objects() {
217 if matches!(owner, Owner::ObjectOwner(_)) {
218 continue;
222 }
223 let object = self
225 .cluster
226 .get_object_from_fullnode_store(&obj_ref.0)
227 .await
228 .unwrap();
229 if object.is_package() {
230 self.discover_entry_functions(object).await;
231 continue;
232 }
233 let struct_tag = object.struct_tag().unwrap();
234 match owner {
235 Owner::Immutable => {
236 self.immutable_objects
237 .write()
238 .await
239 .entry(struct_tag)
240 .or_default()
241 .push(obj_ref);
242 }
243 Owner::AddressOwner(address) => {
244 if address == self.address {
245 self.owned_objects
246 .entry(struct_tag)
247 .or_default()
248 .insert(obj_ref);
249 }
250 }
251 Owner::ObjectOwner(_) => (),
252 Owner::Shared {
254 initial_shared_version,
255 }
256 | Owner::ConsensusAddressOwner {
257 start_version: initial_shared_version,
258 ..
259 }
260 | Owner::Party {
261 start_version: initial_shared_version,
262 ..
263 } => {
264 if write_kind != WriteKind::Mutate {
265 self.shared_objects
266 .write()
267 .await
268 .entry(struct_tag)
269 .or_default()
270 .push((obj_ref.0, initial_shared_version));
271 }
272 }
275 }
276 if obj_ref.0 == self.gas_object.0 {
277 self.gas_object = obj_ref;
278 }
279 }
280 }
281
282 async fn discover_entry_functions(&self, package: Object) {
283 let package_id = package.id();
284 let move_package = package.into_inner().data.try_into_package().unwrap();
285 let proto_version = self.cluster.highest_protocol_version();
286 let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
287 let binary_config = config.binary_config(None);
288 let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
289 let entry_functions: Vec<_> = move_package
290 .normalize(pool, &binary_config, false)
291 .unwrap()
292 .into_iter()
293 .flat_map(|(module_name, module)| {
294 module
295 .functions
296 .into_iter()
297 .filter_map(|(func_name, func)| {
298 if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
300 return None;
301 }
302 if !func.return_.is_empty() {
304 return None;
305 }
306 if !func.type_parameters.is_empty() {
308 return None;
309 }
310 let mut parameters = (*func.parameters).clone();
311 if let Some(last_param) = parameters.last().as_ref()
312 && is_type_tx_context(last_param)
313 {
314 parameters.pop();
315 }
316 Some(EntryFunction {
317 package: package_id,
318 module: module_name.clone(),
319 function: func_name.to_string(),
320 parameters: parameters
321 .into_iter()
322 .map(|rc_ty| (*rc_ty).clone())
323 .collect(),
324 })
325 })
326 .collect::<Vec<_>>()
327 })
328 .collect();
329 info!(
330 "Number of entry functions discovered: {:?}",
331 entry_functions.len()
332 );
333 debug!("Entry functions: {:?}", entry_functions);
334 self.entry_functions.write().await.extend(entry_functions);
335 }
336
337 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
338 pub async fn publish_package(&mut self, path: &Path) {
339 let rgp = self.cluster.get_reference_gas_price().await;
340 let package = BuildConfig::new_for_testing()
341 .build_async(path)
342 .await
343 .unwrap();
344 let modules = package.get_package_bytes(false);
345 let tx_data = TransactionData::new_module(
346 self.address,
347 self.gas_object,
348 modules,
349 package.dependency_ids.published.values().cloned().collect(),
350 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
351 rgp,
352 );
353 let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
354 let tx_digest = *tx.digest();
355 info!(?tx_digest, "Publishing package");
356 let start = Instant::now();
357 let response = loop {
358 match self
359 .cluster
360 .wallet
361 .execute_transaction_may_fail(tx.clone())
362 .await
363 {
364 Ok(response) => {
365 break response;
366 }
367 Err(err) => {
368 if start.elapsed() > Duration::from_secs(120) {
369 fatal!(
370 "Failed to publish package after 120 seconds: {} {}",
371 err,
372 tx.digest()
373 );
374 }
375 error!(?tx_digest, "Failed to publish package: {}", err);
376 tokio::time::sleep(Duration::from_secs(1)).await;
377 }
378 }
379 };
380 info!("Successfully published package in {:?}", path);
381 self.process_tx_effects(&response.effects).await;
382 }
383
384 pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
385 self.owned_objects
386 .get(type_tag)
387 .map(|objects| objects.len())
388 .unwrap_or(0)
389 }
390
391 pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
392 self.immutable_objects
393 .read()
394 .await
395 .get(type_tag)
396 .map(|objects| objects.len())
397 .unwrap_or(0)
398 }
399
400 pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
401 self.shared_objects
402 .read()
403 .await
404 .get(type_tag)
405 .map(|objects| objects.len())
406 .unwrap_or(0)
407 }
408
409 pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
410 self.owned_objects
411 .get_mut(type_tag)
412 .unwrap()
413 .swap_remove_index(n)
414 .unwrap()
415 }
416
417 pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
418 self.immutable_objects.read().await.get(type_tag).unwrap()[n]
419 }
420
421 pub async fn choose_nth_shared_object(
422 &self,
423 type_tag: &StructTag,
424 n: usize,
425 ) -> ConsensusObjectSequenceKey {
426 self.shared_objects.read().await.get(type_tag).unwrap()[n]
427 }
428}
429
430fn is_type_tx_context(ty: &Type) -> bool {
431 match ty {
432 Type::Reference(_, inner) => match inner.as_ref() {
433 Type::Datatype(dt) => {
434 dt.module.address == SUI_FRAMEWORK_ADDRESS
435 && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
436 && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
437 && dt.type_arguments.is_empty()
438 }
439 _ => false,
440 },
441 _ => false,
442 }
443}