sui_surfer/
surfer_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use indexmap::IndexSet;
5use move_binary_format::file_format::Visibility;
6use move_binary_format::normalized;
7use move_core_types::identifier::IdentStr;
8use move_core_types::language_storage::StructTag;
9use rand::rngs::StdRng;
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14use sui_json_rpc_types::{SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI};
15use sui_move_build::BuildConfig;
16use sui_protocol_config::{Chain, ProtocolConfig};
17use sui_types::base_types::{ConsensusObjectSequenceKey, ObjectID, ObjectRef, SuiAddress};
18use sui_types::object::{Object, Owner};
19use sui_types::storage::WriteKind;
20use sui_types::transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData};
21use sui_types::{Identifier, SUI_FRAMEWORK_ADDRESS};
22use test_cluster::TestCluster;
23use tokio::sync::RwLock;
24use tracing::{debug, error, info};
25
26type Type = normalized::Type<normalized::ArcIdentifier>;
27
28#[derive(Debug, Clone)]
29pub struct EntryFunction {
30    pub package: ObjectID,
31    pub module: String,
32    pub function: String,
33    pub parameters: Vec<Type>,
34}
35
36#[derive(Debug, Default)]
37pub struct SurfStatistics {
38    pub num_successful_transactions: u64,
39    pub num_failed_transactions: u64,
40    pub num_owned_obj_transactions: u64,
41    pub num_shared_obj_transactions: u64,
42    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
43}
44
45impl SurfStatistics {
46    pub fn record_transaction(
47        &mut self,
48        has_shared_object: bool,
49        tx_succeeded: bool,
50        package: ObjectID,
51        module: String,
52        function: String,
53    ) {
54        if tx_succeeded {
55            self.num_successful_transactions += 1;
56        } else {
57            self.num_failed_transactions += 1;
58        }
59        if has_shared_object {
60            self.num_shared_obj_transactions += 1;
61        } else {
62            self.num_owned_obj_transactions += 1;
63        }
64        self.unique_move_functions_called
65            .insert((package, module, function));
66    }
67
68    pub fn aggregate(stats: Vec<Self>) -> Self {
69        let mut result = Self::default();
70        for stat in stats {
71            result.num_successful_transactions += stat.num_successful_transactions;
72            result.num_failed_transactions += stat.num_failed_transactions;
73            result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
74            result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
75            result
76                .unique_move_functions_called
77                .extend(stat.unique_move_functions_called);
78        }
79        result
80    }
81
82    pub fn print_stats(&self) {
83        info!(
84            "Executed {} transactions, {} succeeded, {} failed",
85            self.num_successful_transactions + self.num_failed_transactions,
86            self.num_successful_transactions,
87            self.num_failed_transactions
88        );
89        info!(
90            "{} are owned object transactions, {} are shared object transactions",
91            self.num_owned_obj_transactions, self.num_shared_obj_transactions
92        );
93        info!(
94            "Unique move functions called: {}",
95            self.unique_move_functions_called.len()
96        );
97    }
98}
99
100pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;
101
102pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;
103
104/// Map from StructTag to a vector of shared objects, where each shared object is a tuple of
105/// (object ID, initial shared version).
106pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<ConsensusObjectSequenceKey>>>>;
107
108pub struct SurferState {
109    pub pool: Arc<RwLock<normalized::ArcPool>>,
110    pub id: usize,
111    pub cluster: Arc<TestCluster>,
112    pub rng: StdRng,
113
114    pub address: SuiAddress,
115    pub gas_object: ObjectRef,
116    pub owned_objects: OwnedObjects,
117    pub immutable_objects: ImmObjects,
118    pub shared_objects: SharedObjects,
119    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
120
121    pub stats: SurfStatistics,
122}
123
124impl SurferState {
125    pub fn new(
126        id: usize,
127        cluster: Arc<TestCluster>,
128        rng: StdRng,
129        address: SuiAddress,
130        gas_object: ObjectRef,
131        owned_objects: OwnedObjects,
132        immutable_objects: ImmObjects,
133        shared_objects: SharedObjects,
134        entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
135    ) -> Self {
136        Self {
137            pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
138            id,
139            cluster,
140            rng,
141            address,
142            gas_object,
143            owned_objects,
144            immutable_objects,
145            shared_objects,
146            entry_functions,
147            stats: Default::default(),
148        }
149    }
150
151    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
152    pub async fn execute_move_transaction(
153        &mut self,
154        package: ObjectID,
155        module: String,
156        function: String,
157        args: Vec<CallArg>,
158    ) {
159        let rgp = self.cluster.get_reference_gas_price().await;
160        let use_shared_object = args
161            .iter()
162            .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
163        let tx_data = TransactionData::new_move_call(
164            self.address,
165            package,
166            Identifier::new(module.as_str()).unwrap(),
167            Identifier::new(function.as_str()).unwrap(),
168            vec![],
169            self.gas_object,
170            args,
171            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
172            rgp,
173        )
174        .unwrap();
175        let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
176        let response = loop {
177            debug!("Executing transaction {:?}", tx.digest());
178            match self
179                .cluster
180                .wallet
181                .execute_transaction_may_fail(tx.clone())
182                .await
183            {
184                Ok(effects) => break effects,
185                Err(e) => {
186                    error!("Error executing transaction {:?}: {e:?}", tx.digest());
187                    tokio::time::sleep(Duration::from_secs(1)).await;
188                }
189            }
190        };
191        debug!(
192            "Successfully executed transaction {:?} with response {:?}",
193            tx, response
194        );
195        let effects = response.effects.unwrap();
196        info!(
197            "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
198            self.address,
199            module,
200            function,
201            effects.status()
202        );
203        self.stats.record_transaction(
204            use_shared_object,
205            effects.status().is_ok(),
206            package,
207            module,
208            function,
209        );
210        self.process_tx_effects(&effects).await;
211    }
212
213    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
214    async fn process_tx_effects(&mut self, effects: &SuiTransactionBlockEffects) {
215        for (owned_ref, write_kind) in effects.all_changed_objects() {
216            if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
217                // For object owned objects, we don't need to do anything.
218                // We also cannot read them because in the case of shared objects, there can be
219                // races and the child object may no longer exist.
220                continue;
221            }
222            let obj_ref = owned_ref.reference.to_object_ref();
223            let object = self
224                .cluster
225                .get_object_from_fullnode_store(&obj_ref.0)
226                .await
227                .unwrap();
228            if object.is_package() {
229                self.discover_entry_functions(object).await;
230                continue;
231            }
232            let struct_tag = object.struct_tag().unwrap();
233            match owned_ref.owner {
234                Owner::Immutable => {
235                    self.immutable_objects
236                        .write()
237                        .await
238                        .entry(struct_tag)
239                        .or_default()
240                        .push(obj_ref);
241                }
242                Owner::AddressOwner(address) => {
243                    if address == self.address {
244                        self.owned_objects
245                            .entry(struct_tag)
246                            .or_default()
247                            .insert(obj_ref);
248                    }
249                }
250                Owner::ObjectOwner(_) => (),
251                Owner::Shared {
252                    initial_shared_version,
253                }
254                // TODO: Implement full support for ConsensusAddressOwner objects in sui-surfer.
255                | Owner::ConsensusAddressOwner {
256                    start_version: initial_shared_version,
257                    ..
258                } => {
259                    if write_kind != WriteKind::Mutate {
260                        self.shared_objects
261                            .write()
262                            .await
263                            .entry(struct_tag)
264                            .or_default()
265                            .push((obj_ref.0, initial_shared_version));
266                    }
267                    // We do not need to insert it if it's a Mutate, because it means
268                    // we should already have it in the inventory.
269                }
270            }
271            if obj_ref.0 == self.gas_object.0 {
272                self.gas_object = obj_ref;
273            }
274        }
275    }
276
277    async fn discover_entry_functions(&self, package: Object) {
278        let package_id = package.id();
279        let move_package = package.into_inner().data.try_into_package().unwrap();
280        let proto_version = self.cluster.highest_protocol_version();
281        let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
282        let binary_config = config.binary_config(None);
283        let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
284        let entry_functions: Vec<_> = move_package
285            .normalize(pool, &binary_config, /* include code */ false)
286            .unwrap()
287            .into_iter()
288            .flat_map(|(module_name, module)| {
289                module
290                    .functions
291                    .into_iter()
292                    .filter_map(|(func_name, func)| {
293                        // Either public function or entry function is callable.
294                        if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
295                            return None;
296                        }
297                        // Surfer doesn't support chaining transactions in a programmable transaction yet.
298                        if !func.return_.is_empty() {
299                            return None;
300                        }
301                        // Surfer doesn't support type parameter yet.
302                        if !func.type_parameters.is_empty() {
303                            return None;
304                        }
305                        let mut parameters = (*func.parameters).clone();
306                        if let Some(last_param) = parameters.last().as_ref()
307                            && is_type_tx_context(last_param)
308                        {
309                            parameters.pop();
310                        }
311                        Some(EntryFunction {
312                            package: package_id,
313                            module: module_name.clone(),
314                            function: func_name.to_string(),
315                            parameters: parameters
316                                .into_iter()
317                                .map(|rc_ty| (*rc_ty).clone())
318                                .collect(),
319                        })
320                    })
321                    .collect::<Vec<_>>()
322            })
323            .collect();
324        info!(
325            "Number of entry functions discovered: {:?}",
326            entry_functions.len()
327        );
328        debug!("Entry functions: {:?}", entry_functions);
329        self.entry_functions.write().await.extend(entry_functions);
330    }
331
332    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
333    pub async fn publish_package(&mut self, path: &Path) {
334        let rgp = self.cluster.get_reference_gas_price().await;
335        let package = BuildConfig::new_for_testing().build(path).unwrap();
336        let modules = package.get_package_bytes(false);
337        let tx_data = TransactionData::new_module(
338            self.address,
339            self.gas_object,
340            modules,
341            package.dependency_ids.published.values().cloned().collect(),
342            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
343            rgp,
344        );
345        let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
346        let response = loop {
347            match self
348                .cluster
349                .wallet
350                .execute_transaction_may_fail(tx.clone())
351                .await
352            {
353                Ok(response) => {
354                    break response;
355                }
356                Err(err) => {
357                    error!("Failed to publish package: {:?}", err);
358                    tokio::time::sleep(Duration::from_secs(1)).await;
359                }
360            }
361        };
362        info!("Successfully published package in {:?}", path);
363        self.process_tx_effects(&response.effects.unwrap()).await;
364    }
365
366    pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
367        self.owned_objects
368            .get(type_tag)
369            .map(|objects| objects.len())
370            .unwrap_or(0)
371    }
372
373    pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
374        self.immutable_objects
375            .read()
376            .await
377            .get(type_tag)
378            .map(|objects| objects.len())
379            .unwrap_or(0)
380    }
381
382    pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
383        self.shared_objects
384            .read()
385            .await
386            .get(type_tag)
387            .map(|objects| objects.len())
388            .unwrap_or(0)
389    }
390
391    pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
392        self.owned_objects
393            .get_mut(type_tag)
394            .unwrap()
395            .swap_remove_index(n)
396            .unwrap()
397    }
398
399    pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
400        self.immutable_objects.read().await.get(type_tag).unwrap()[n]
401    }
402
403    pub async fn choose_nth_shared_object(
404        &self,
405        type_tag: &StructTag,
406        n: usize,
407    ) -> ConsensusObjectSequenceKey {
408        self.shared_objects.read().await.get(type_tag).unwrap()[n]
409    }
410}
411
412fn is_type_tx_context(ty: &Type) -> bool {
413    match ty {
414        Type::Reference(_, inner) => match inner.as_ref() {
415            Type::Datatype(dt) => {
416                dt.module.address == SUI_FRAMEWORK_ADDRESS
417                    && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
418                    && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
419                    && dt.type_arguments.is_empty()
420            }
421            _ => false,
422        },
423        _ => false,
424    }
425}