sui_surfer/
surfer_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use indexmap::IndexSet;
5use move_binary_format::file_format::Visibility;
6use move_binary_format::normalized;
7use move_core_types::identifier::IdentStr;
8use move_core_types::language_storage::StructTag;
9use mysten_common::fatal;
10use rand::rngs::StdRng;
11use std::collections::{HashMap, HashSet};
12use std::path::Path;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use sui_move_build::BuildConfig;
16use sui_protocol_config::{Chain, ProtocolConfig};
17use sui_types::base_types::{ConsensusObjectSequenceKey, ObjectID, ObjectRef, SuiAddress};
18use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
19use sui_types::object::{Object, Owner};
20use sui_types::storage::WriteKind;
21use sui_types::transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData};
22use sui_types::{Identifier, SUI_FRAMEWORK_ADDRESS};
23use test_cluster::TestCluster;
24use tokio::sync::RwLock;
25use tracing::{debug, error, info};
26
27type Type = normalized::Type<normalized::ArcIdentifier>;
28
29#[derive(Debug, Clone)]
30pub struct EntryFunction {
31    pub package: ObjectID,
32    pub module: String,
33    pub function: String,
34    pub parameters: Vec<Type>,
35}
36
37#[derive(Debug, Default)]
38pub struct SurfStatistics {
39    pub num_successful_transactions: u64,
40    pub num_failed_transactions: u64,
41    pub num_owned_obj_transactions: u64,
42    pub num_shared_obj_transactions: u64,
43    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
44}
45
46impl SurfStatistics {
47    pub fn record_transaction(
48        &mut self,
49        has_shared_object: bool,
50        tx_succeeded: bool,
51        package: ObjectID,
52        module: String,
53        function: String,
54    ) {
55        if tx_succeeded {
56            self.num_successful_transactions += 1;
57        } else {
58            self.num_failed_transactions += 1;
59        }
60        if has_shared_object {
61            self.num_shared_obj_transactions += 1;
62        } else {
63            self.num_owned_obj_transactions += 1;
64        }
65        self.unique_move_functions_called
66            .insert((package, module, function));
67    }
68
69    pub fn aggregate(stats: Vec<Self>) -> Self {
70        let mut result = Self::default();
71        for stat in stats {
72            result.num_successful_transactions += stat.num_successful_transactions;
73            result.num_failed_transactions += stat.num_failed_transactions;
74            result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
75            result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
76            result
77                .unique_move_functions_called
78                .extend(stat.unique_move_functions_called);
79        }
80        result
81    }
82
83    pub fn print_stats(&self) {
84        info!(
85            "Executed {} transactions, {} succeeded, {} failed",
86            self.num_successful_transactions + self.num_failed_transactions,
87            self.num_successful_transactions,
88            self.num_failed_transactions
89        );
90        info!(
91            "{} are owned object transactions, {} are shared object transactions",
92            self.num_owned_obj_transactions, self.num_shared_obj_transactions
93        );
94        info!(
95            "Unique move functions called: {}",
96            self.unique_move_functions_called.len()
97        );
98    }
99}
100
101pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;
102
103pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;
104
105/// Map from StructTag to a vector of shared objects, where each shared object is a tuple of
106/// (object ID, initial shared version).
107pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<ConsensusObjectSequenceKey>>>>;
108
109pub struct SurferState {
110    pub pool: Arc<RwLock<normalized::ArcPool>>,
111    pub id: usize,
112    pub cluster: Arc<TestCluster>,
113    pub rng: StdRng,
114
115    pub address: SuiAddress,
116    pub gas_object: ObjectRef,
117    pub owned_objects: OwnedObjects,
118    pub immutable_objects: ImmObjects,
119    pub shared_objects: SharedObjects,
120    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
121
122    pub stats: SurfStatistics,
123}
124
125impl SurferState {
126    pub fn new(
127        id: usize,
128        cluster: Arc<TestCluster>,
129        rng: StdRng,
130        address: SuiAddress,
131        gas_object: ObjectRef,
132        owned_objects: OwnedObjects,
133        immutable_objects: ImmObjects,
134        shared_objects: SharedObjects,
135        entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
136    ) -> Self {
137        Self {
138            pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
139            id,
140            cluster,
141            rng,
142            address,
143            gas_object,
144            owned_objects,
145            immutable_objects,
146            shared_objects,
147            entry_functions,
148            stats: Default::default(),
149        }
150    }
151
152    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
153    pub async fn execute_move_transaction(
154        &mut self,
155        package: ObjectID,
156        module: String,
157        function: String,
158        args: Vec<CallArg>,
159    ) {
160        let rgp = self.cluster.get_reference_gas_price().await;
161        let use_shared_object = args
162            .iter()
163            .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
164        let tx_data = TransactionData::new_move_call(
165            self.address,
166            package,
167            Identifier::new(module.as_str()).unwrap(),
168            Identifier::new(function.as_str()).unwrap(),
169            vec![],
170            self.gas_object,
171            args,
172            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
173            rgp,
174        )
175        .unwrap();
176        let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
177        let response = loop {
178            debug!("Executing transaction {:?}", tx.digest());
179            match self
180                .cluster
181                .wallet
182                .execute_transaction_may_fail(tx.clone())
183                .await
184            {
185                Ok(effects) => break effects,
186                Err(e) => {
187                    error!("Error executing transaction {:?}: {e:?}", tx.digest());
188                    tokio::time::sleep(Duration::from_secs(1)).await;
189                }
190            }
191        };
192        debug!(
193            "Successfully executed transaction {:?} with response {:?}",
194            tx, response
195        );
196        let effects = response.effects;
197        info!(
198            "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
199            self.address,
200            module,
201            function,
202            effects.status()
203        );
204        self.stats.record_transaction(
205            use_shared_object,
206            effects.status().is_ok(),
207            package,
208            module,
209            function,
210        );
211        self.process_tx_effects(&effects).await;
212    }
213
214    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
215    async fn process_tx_effects(&mut self, effects: &TransactionEffects) {
216        for (obj_ref, owner, write_kind) in effects.all_changed_objects() {
217            if matches!(owner, Owner::ObjectOwner(_)) {
218                // For object owned objects, we don't need to do anything.
219                // We also cannot read them because in the case of shared objects, there can be
220                // races and the child object may no longer exist.
221                continue;
222            }
223            // let obj_ref = owned_ref.reference.to_object_ref();
224            let object = self
225                .cluster
226                .get_object_from_fullnode_store(&obj_ref.0)
227                .await
228                .unwrap();
229            if object.is_package() {
230                self.discover_entry_functions(object).await;
231                continue;
232            }
233            let struct_tag = object.struct_tag().unwrap();
234            match owner {
235                Owner::Immutable => {
236                    self.immutable_objects
237                        .write()
238                        .await
239                        .entry(struct_tag)
240                        .or_default()
241                        .push(obj_ref);
242                }
243                Owner::AddressOwner(address) => {
244                    if address == self.address {
245                        self.owned_objects
246                            .entry(struct_tag)
247                            .or_default()
248                            .insert(obj_ref);
249                    }
250                }
251                Owner::ObjectOwner(_) => (),
252                // TODO(Party WIP) Implement full support for Party objects in sui-surfer.
253                Owner::Shared {
254                    initial_shared_version,
255                }
256                | Owner::ConsensusAddressOwner {
257                    start_version: initial_shared_version,
258                    ..
259                }
260                | Owner::Party {
261                    start_version: initial_shared_version,
262                    ..
263                } => {
264                    if write_kind != WriteKind::Mutate {
265                        self.shared_objects
266                            .write()
267                            .await
268                            .entry(struct_tag)
269                            .or_default()
270                            .push((obj_ref.0, initial_shared_version));
271                    }
272                    // We do not need to insert it if it's a Mutate, because it means
273                    // we should already have it in the inventory.
274                }
275            }
276            if obj_ref.0 == self.gas_object.0 {
277                self.gas_object = obj_ref;
278            }
279        }
280    }
281
282    async fn discover_entry_functions(&self, package: Object) {
283        let package_id = package.id();
284        let move_package = package.into_inner().data.try_into_package().unwrap();
285        let proto_version = self.cluster.highest_protocol_version();
286        let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
287        let binary_config = config.binary_config(None);
288        let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
289        let entry_functions: Vec<_> = move_package
290            .normalize(pool, &binary_config, /* include code */ false)
291            .unwrap()
292            .into_iter()
293            .flat_map(|(module_name, module)| {
294                module
295                    .functions
296                    .into_iter()
297                    .filter_map(|(func_name, func)| {
298                        // Either public function or entry function is callable.
299                        if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
300                            return None;
301                        }
302                        // Surfer doesn't support chaining transactions in a programmable transaction yet.
303                        if !func.return_.is_empty() {
304                            return None;
305                        }
306                        // Surfer doesn't support type parameter yet.
307                        if !func.type_parameters.is_empty() {
308                            return None;
309                        }
310                        let mut parameters = (*func.parameters).clone();
311                        if let Some(last_param) = parameters.last().as_ref()
312                            && is_type_tx_context(last_param)
313                        {
314                            parameters.pop();
315                        }
316                        Some(EntryFunction {
317                            package: package_id,
318                            module: module_name.clone(),
319                            function: func_name.to_string(),
320                            parameters: parameters
321                                .into_iter()
322                                .map(|rc_ty| (*rc_ty).clone())
323                                .collect(),
324                        })
325                    })
326                    .collect::<Vec<_>>()
327            })
328            .collect();
329        info!(
330            "Number of entry functions discovered: {:?}",
331            entry_functions.len()
332        );
333        debug!("Entry functions: {:?}", entry_functions);
334        self.entry_functions.write().await.extend(entry_functions);
335    }
336
337    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
338    pub async fn publish_package(&mut self, path: &Path) {
339        let rgp = self.cluster.get_reference_gas_price().await;
340        let package = BuildConfig::new_for_testing()
341            .build_async(path)
342            .await
343            .unwrap();
344        let modules = package.get_package_bytes(false);
345        let tx_data = TransactionData::new_module(
346            self.address,
347            self.gas_object,
348            modules,
349            package.dependency_ids.published.values().cloned().collect(),
350            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
351            rgp,
352        );
353        let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
354        let tx_digest = *tx.digest();
355        info!(?tx_digest, "Publishing package");
356        let start = Instant::now();
357        let response = loop {
358            match self
359                .cluster
360                .wallet
361                .execute_transaction_may_fail(tx.clone())
362                .await
363            {
364                Ok(response) => {
365                    break response;
366                }
367                Err(err) => {
368                    if start.elapsed() > Duration::from_secs(120) {
369                        fatal!(
370                            "Failed to publish package after 120 seconds: {} {}",
371                            err,
372                            tx.digest()
373                        );
374                    }
375                    error!(?tx_digest, "Failed to publish package: {}", err);
376                    tokio::time::sleep(Duration::from_secs(1)).await;
377                }
378            }
379        };
380        info!("Successfully published package in {:?}", path);
381        self.process_tx_effects(&response.effects).await;
382    }
383
384    pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
385        self.owned_objects
386            .get(type_tag)
387            .map(|objects| objects.len())
388            .unwrap_or(0)
389    }
390
391    pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
392        self.immutable_objects
393            .read()
394            .await
395            .get(type_tag)
396            .map(|objects| objects.len())
397            .unwrap_or(0)
398    }
399
400    pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
401        self.shared_objects
402            .read()
403            .await
404            .get(type_tag)
405            .map(|objects| objects.len())
406            .unwrap_or(0)
407    }
408
409    pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
410        self.owned_objects
411            .get_mut(type_tag)
412            .unwrap()
413            .swap_remove_index(n)
414            .unwrap()
415    }
416
417    pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
418        self.immutable_objects.read().await.get(type_tag).unwrap()[n]
419    }
420
421    pub async fn choose_nth_shared_object(
422        &self,
423        type_tag: &StructTag,
424        n: usize,
425    ) -> ConsensusObjectSequenceKey {
426        self.shared_objects.read().await.get(type_tag).unwrap()[n]
427    }
428}
429
430fn is_type_tx_context(ty: &Type) -> bool {
431    match ty {
432        Type::Reference(_, inner) => match inner.as_ref() {
433            Type::Datatype(dt) => {
434                dt.module.address == SUI_FRAMEWORK_ADDRESS
435                    && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
436                    && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
437                    && dt.type_arguments.is_empty()
438            }
439            _ => false,
440        },
441        _ => false,
442    }
443}