1use indexmap::IndexSet;
5use move_binary_format::file_format::Visibility;
6use move_binary_format::normalized;
7use move_core_types::identifier::IdentStr;
8use move_core_types::language_storage::StructTag;
9use mysten_common::fatal;
10use rand::rngs::StdRng;
11use std::collections::{HashMap, HashSet};
12use std::path::Path;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use sui_json_rpc_types::{SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI};
16use sui_move_build::BuildConfig;
17use sui_protocol_config::{Chain, ProtocolConfig};
18use sui_types::base_types::{ConsensusObjectSequenceKey, ObjectID, ObjectRef, SuiAddress};
19use sui_types::object::{Object, Owner};
20use sui_types::storage::WriteKind;
21use sui_types::transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData};
22use sui_types::{Identifier, SUI_FRAMEWORK_ADDRESS};
23use test_cluster::TestCluster;
24use tokio::sync::RwLock;
25use tracing::{debug, error, info};
26
27type Type = normalized::Type<normalized::ArcIdentifier>;
28
29#[derive(Debug, Clone)]
30pub struct EntryFunction {
31 pub package: ObjectID,
32 pub module: String,
33 pub function: String,
34 pub parameters: Vec<Type>,
35}
36
37#[derive(Debug, Default)]
38pub struct SurfStatistics {
39 pub num_successful_transactions: u64,
40 pub num_failed_transactions: u64,
41 pub num_owned_obj_transactions: u64,
42 pub num_shared_obj_transactions: u64,
43 pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
44}
45
46impl SurfStatistics {
47 pub fn record_transaction(
48 &mut self,
49 has_shared_object: bool,
50 tx_succeeded: bool,
51 package: ObjectID,
52 module: String,
53 function: String,
54 ) {
55 if tx_succeeded {
56 self.num_successful_transactions += 1;
57 } else {
58 self.num_failed_transactions += 1;
59 }
60 if has_shared_object {
61 self.num_shared_obj_transactions += 1;
62 } else {
63 self.num_owned_obj_transactions += 1;
64 }
65 self.unique_move_functions_called
66 .insert((package, module, function));
67 }
68
69 pub fn aggregate(stats: Vec<Self>) -> Self {
70 let mut result = Self::default();
71 for stat in stats {
72 result.num_successful_transactions += stat.num_successful_transactions;
73 result.num_failed_transactions += stat.num_failed_transactions;
74 result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
75 result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
76 result
77 .unique_move_functions_called
78 .extend(stat.unique_move_functions_called);
79 }
80 result
81 }
82
83 pub fn print_stats(&self) {
84 info!(
85 "Executed {} transactions, {} succeeded, {} failed",
86 self.num_successful_transactions + self.num_failed_transactions,
87 self.num_successful_transactions,
88 self.num_failed_transactions
89 );
90 info!(
91 "{} are owned object transactions, {} are shared object transactions",
92 self.num_owned_obj_transactions, self.num_shared_obj_transactions
93 );
94 info!(
95 "Unique move functions called: {}",
96 self.unique_move_functions_called.len()
97 );
98 }
99}
100
101pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;
102
103pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;
104
105pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<ConsensusObjectSequenceKey>>>>;
108
109pub struct SurferState {
110 pub pool: Arc<RwLock<normalized::ArcPool>>,
111 pub id: usize,
112 pub cluster: Arc<TestCluster>,
113 pub rng: StdRng,
114
115 pub address: SuiAddress,
116 pub gas_object: ObjectRef,
117 pub owned_objects: OwnedObjects,
118 pub immutable_objects: ImmObjects,
119 pub shared_objects: SharedObjects,
120 pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
121
122 pub stats: SurfStatistics,
123}
124
125impl SurferState {
126 pub fn new(
127 id: usize,
128 cluster: Arc<TestCluster>,
129 rng: StdRng,
130 address: SuiAddress,
131 gas_object: ObjectRef,
132 owned_objects: OwnedObjects,
133 immutable_objects: ImmObjects,
134 shared_objects: SharedObjects,
135 entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
136 ) -> Self {
137 Self {
138 pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
139 id,
140 cluster,
141 rng,
142 address,
143 gas_object,
144 owned_objects,
145 immutable_objects,
146 shared_objects,
147 entry_functions,
148 stats: Default::default(),
149 }
150 }
151
152 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
153 pub async fn execute_move_transaction(
154 &mut self,
155 package: ObjectID,
156 module: String,
157 function: String,
158 args: Vec<CallArg>,
159 ) {
160 let rgp = self.cluster.get_reference_gas_price().await;
161 let use_shared_object = args
162 .iter()
163 .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
164 let tx_data = TransactionData::new_move_call(
165 self.address,
166 package,
167 Identifier::new(module.as_str()).unwrap(),
168 Identifier::new(function.as_str()).unwrap(),
169 vec![],
170 self.gas_object,
171 args,
172 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
173 rgp,
174 )
175 .unwrap();
176 let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
177 let response = loop {
178 debug!("Executing transaction {:?}", tx.digest());
179 match self
180 .cluster
181 .wallet
182 .execute_transaction_may_fail(tx.clone())
183 .await
184 {
185 Ok(effects) => break effects,
186 Err(e) => {
187 error!("Error executing transaction {:?}: {e:?}", tx.digest());
188 tokio::time::sleep(Duration::from_secs(1)).await;
189 }
190 }
191 };
192 debug!(
193 "Successfully executed transaction {:?} with response {:?}",
194 tx, response
195 );
196 let effects = response.effects.unwrap();
197 info!(
198 "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
199 self.address,
200 module,
201 function,
202 effects.status()
203 );
204 self.stats.record_transaction(
205 use_shared_object,
206 effects.status().is_ok(),
207 package,
208 module,
209 function,
210 );
211 self.process_tx_effects(&effects).await;
212 }
213
214 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
215 async fn process_tx_effects(&mut self, effects: &SuiTransactionBlockEffects) {
216 for (owned_ref, write_kind) in effects.all_changed_objects() {
217 if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
218 continue;
222 }
223 let obj_ref = owned_ref.reference.to_object_ref();
224 let object = self
225 .cluster
226 .get_object_from_fullnode_store(&obj_ref.0)
227 .await
228 .unwrap();
229 if object.is_package() {
230 self.discover_entry_functions(object).await;
231 continue;
232 }
233 let struct_tag = object.struct_tag().unwrap();
234 match owned_ref.owner {
235 Owner::Immutable => {
236 self.immutable_objects
237 .write()
238 .await
239 .entry(struct_tag)
240 .or_default()
241 .push(obj_ref);
242 }
243 Owner::AddressOwner(address) => {
244 if address == self.address {
245 self.owned_objects
246 .entry(struct_tag)
247 .or_default()
248 .insert(obj_ref);
249 }
250 }
251 Owner::ObjectOwner(_) => (),
252 Owner::Shared {
253 initial_shared_version,
254 }
255 | Owner::ConsensusAddressOwner {
257 start_version: initial_shared_version,
258 ..
259 } => {
260 if write_kind != WriteKind::Mutate {
261 self.shared_objects
262 .write()
263 .await
264 .entry(struct_tag)
265 .or_default()
266 .push((obj_ref.0, initial_shared_version));
267 }
268 }
271 }
272 if obj_ref.0 == self.gas_object.0 {
273 self.gas_object = obj_ref;
274 }
275 }
276 }
277
278 async fn discover_entry_functions(&self, package: Object) {
279 let package_id = package.id();
280 let move_package = package.into_inner().data.try_into_package().unwrap();
281 let proto_version = self.cluster.highest_protocol_version();
282 let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
283 let binary_config = config.binary_config(None);
284 let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
285 let entry_functions: Vec<_> = move_package
286 .normalize(pool, &binary_config, false)
287 .unwrap()
288 .into_iter()
289 .flat_map(|(module_name, module)| {
290 module
291 .functions
292 .into_iter()
293 .filter_map(|(func_name, func)| {
294 if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
296 return None;
297 }
298 if !func.return_.is_empty() {
300 return None;
301 }
302 if !func.type_parameters.is_empty() {
304 return None;
305 }
306 let mut parameters = (*func.parameters).clone();
307 if let Some(last_param) = parameters.last().as_ref()
308 && is_type_tx_context(last_param)
309 {
310 parameters.pop();
311 }
312 Some(EntryFunction {
313 package: package_id,
314 module: module_name.clone(),
315 function: func_name.to_string(),
316 parameters: parameters
317 .into_iter()
318 .map(|rc_ty| (*rc_ty).clone())
319 .collect(),
320 })
321 })
322 .collect::<Vec<_>>()
323 })
324 .collect();
325 info!(
326 "Number of entry functions discovered: {:?}",
327 entry_functions.len()
328 );
329 debug!("Entry functions: {:?}", entry_functions);
330 self.entry_functions.write().await.extend(entry_functions);
331 }
332
333 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
334 pub async fn publish_package(&mut self, path: &Path) {
335 let rgp = self.cluster.get_reference_gas_price().await;
336 let package = BuildConfig::new_for_testing()
337 .build_async(path)
338 .await
339 .unwrap();
340 let modules = package.get_package_bytes(false);
341 let tx_data = TransactionData::new_module(
342 self.address,
343 self.gas_object,
344 modules,
345 package.dependency_ids.published.values().cloned().collect(),
346 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
347 rgp,
348 );
349 let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
350 let tx_digest = *tx.digest();
351 info!(?tx_digest, "Publishing package");
352 let start = Instant::now();
353 let response = loop {
354 match self
355 .cluster
356 .wallet
357 .execute_transaction_may_fail(tx.clone())
358 .await
359 {
360 Ok(response) => {
361 break response;
362 }
363 Err(err) => {
364 if start.elapsed() > Duration::from_secs(120) {
365 fatal!(
366 "Failed to publish package after 120 seconds: {} {}",
367 err,
368 tx.digest()
369 );
370 }
371 error!(?tx_digest, "Failed to publish package: {}", err);
372 tokio::time::sleep(Duration::from_secs(1)).await;
373 }
374 }
375 };
376 info!("Successfully published package in {:?}", path);
377 self.process_tx_effects(&response.effects.unwrap()).await;
378 }
379
380 pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
381 self.owned_objects
382 .get(type_tag)
383 .map(|objects| objects.len())
384 .unwrap_or(0)
385 }
386
387 pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
388 self.immutable_objects
389 .read()
390 .await
391 .get(type_tag)
392 .map(|objects| objects.len())
393 .unwrap_or(0)
394 }
395
396 pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
397 self.shared_objects
398 .read()
399 .await
400 .get(type_tag)
401 .map(|objects| objects.len())
402 .unwrap_or(0)
403 }
404
405 pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
406 self.owned_objects
407 .get_mut(type_tag)
408 .unwrap()
409 .swap_remove_index(n)
410 .unwrap()
411 }
412
413 pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
414 self.immutable_objects.read().await.get(type_tag).unwrap()[n]
415 }
416
417 pub async fn choose_nth_shared_object(
418 &self,
419 type_tag: &StructTag,
420 n: usize,
421 ) -> ConsensusObjectSequenceKey {
422 self.shared_objects.read().await.get(type_tag).unwrap()[n]
423 }
424}
425
426fn is_type_tx_context(ty: &Type) -> bool {
427 match ty {
428 Type::Reference(_, inner) => match inner.as_ref() {
429 Type::Datatype(dt) => {
430 dt.module.address == SUI_FRAMEWORK_ADDRESS
431 && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
432 && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
433 && dt.type_arguments.is_empty()
434 }
435 _ => false,
436 },
437 _ => false,
438 }
439}