sui_surfer/
surfer_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use indexmap::IndexSet;
5use move_binary_format::file_format::Visibility;
6use move_binary_format::normalized;
7use move_core_types::identifier::IdentStr;
8use move_core_types::language_storage::StructTag;
9use mysten_common::fatal;
10use rand::rngs::StdRng;
11use std::collections::{HashMap, HashSet};
12use std::path::Path;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use sui_json_rpc_types::{SuiTransactionBlockEffects, SuiTransactionBlockEffectsAPI};
16use sui_move_build::BuildConfig;
17use sui_protocol_config::{Chain, ProtocolConfig};
18use sui_types::base_types::{ConsensusObjectSequenceKey, ObjectID, ObjectRef, SuiAddress};
19use sui_types::object::{Object, Owner};
20use sui_types::storage::WriteKind;
21use sui_types::transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData};
22use sui_types::{Identifier, SUI_FRAMEWORK_ADDRESS};
23use test_cluster::TestCluster;
24use tokio::sync::RwLock;
25use tracing::{debug, error, info};
26
/// Shorthand for the normalized Move type representation, instantiated with
/// `normalized::ArcIdentifier` as its identifier type.
type Type = normalized::Type<normalized::ArcIdentifier>;
28
/// A Move function the surfer is able to call.
///
/// Despite the name, this covers both `entry` functions and public functions:
/// `SurferState::discover_entry_functions` records either kind.
#[derive(Debug, Clone)]
pub struct EntryFunction {
    /// ID of the package object that contains the function.
    pub package: ObjectID,
    /// Name of the module that declares the function.
    pub module: String,
    /// Name of the function itself.
    pub function: String,
    /// Parameter types of the function. When discovered through
    /// `SurferState::discover_entry_functions`, a trailing `TxContext` reference
    /// parameter has already been removed from this list.
    pub parameters: Vec<Type>,
}
36
/// Counters describing the transactions executed by one surfer (or, after
/// `SurfStatistics::aggregate`, by several surfers combined).
#[derive(Debug, Default)]
pub struct SurfStatistics {
    /// Number of recorded transactions flagged as succeeded.
    pub num_successful_transactions: u64,
    /// Number of recorded transactions flagged as failed.
    pub num_failed_transactions: u64,
    /// Number of recorded transactions that did not use any shared object.
    pub num_owned_obj_transactions: u64,
    /// Number of recorded transactions that used at least one shared object.
    pub num_shared_obj_transactions: u64,
    /// Distinct `(package, module, function)` triples that have been recorded.
    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
}
45
46impl SurfStatistics {
47    pub fn record_transaction(
48        &mut self,
49        has_shared_object: bool,
50        tx_succeeded: bool,
51        package: ObjectID,
52        module: String,
53        function: String,
54    ) {
55        if tx_succeeded {
56            self.num_successful_transactions += 1;
57        } else {
58            self.num_failed_transactions += 1;
59        }
60        if has_shared_object {
61            self.num_shared_obj_transactions += 1;
62        } else {
63            self.num_owned_obj_transactions += 1;
64        }
65        self.unique_move_functions_called
66            .insert((package, module, function));
67    }
68
69    pub fn aggregate(stats: Vec<Self>) -> Self {
70        let mut result = Self::default();
71        for stat in stats {
72            result.num_successful_transactions += stat.num_successful_transactions;
73            result.num_failed_transactions += stat.num_failed_transactions;
74            result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
75            result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
76            result
77                .unique_move_functions_called
78                .extend(stat.unique_move_functions_called);
79        }
80        result
81    }
82
83    pub fn print_stats(&self) {
84        info!(
85            "Executed {} transactions, {} succeeded, {} failed",
86            self.num_successful_transactions + self.num_failed_transactions,
87            self.num_successful_transactions,
88            self.num_failed_transactions
89        );
90        info!(
91            "{} are owned object transactions, {} are shared object transactions",
92            self.num_owned_obj_transactions, self.num_shared_obj_transactions
93        );
94        info!(
95            "Unique move functions called: {}",
96            self.unique_move_functions_called.len()
97        );
98    }
99}
100
/// Map from StructTag to the set of references to address-owned objects of that type.
/// `SurferState` fills it only with objects owned by the surfer's own address.
pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;
102
/// Map from StructTag to a vector of references to immutable objects of that type,
/// wrapped in `Arc<RwLock<..>>` so the same inventory can be held from several places.
pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;
104
/// Map from StructTag to a vector of shared objects, where each shared object is a tuple of
/// (object ID, initial shared version).
///
/// `SurferState::process_tx_effects` also stores consensus-address-owned objects here,
/// using their start version in place of the initial shared version.
pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<ConsensusObjectSequenceKey>>>>;
108
/// Per-surfer state: the account it acts as, its view of the objects it can pass as
/// arguments, the functions it knows how to call, and running statistics.
pub struct SurferState {
    /// Pool handed to package normalization in `discover_entry_functions`.
    pub pool: Arc<RwLock<normalized::ArcPool>>,
    /// Identifier of this surfer; recorded as `surfer_id` in tracing spans.
    pub id: usize,
    /// Test cluster used to sign and execute transactions and to read objects.
    pub cluster: Arc<TestCluster>,
    /// Random number generator. Not used within this file — presumably drives the
    /// caller's random choices; confirm against the surfer task code.
    pub rng: StdRng,

