sui_json_rpc/
governance_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use cached::SizedCache;
10use cached::proc_macro::cached;
11use itertools::Itertools;
12use jsonrpsee::RpcModule;
13use jsonrpsee::core::RpcResult;
14use tracing::{info, instrument};
15
16use sui_core::authority::AuthorityState;
17use sui_json_rpc_api::{GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics};
18use sui_json_rpc_types::{DelegatedStake, Stake, StakeStatus};
19use sui_json_rpc_types::{SuiCommittee, ValidatorApy, ValidatorApys};
20use sui_open_rpc::Module;
21use sui_types::base_types::{ObjectID, SuiAddress};
22use sui_types::committee::EpochId;
23use sui_types::dynamic_field::get_dynamic_field_from_store;
24use sui_types::error::{SuiError, SuiErrorKind, UserInputError};
25use sui_types::governance::StakedSui;
26use sui_types::id::ID;
27use sui_types::object::ObjectRead;
28use sui_types::sui_serde::BigInt;
29use sui_types::sui_system_state::PoolTokenExchangeRate;
30use sui_types::sui_system_state::SuiSystemStateTrait;
31use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
32use sui_types::sui_system_state::{SuiSystemState, get_validator_from_table};
33
34use crate::authority_state::StateRead;
35use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
36use crate::{ObjectProvider, SuiRpcModule, with_tracing};
37
38#[derive(Clone)]
39pub struct GovernanceReadApi {
40    state: Arc<dyn StateRead>,
41    pub metrics: Arc<JsonRpcMetrics>,
42}
43
44impl GovernanceReadApi {
45    pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
46        Self { state, metrics }
47    }
48
49    async fn get_staked_sui(&self, owner: SuiAddress) -> Result<Vec<StakedSui>, Error> {
50        let state = self.state.clone();
51        let result = state.get_staked_sui(owner).await?;
52
53        self.metrics
54            .get_stake_sui_result_size
55            .observe(result.len() as f64);
56        self.metrics
57            .get_stake_sui_result_size_total
58            .inc_by(result.len() as u64);
59        Ok(result)
60    }
61
62    async fn get_stakes_by_ids(
63        &self,
64        staked_sui_ids: Vec<ObjectID>,
65    ) -> Result<Vec<DelegatedStake>, Error> {
66        let state = self.state.clone();
67        let stakes_read: Vec<_> = staked_sui_ids
68            .iter()
69            .map(|id| state.get_object_read(id))
70            .collect::<Result<Vec<_>, _>>()?;
71
72        if stakes_read.is_empty() {
73            return Ok(vec![]);
74        }
75
76        let mut stakes: Vec<(StakedSui, bool)> = vec![];
77        for stake in stakes_read.into_iter() {
78            match stake {
79                ObjectRead::Exists(_, o, _) => stakes.push((StakedSui::try_from(&o)?, true)),
80                ObjectRead::Deleted(oref) => {
81                    match self
82                        .state
83                        .find_object_lt_or_eq_version(&oref.0, &oref.1.one_before().unwrap())
84                        .await?
85                    {
86                        Some(o) => stakes.push((StakedSui::try_from(&o)?, false)),
87                        None => Err(SuiRpcInputError::UserInputError(
88                            UserInputError::ObjectNotFound {
89                                object_id: oref.0,
90                                version: None,
91                            },
92                        ))?,
93                    }
94                }
95                ObjectRead::NotExists(id) => Err(SuiRpcInputError::UserInputError(
96                    UserInputError::ObjectNotFound {
97                        object_id: id,
98                        version: None,
99                    },
100                ))?,
101            }
102        }
103
104        self.get_delegated_stakes(stakes).await
105    }
106
107    async fn get_stakes(&self, owner: SuiAddress) -> Result<Vec<DelegatedStake>, Error> {
108        let timer = self.metrics.get_stake_sui_latency.start_timer();
109        let stakes = self.get_staked_sui(owner).await?;
110        if stakes.is_empty() {
111            return Ok(vec![]);
112        }
113        drop(timer);
114
115        let _timer = self.metrics.get_delegated_sui_latency.start_timer();
116
117        self.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
118            .await
119    }
120
121    async fn get_delegated_stakes(
122        &self,
123        stakes: Vec<(StakedSui, bool)>,
124    ) -> Result<Vec<DelegatedStake>, Error> {
125        let pools = stakes.into_iter().fold(
126            BTreeMap::<_, Vec<_>>::new(),
127            |mut pools, (stake, exists)| {
128                pools
129                    .entry(stake.pool_id())
130                    .or_default()
131                    .push((stake, exists));
132                pools
133            },
134        );
135
136        let system_state = self.get_system_state()?;
137        let system_state_summary: SuiSystemStateSummary =
138            system_state.clone().into_sui_system_state_summary();
139
140        let rates = exchange_rates(&self.state, system_state_summary.epoch)
141            .await?
142            .into_iter()
143            .map(|rates| (rates.pool_id, rates))
144            .collect::<BTreeMap<_, _>>();
145
146        let mut delegated_stakes = vec![];
147        for (pool_id, stakes) in pools {
148            // Rate table and rate can be null when the pool is not active
149            let rate_table = rates.get(&pool_id).ok_or_else(|| {
150                SuiRpcInputError::GenericNotFound(
151                    "Cannot find rates for staking pool {pool_id}".to_string(),
152                )
153            })?;
154            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
155
156            let mut delegations = vec![];
157            for (stake, exists) in stakes {
158                let status = if !exists {
159                    StakeStatus::Unstaked
160                } else if system_state_summary.epoch >= stake.activation_epoch() {
161                    // TODO: use dev_inspect to call a move function to get the estimated reward
162                    let estimated_reward = if let Some(current_rate) = current_rate {
163                        let stake_rate = rate_table
164                            .rates
165                            .iter()
166                            .find_map(|(epoch, rate)| {
167                                if *epoch == stake.activation_epoch() {
168                                    Some(rate.clone())
169                                } else {
170                                    None
171                                }
172                            })
173                            .unwrap_or_default();
174                        let estimated_reward = ((stake_rate.rate() / current_rate.rate()) - 1.0)
175                            * stake.principal() as f64;
176                        max(0, estimated_reward.round() as u64)
177                    } else {
178                        0
179                    };
180                    StakeStatus::Active { estimated_reward }
181                } else {
182                    StakeStatus::Pending
183                };
184                delegations.push(Stake {
185                    staked_sui_id: stake.id(),
186                    // TODO: this might change when we implement warm up period.
187                    stake_request_epoch: stake.activation_epoch() - 1,
188                    stake_active_epoch: stake.activation_epoch(),
189                    principal: stake.principal(),
190                    status,
191                })
192            }
193            delegated_stakes.push(DelegatedStake {
194                validator_address: rate_table.address,
195                staking_pool: pool_id,
196                stakes: delegations,
197            })
198        }
199        Ok(delegated_stakes)
200    }
201
202    fn get_system_state(&self) -> Result<SuiSystemState, Error> {
203        Ok(self.state.get_system_state()?)
204    }
205}
206
207#[async_trait]
208impl GovernanceReadApiServer for GovernanceReadApi {
209    #[instrument(skip(self))]
210    async fn get_stakes_by_ids(
211        &self,
212        staked_sui_ids: Vec<ObjectID>,
213    ) -> RpcResult<Vec<DelegatedStake>> {
214        with_tracing!(async move { self.get_stakes_by_ids(staked_sui_ids).await })
215    }
216
217    #[instrument(skip(self))]
218    async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>> {
219        with_tracing!(async move { self.get_stakes(owner).await })
220    }
221
222    #[instrument(skip(self))]
223    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<SuiCommittee> {
224        with_tracing!(async move {
225            self.state
226                .get_or_latest_committee(epoch)
227                .map(|committee| committee.into())
228                .map_err(Error::from)
229        })
230    }
231
232    #[instrument(skip(self))]
233    async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary> {
234        with_tracing!(async move {
235            Ok(self
236                .state
237                .get_system_state()
238                .map_err(Error::from)?
239                .into_sui_system_state_summary())
240        })
241    }
242
243    #[instrument(skip(self))]
244    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
245        with_tracing!(async move {
246            let epoch_store = self.state.load_epoch_store_one_call_per_task();
247            Ok(epoch_store.reference_gas_price().into())
248        })
249    }
250
251    #[instrument(skip(self))]
252    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
253        info!("get_validator_apy");
254        let system_state_summary: SuiSystemStateSummary =
255            self.get_latest_sui_system_state().await?;
256
257        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch).await?;
258
259        let apys = calculate_apys(
260            system_state_summary.stake_subsidy_start_epoch,
261            exchange_rate_table,
262        );
263
264        Ok(ValidatorApys {
265            apys,
266            epoch: system_state_summary.epoch,
267        })
268    }
269}
270
271pub fn calculate_apys(
272    stake_subsidy_start_epoch: u64,
273    exchange_rate_table: Vec<ValidatorExchangeRates>,
274) -> Vec<ValidatorApy> {
275    let mut apys = vec![];
276
277    for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
278        // we start the apy calculation from the epoch when the stake subsidy starts
279        let exchange_rates = rates.rates.into_iter().filter_map(|(epoch, rate)| {
280            if epoch >= stake_subsidy_start_epoch {
281                Some(rate)
282            } else {
283                None
284            }
285        });
286
287        // we need at least 2 data points to calculate apy
288        let average_apy = if exchange_rates.clone().count() >= 2 {
289            // rates are sorted by epoch in descending order.
290            let er_e = exchange_rates.clone().dropping(1);
291            // rate e+1
292            let er_e_1 = exchange_rates.dropping_back(1);
293            let apys = er_e
294                .zip(er_e_1)
295                .map(calculate_apy)
296                .filter(|apy| *apy > 0.0 && *apy < 0.1)
297                .take(30)
298                .collect::<Vec<_>>();
299
300            let apy_counts = apys.len() as f64;
301            apys.iter().sum::<f64>() / apy_counts
302        } else {
303            0.0
304        };
305        apys.push(ValidatorApy {
306            address: rates.address,
307            apy: average_apy,
308        });
309    }
310    apys
311}
312
/// Feeds recorded exchange rates through `calculate_apys` and checks that every resulting
/// APY stays below 7%.
#[test]
fn test_apys_calculation_filter_outliers() {
    // staking pool exchange rates extracted from mainnet
    let file =
        std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates.json").unwrap();
    let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
        serde_json::from_reader(file).unwrap();

    // Maps each generated address back to the validator key from the fixture, for printing.
    let mut address_map = BTreeMap::new();
    let mut exchange_rates = Vec::new();
    for (validator, rates) in rates {
        let address = SuiAddress::random_for_testing_only();
        address_map.insert(address, validator);
        exchange_rates.push(ValidatorExchangeRates {
            address,
            pool_id: ObjectID::random(),
            active: true,
            rates,
        });
    }

    for apy in calculate_apys(20, exchange_rates) {
        println!("{}: {}", address_map[&apy.address], apy.apy);
        assert!(apy.apy < 0.07)
    }
}
344
345// APY_e = (ER_e+1 / ER_e) ^ 365
346fn calculate_apy((rate_e, rate_e_1): (PoolTokenExchangeRate, PoolTokenExchangeRate)) -> f64 {
347    (rate_e.rate() / rate_e_1.rate()).powf(365.0) - 1.0
348}
349
350/// Cached exchange rates for validators for the given epoch, the cache size is 1, it will be cleared when the epoch changes.
351/// rates are in descending order by epoch.
352#[cached(
353    type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
354    create = "{ SizedCache::with_size(1) }",
355    convert = "{ _current_epoch }",
356    result = true
357)]
358async fn exchange_rates(
359    state: &Arc<dyn StateRead>,
360    _current_epoch: EpochId,
361) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
362    let system_state = state.get_system_state()?;
363    let system_state_summary: SuiSystemStateSummary = system_state.into_sui_system_state_summary();
364
365    // Get validator rate tables
366    let mut tables = vec![];
367
368    for validator in system_state_summary.active_validators {
369        tables.push((
370            validator.sui_address,
371            validator.staking_pool_id,
372            validator.exchange_rates_id,
373            validator.exchange_rates_size,
374            true,
375        ));
376    }
377
378    // Get inactive validator rate tables
379    for df in state.get_dynamic_fields(
380        system_state_summary.inactive_pools_id,
381        None,
382        system_state_summary.inactive_pools_size as usize,
383    )? {
384        let pool_id: ID = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
385            SuiErrorKind::ObjectDeserializationError {
386                error: e.to_string(),
387            }
388        })?;
389        let validator = get_validator_from_table(
390            state.get_object_store().as_ref(),
391            system_state_summary.inactive_pools_id,
392            &pool_id,
393        )?; // TODO(wlmyng): roll this into StateReadError
394        tables.push((
395            validator.sui_address,
396            validator.staking_pool_id,
397            validator.exchange_rates_id,
398            validator.exchange_rates_size,
399            false,
400        ));
401    }
402
403    let mut exchange_rates = vec![];
404    // Get exchange rates for each validator
405    for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
406        let mut rates = state
407            .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
408            .into_iter()
409            .map(|df| {
410                let epoch: EpochId = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
411                    SuiErrorKind::ObjectDeserializationError {
412                        error: e.to_string(),
413                    }
414                })?;
415
416                let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
417                    &state.get_object_store().as_ref(),
418                    exchange_rates_id,
419                    &epoch,
420                )?;
421
422                Ok::<_, SuiError>((epoch, exchange_rate))
423            })
424            .collect::<Result<Vec<_>, _>>()?;
425
426        // Rates for some epochs might be missing due to safe mode, we need to backfill them.
427        rates = backfill_rates(rates);
428
429        exchange_rates.push(ValidatorExchangeRates {
430            address,
431            pool_id,
432            active,
433            rates,
434        });
435    }
436    Ok(exchange_rates)
437}
438
439#[derive(Clone, Debug)]
440pub struct ValidatorExchangeRates {
441    pub address: SuiAddress,
442    pub pool_id: ObjectID,
443    pub active: bool,
444    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
445}
446
447/// Backfill missing rates for some epochs due to safe mode. If a rate is missing for epoch e,
448/// we will use the rate for epoch e-1 to fill it.
449/// Rates returned are in descending order by epoch.
450fn backfill_rates(
451    rates: Vec<(EpochId, PoolTokenExchangeRate)>,
452) -> Vec<(EpochId, PoolTokenExchangeRate)> {
453    if rates.is_empty() {
454        return rates;
455    }
456
457    let min_epoch = *rates.iter().map(|(e, _)| e).min().unwrap();
458    let max_epoch = *rates.iter().map(|(e, _)| e).max().unwrap();
459    let mut filled_rates = Vec::new();
460    let mut prev_rate = None;
461
462    for epoch in min_epoch..=max_epoch {
463        match rates.iter().find(|(e, _)| *e == epoch) {
464            Some((e, rate)) => {
465                prev_rate = Some(rate.clone());
466                filled_rates.push((*e, rate.clone()));
467            }
468            None => {
469                if let Some(rate) = prev_rate.clone() {
470                    filled_rates.push((epoch, rate));
471                }
472            }
473        }
474    }
475    filled_rates.reverse();
476    filled_rates
477}
478
479impl SuiRpcModule for GovernanceReadApi {
480    fn rpc(self) -> RpcModule<Self> {
481        self.into_rpc()
482    }
483
484    fn rpc_doc_module() -> Module {
485        GovernanceReadApiOpenRpc::module_doc()
486    }
487}
488
#[cfg(test)]
mod tests {
    use super::*;
    use sui_types::sui_system_state::PoolTokenExchangeRate;