    /// Address used as the sender of every transaction built by this surfer.
    pub address: SuiAddress,
    /// Reference to the object used to pay for gas; refreshed from transaction effects.
    pub gas_object: ObjectRef,
    /// Objects owned by `address`, grouped by type.
    pub owned_objects: OwnedObjects,
    /// Known immutable objects, grouped by type.
    pub immutable_objects: ImmObjects,
    /// Known shared (and consensus-address-owned) objects, grouped by type.
    pub shared_objects: SharedObjects,
    /// Callable functions discovered from published packages.
    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,

    /// Statistics about the transactions this surfer has executed.
    pub stats: SurfStatistics,
}
124
125impl SurferState {
126    pub fn new(
127        id: usize,
128        cluster: Arc<TestCluster>,
129        rng: StdRng,
130        address: SuiAddress,
131        gas_object: ObjectRef,
132        owned_objects: OwnedObjects,
133        immutable_objects: ImmObjects,
134        shared_objects: SharedObjects,
135        entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
136    ) -> Self {
137        Self {
138            pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
139            id,
140            cluster,
141            rng,
142            address,
143            gas_object,
144            owned_objects,
145            immutable_objects,
146            shared_objects,
147            entry_functions,
148            stats: Default::default(),
149        }
150    }
151
152    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
153    pub async fn execute_move_transaction(
154        &mut self,
155        package: ObjectID,
156        module: String,
157        function: String,
158        args: Vec<CallArg>,
159    ) {
160        let rgp = self.cluster.get_reference_gas_price().await;
161        let use_shared_object = args
162            .iter()
163            .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
164        let tx_data = TransactionData::new_move_call(
165            self.address,
166            package,
167            Identifier::new(module.as_str()).unwrap(),
168            Identifier::new(function.as_str()).unwrap(),
169            vec![],
170            self.gas_object,
171            args,
172            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
173            rgp,
174        )
175        .unwrap();
176        let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
177        let response = loop {
178            debug!("Executing transaction {:?}", tx.digest());
179            match self
180                .cluster
181                .wallet
182                .execute_transaction_may_fail(tx.clone())
183                .await
184            {
185                Ok(effects) => break effects,
186                Err(e) => {
187                    error!("Error executing transaction {:?}: {e:?}", tx.digest());
188                    tokio::time::sleep(Duration::from_secs(1)).await;
189                }
190            }
191        };
192        debug!(
193            "Successfully executed transaction {:?} with response {:?}",
194            tx, response
195        );
196        let effects = response.effects.unwrap();
197        info!(
198            "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
199            self.address,
200            module,
201            function,
202            effects.status()
203        );
204        self.stats.record_transaction(
205            use_shared_object,
206            effects.status().is_ok(),
207            package,
208            module,
209            function,
210        );
211        self.process_tx_effects(&effects).await;
212    }
213
214    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
215    async fn process_tx_effects(&mut self, effects: &SuiTransactionBlockEffects) {
216        for (owned_ref, write_kind) in effects.all_changed_objects() {
217            if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
218                // For object owned objects, we don't need to do anything.
219                // We also cannot read them because in the case of shared objects, there can be
220                // races and the child object may no longer exist.
221                continue;
222            }
223            let obj_ref = owned_ref.reference.to_object_ref();
224            let object = self
225                .cluster
226                .get_object_from_fullnode_store(&obj_ref.0)
227                .await
228                .unwrap();
229            if object.is_package() {
230                self.discover_entry_functions(object).await;
231                continue;
232            }
233            let struct_tag = object.struct_tag().unwrap();
234            match owned_ref.owner {
235                Owner::Immutable => {
236                    self.immutable_objects
237                        .write()
238                        .await
239                        .entry(struct_tag)
240                        .or_default()
241                        .push(obj_ref);
242                }
243                Owner::AddressOwner(address) => {
244                    if address == self.address {
245                        self.owned_objects
246                            .entry(struct_tag)
247                            .or_default()
248                            .insert(obj_ref);
249                    }
250                }
251                Owner::ObjectOwner(_) => (),
252                Owner::Shared {
253                    initial_shared_version,
254                }
255                // TODO: Implement full support for ConsensusAddressOwner objects in sui-surfer.
256                | Owner::ConsensusAddressOwner {
257                    start_version: initial_shared_version,
258                    ..
259                } => {
260                    if write_kind != WriteKind::Mutate {
261                        self.shared_objects
262                            .write()
263                            .await
264                            .entry(struct_tag)
265                            .or_default()
266                            .push((obj_ref.0, initial_shared_version));
267                    }
268                    // We do not need to insert it if it's a Mutate, because it means
269                    // we should already have it in the inventory.
270                }
271            }
272            if obj_ref.0 == self.gas_object.0 {
273                self.gas_object = obj_ref;
274            }
275        }
276    }
277
278    async fn discover_entry_functions(&self, package: Object) {
279        let package_id = package.id();
280        let move_package = package.into_inner().data.try_into_package().unwrap();
281        let proto_version = self.cluster.highest_protocol_version();
282        let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
283        let binary_config = config.binary_config(None);
284        let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
285        let entry_functions: Vec<_> = move_package
286            .normalize(pool, &binary_config, /* include code */ false)
287            .unwrap()
288            .into_iter()
289            .flat_map(|(module_name, module)| {
290                module
291                    .functions
292                    .into_iter()
293                    .filter_map(|(func_name, func)| {
294                        // Either public function or entry function is callable.
295                        if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
296                            return None;
297                        }
298                        // Surfer doesn't support chaining transactions in a programmable transaction yet.
299                        if !func.return_.is_empty() {
300                            return None;
301                        }
302                        // Surfer doesn't support type parameter yet.
303                        if !func.type_parameters.is_empty() {
304                            return None;
305                        }
306                        let mut parameters = (*func.parameters).clone();
307                        if let Some(last_param) = parameters.last().as_ref()
308                            && is_type_tx_context(last_param)
309                        {
310                            parameters.pop();
311                        }
312                        Some(EntryFunction {
313                            package: package_id,
314                            module: module_name.clone(),
315                            function: func_name.to_string(),
316                            parameters: parameters
317                                .into_iter()
318                                .map(|rc_ty| (*rc_ty).clone())
319                                .collect(),
320                        })
321                    })
322                    .collect::<Vec<_>>()
323            })
324            .collect();
325        info!(
326            "Number of entry functions discovered: {:?}",
327            entry_functions.len()
328        );
329        debug!("Entry functions: {:?}", entry_functions);
330        self.entry_functions.write().await.extend(entry_functions);
331    }
332
333    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
334    pub async fn publish_package(&mut self, path: &Path) {
335        let rgp = self.cluster.get_reference_gas_price().await;
336        let package = BuildConfig::new_for_testing()
337            .build_async(path)
338            .await
339            .unwrap();
340        let modules = package.get_package_bytes(false);
341        let tx_data = TransactionData::new_module(
342            self.address,
343            self.gas_object,
344            modules,
345            package.dependency_ids.published.values().cloned().collect(),
346            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
347            rgp,
348        );
349        let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
350        let tx_digest = *tx.digest();
351        info!(?tx_digest, "Publishing package");
352        let start = Instant::now();
353        let response = loop {
354            match self
355                .cluster
356                .wallet
357                .execute_transaction_may_fail(tx.clone())
358                .await
359            {
360                Ok(response) => {
361                    break response;
362                }
363                Err(err) => {
364                    if start.elapsed() > Duration::from_secs(120) {
365                        fatal!(
366                            "Failed to publish package after 120 seconds: {} {}",
367                            err,
368                            tx.digest()
369                        );
370                    }
371                    error!(?tx_digest, "Failed to publish package: {}", err);
372                    tokio::time::sleep(Duration::from_secs(1)).await;
373                }
374            }
375        };
376        info!("Successfully published package in {:?}", path);
377        self.process_tx_effects(&response.effects.unwrap()).await;
378    }
379
380    pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
381        self.owned_objects
382            .get(type_tag)
383            .map(|objects| objects.len())
384            .unwrap_or(0)
385    }
386
387    pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
388        self.immutable_objects
389            .read()
390            .await
391            .get(type_tag)
392            .map(|objects| objects.len())
393            .unwrap_or(0)
394    }
395
396    pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
397        self.shared_objects
398            .read()
399            .await
400            .get(type_tag)
401            .map(|objects| objects.len())
402            .unwrap_or(0)
403    }
404
405    pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
406        self.owned_objects
407            .get_mut(type_tag)
408            .unwrap()
409            .swap_remove_index(n)
410            .unwrap()
411    }
412
413    pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
414        self.immutable_objects.read().await.get(type_tag).unwrap()[n]
415    }
416
417    pub async fn choose_nth_shared_object(
418        &self,
419        type_tag: &StructTag,
420        n: usize,
421    ) -> ConsensusObjectSequenceKey {
422        self.shared_objects.read().await.get(type_tag).unwrap()[n]
423    }
424}
425
426fn is_type_tx_context(ty: &Type) -> bool {
427    match ty {
428        Type::Reference(_, inner) => match inner.as_ref() {
429            Type::Datatype(dt) => {
430                dt.module.address == SUI_FRAMEWORK_ADDRESS
431                    && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
432                    && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
433                    && dt.type_arguments.is_empty()
434            }
435            _ => false,
436        },
437        _ => false,
438    }
439}