    /// No rates in, no rates out.
    #[test]
    fn test_backfill_rates_empty() {
        let input = vec![];
        let expected = vec![];
        assert_eq!(backfill_rates(input), expected);
    }

    /// Contiguous epochs are only reordered into descending order.
    #[test]
    fn test_backfill_rates_no_gaps() {
        let rate1 = PoolTokenExchangeRate::new(100, 100);
        let rate2 = PoolTokenExchangeRate::new(200, 220);
        let rate3 = PoolTokenExchangeRate::new(300, 330);

        let input = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];
        let expected: Vec<(u64, PoolTokenExchangeRate)> =
            vec![(3, rate3), (2, rate2), (1, rate1)];

        assert_eq!(backfill_rates(input), expected);
    }

    /// Missing epochs 2 and 4 inherit the rate of the epoch just before them.
    #[test]
    fn test_backfill_rates_with_gaps() {
        let rate1 = PoolTokenExchangeRate::new(100, 100);
        let rate3 = PoolTokenExchangeRate::new(300, 330);
        let rate5 = PoolTokenExchangeRate::new(500, 550);

        let input = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];
        let expected = vec![
            (5, rate5),
            (4, rate3.clone()),
            (3, rate3),
            (2, rate1.clone()),
            (1, rate1),
        ];

        assert_eq!(backfill_rates(input), expected);
    }
}