sui_json_rpc/
governance_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::cmp::max;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use cached::SizedCache;
10use cached::proc_macro::cached;
11use itertools::Itertools;
12use jsonrpsee::RpcModule;
13use jsonrpsee::core::RpcResult;
14use mysten_common::ZipDebugEqIteratorExt;
15use tracing::{info, instrument};
16
17use sui_core::authority::AuthorityState;
18use sui_json_rpc_api::{GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics};
19use sui_json_rpc_types::{DelegatedStake, Stake, StakeStatus};
20use sui_json_rpc_types::{SuiCommittee, ValidatorApy, ValidatorApys};
21use sui_open_rpc::Module;
22use sui_types::base_types::{ObjectID, SuiAddress};
23use sui_types::committee::EpochId;
24use sui_types::dynamic_field::get_dynamic_field_from_store;
25use sui_types::error::{SuiError, SuiErrorKind, UserInputError};
26use sui_types::governance::StakedSui;
27use sui_types::id::ID;
28use sui_types::object::ObjectRead;
29use sui_types::sui_serde::BigInt;
30use sui_types::sui_system_state::PoolTokenExchangeRate;
31use sui_types::sui_system_state::SuiSystemStateTrait;
32use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
33use sui_types::sui_system_state::{SuiSystemState, get_validator_from_table};
34
35use crate::authority_state::StateRead;
36use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
37use crate::{ObjectProvider, SuiRpcModule, with_tracing};
38
/// JSON-RPC handler for the governance read endpoints (stakes, committee info,
/// system state, reference gas price and validator APYs).
#[derive(Clone)]
pub struct GovernanceReadApi {
    /// Read-only view of node state that every query in this API goes through.
    state: Arc<dyn StateRead>,
    /// Metrics sink used to record result sizes and latencies of stake queries.
    pub metrics: Arc<JsonRpcMetrics>,
}
44
45impl GovernanceReadApi {
46    pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
47        Self { state, metrics }
48    }
49
50    async fn get_staked_sui(&self, owner: SuiAddress) -> Result<Vec<StakedSui>, Error> {
51        let state = self.state.clone();
52        let result = state.get_staked_sui(owner).await?;
53
54        self.metrics
55            .get_stake_sui_result_size
56            .observe(result.len() as f64);
57        self.metrics
58            .get_stake_sui_result_size_total
59            .inc_by(result.len() as u64);
60        Ok(result)
61    }
62
63    async fn get_stakes_by_ids(
64        &self,
65        staked_sui_ids: Vec<ObjectID>,
66    ) -> Result<Vec<DelegatedStake>, Error> {
67        let state = self.state.clone();
68        let stakes_read: Vec<_> = staked_sui_ids
69            .iter()
70            .map(|id| state.get_object_read(id))
71            .collect::<Result<Vec<_>, _>>()?;
72
73        if stakes_read.is_empty() {
74            return Ok(vec![]);
75        }
76
77        let mut stakes: Vec<(StakedSui, bool)> = vec![];
78        for stake in stakes_read.into_iter() {
79            match stake {
80                ObjectRead::Exists(_, o, _) => stakes.push((StakedSui::try_from(&o)?, true)),
81                ObjectRead::Deleted(oref) => {
82                    match self
83                        .state
84                        .find_object_lt_or_eq_version(&oref.0, &oref.1.one_before().unwrap())
85                        .await?
86                    {
87                        Some(o) => stakes.push((StakedSui::try_from(&o)?, false)),
88                        None => Err(SuiRpcInputError::UserInputError(
89                            UserInputError::ObjectNotFound {
90                                object_id: oref.0,
91                                version: None,
92                            },
93                        ))?,
94                    }
95                }
96                ObjectRead::NotExists(id) => Err(SuiRpcInputError::UserInputError(
97                    UserInputError::ObjectNotFound {
98                        object_id: id,
99                        version: None,
100                    },
101                ))?,
102            }
103        }
104
105        self.get_delegated_stakes(stakes).await
106    }
107
108    async fn get_stakes(&self, owner: SuiAddress) -> Result<Vec<DelegatedStake>, Error> {
109        let timer = self.metrics.get_stake_sui_latency.start_timer();
110        let stakes = self.get_staked_sui(owner).await?;
111        if stakes.is_empty() {
112            return Ok(vec![]);
113        }
114        drop(timer);
115
116        let _timer = self.metrics.get_delegated_sui_latency.start_timer();
117
118        self.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
119            .await
120    }
121
122    async fn get_delegated_stakes(
123        &self,
124        stakes: Vec<(StakedSui, bool)>,
125    ) -> Result<Vec<DelegatedStake>, Error> {
126        let pools = stakes.into_iter().fold(
127            BTreeMap::<_, Vec<_>>::new(),
128            |mut pools, (stake, exists)| {
129                pools
130                    .entry(stake.pool_id())
131                    .or_default()
132                    .push((stake, exists));
133                pools
134            },
135        );
136
137        let system_state = self.get_system_state()?;
138        let system_state_summary: SuiSystemStateSummary =
139            system_state.clone().into_sui_system_state_summary();
140
141        let rates = exchange_rates(&self.state, system_state_summary.epoch)
142            .await?
143            .into_iter()
144            .map(|rates| (rates.pool_id, rates))
145            .collect::<BTreeMap<_, _>>();
146
147        let mut delegated_stakes = vec![];
148        for (pool_id, stakes) in pools {
149            // Rate table and rate can be null when the pool is not active
150            let rate_table = rates.get(&pool_id).ok_or_else(|| {
151                SuiRpcInputError::GenericNotFound(
152                    "Cannot find rates for staking pool {pool_id}".to_string(),
153                )
154            })?;
155            let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
156
157            let mut delegations = vec![];
158            for (stake, exists) in stakes {
159                let status = if !exists {
160                    StakeStatus::Unstaked
161                } else if system_state_summary.epoch >= stake.activation_epoch() {
162                    // TODO: use dev_inspect to call a move function to get the estimated reward
163                    let estimated_reward = if let Some(current_rate) = current_rate {
164                        let stake_rate = rate_table
165                            .rates
166                            .iter()
167                            .find_map(|(epoch, rate)| {
168                                if *epoch == stake.activation_epoch() {
169                                    Some(rate.clone())
170                                } else {
171                                    None
172                                }
173                            })
174                            .unwrap_or_default();
175                        let estimated_reward = ((stake_rate.rate() / current_rate.rate()) - 1.0)
176                            * stake.principal() as f64;
177                        max(0, estimated_reward.round() as u64)
178                    } else {
179                        0
180                    };
181                    StakeStatus::Active { estimated_reward }
182                } else {
183                    StakeStatus::Pending
184                };
185                delegations.push(Stake {
186                    staked_sui_id: stake.id(),
187                    // TODO: this might change when we implement warm up period.
188                    stake_request_epoch: stake.activation_epoch() - 1,
189                    stake_active_epoch: stake.activation_epoch(),
190                    principal: stake.principal(),
191                    status,
192                })
193            }
194            delegated_stakes.push(DelegatedStake {
195                validator_address: rate_table.address,
196                staking_pool: pool_id,
197                stakes: delegations,
198            })
199        }
200        Ok(delegated_stakes)
201    }
202
203    fn get_system_state(&self) -> Result<SuiSystemState, Error> {
204        Ok(self.state.get_system_state()?)
205    }
206}
207
208#[async_trait]
209impl GovernanceReadApiServer for GovernanceReadApi {
210    #[instrument(skip(self))]
211    async fn get_stakes_by_ids(
212        &self,
213        staked_sui_ids: Vec<ObjectID>,
214    ) -> RpcResult<Vec<DelegatedStake>> {
215        with_tracing!(async move { self.get_stakes_by_ids(staked_sui_ids).await })
216    }
217
218    #[instrument(skip(self))]
219    async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>> {
220        with_tracing!(async move { self.get_stakes(owner).await })
221    }
222
223    #[instrument(skip(self))]
224    async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<SuiCommittee> {
225        with_tracing!(async move {
226            self.state
227                .get_or_latest_committee(epoch)
228                .map(|committee| committee.into())
229                .map_err(Error::from)
230        })
231    }
232
233    #[instrument(skip(self))]
234    async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary> {
235        with_tracing!(async move {
236            Ok(self
237                .state
238                .get_system_state()
239                .map_err(Error::from)?
240                .into_sui_system_state_summary())
241        })
242    }
243
244    #[instrument(skip(self))]
245    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
246        with_tracing!(async move {
247            let epoch_store = self.state.load_epoch_store_one_call_per_task();
248            Ok(epoch_store.reference_gas_price().into())
249        })
250    }
251
252    #[instrument(skip(self))]
253    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
254        info!("get_validator_apy");
255        let system_state_summary: SuiSystemStateSummary =
256            self.get_latest_sui_system_state().await?;
257
258        let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch).await?;
259
260        let apys = calculate_apys(
261            system_state_summary.stake_subsidy_start_epoch,
262            exchange_rate_table,
263        );
264
265        Ok(ValidatorApys {
266            apys,
267            epoch: system_state_summary.epoch,
268        })
269    }
270}
271
272pub fn calculate_apys(
273    stake_subsidy_start_epoch: u64,
274    exchange_rate_table: Vec<ValidatorExchangeRates>,
275) -> Vec<ValidatorApy> {
276    let mut apys = vec![];
277
278    for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
279        // we start the apy calculation from the epoch when the stake subsidy starts
280        let exchange_rates = rates.rates.into_iter().filter_map(|(epoch, rate)| {
281            if epoch >= stake_subsidy_start_epoch {
282                Some(rate)
283            } else {
284                None
285            }
286        });
287
288        // we need at least 2 data points to calculate apy
289        let average_apy = if exchange_rates.clone().count() >= 2 {
290            // rates are sorted by epoch in descending order.
291            let er_e = exchange_rates.clone().dropping(1);
292            // rate e+1
293            let er_e_1 = exchange_rates.dropping_back(1);
294            let apys = er_e
295                .zip_debug_eq(er_e_1)
296                .map(calculate_apy)
297                .filter(|apy| *apy > 0.0 && *apy < 0.1)
298                .take(30)
299                .collect::<Vec<_>>();
300
301            let apy_counts = apys.len() as f64;
302            apys.iter().sum::<f64>() / apy_counts
303        } else {
304            0.0
305        };
306        apys.push(ValidatorApy {
307            address: rates.address,
308            apy: average_apy,
309        });
310    }
311    apys
312}
313
/// Checks that APYs computed from recorded exchange rates stay below 7% once outliers are
/// filtered out.
#[test]
fn test_apys_calculation_filter_outliers() {
    // staking pool exchange rates extracted from mainnet
    let fixture =
        std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates.json").unwrap();
    let rates_by_validator: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
        serde_json::from_reader(fixture).unwrap();

    // Remembers which fixture entry each randomly generated address stands for.
    let mut address_map = BTreeMap::new();
    let mut exchange_rates = Vec::new();

    for (validator, rates) in rates_by_validator {
        let address = SuiAddress::random_for_testing_only();
        address_map.insert(address, validator);
        exchange_rates.push(ValidatorExchangeRates {
            address,
            pool_id: ObjectID::random(),
            active: true,
            rates,
        });
    }

    for apy in calculate_apys(20, exchange_rates) {
        println!("{}: {}", address_map[&apy.address], apy.apy);
        assert!(apy.apy < 0.07)
    }
}
345
346// APY_e = (ER_e+1 / ER_e) ^ 365
347fn calculate_apy((rate_e, rate_e_1): (PoolTokenExchangeRate, PoolTokenExchangeRate)) -> f64 {
348    (rate_e.rate() / rate_e_1.rate()).powf(365.0) - 1.0
349}
350
351/// Cached exchange rates for validators for the given epoch, the cache size is 1, it will be cleared when the epoch changes.
352/// rates are in descending order by epoch.
353#[cached(
354    type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
355    create = "{ SizedCache::with_size(1) }",
356    convert = "{ _current_epoch }",
357    result = true
358)]
359async fn exchange_rates(
360    state: &Arc<dyn StateRead>,
361    _current_epoch: EpochId,
362) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
363    let system_state = state.get_system_state()?;
364    let system_state_summary: SuiSystemStateSummary = system_state.into_sui_system_state_summary();
365
366    // Get validator rate tables
367    let mut tables = vec![];
368
369    for validator in system_state_summary.active_validators {
370        tables.push((
371            validator.sui_address,
372            validator.staking_pool_id,
373            validator.exchange_rates_id,
374            validator.exchange_rates_size,
375            true,
376        ));
377    }
378
379    // Get inactive validator rate tables
380    for df in state.get_dynamic_fields(
381        system_state_summary.inactive_pools_id,
382        None,
383        system_state_summary.inactive_pools_size as usize,
384    )? {
385        let pool_id: ID = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
386            SuiErrorKind::ObjectDeserializationError {
387                error: e.to_string(),
388            }
389        })?;
390        let validator = get_validator_from_table(
391            state.get_object_store().as_ref(),
392            system_state_summary.inactive_pools_id,
393            &pool_id,
394        )?; // TODO(wlmyng): roll this into StateReadError
395        tables.push((
396            validator.sui_address,
397            validator.staking_pool_id,
398            validator.exchange_rates_id,
399            validator.exchange_rates_size,
400            false,
401        ));
402    }
403
404    let mut exchange_rates = vec![];
405    // Get exchange rates for each validator
406    for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
407        let mut rates = state
408            .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
409            .into_iter()
410            .map(|df| {
411                let epoch: EpochId = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
412                    SuiErrorKind::ObjectDeserializationError {
413                        error: e.to_string(),
414                    }
415                })?;
416
417                let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
418                    &state.get_object_store().as_ref(),
419                    exchange_rates_id,
420                    &epoch,
421                )?;
422
423                Ok::<_, SuiError>((epoch, exchange_rate))
424            })
425            .collect::<Result<Vec<_>, _>>()?;
426
427        // Rates for some epochs might be missing due to safe mode, we need to backfill them.
428        rates = backfill_rates(rates);
429
430        exchange_rates.push(ValidatorExchangeRates {
431            address,
432            pool_id,
433            active,
434            rates,
435        });
436    }
437    Ok(exchange_rates)
438}
439
/// Exchange-rate history of a single validator's staking pool.
#[derive(Clone, Debug)]
pub struct ValidatorExchangeRates {
    /// Sui address of the validator that owns the pool.
    pub address: SuiAddress,
    /// Id of the staking pool the rates belong to.
    pub pool_id: ObjectID,
    /// `true` for pools of active validators, `false` for pools read from the inactive
    /// pools table.
    pub active: bool,
    /// `(epoch, rate)` pairs; `exchange_rates` fills this in descending order by epoch.
    pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
}
447
448/// Backfill missing rates for some epochs due to safe mode. If a rate is missing for epoch e,
449/// we will use the rate for epoch e-1 to fill it.
450/// Rates returned are in descending order by epoch.
451fn backfill_rates(
452    rates: Vec<(EpochId, PoolTokenExchangeRate)>,
453) -> Vec<(EpochId, PoolTokenExchangeRate)> {
454    if rates.is_empty() {
455        return rates;
456    }
457
458    let min_epoch = *rates.iter().map(|(e, _)| e).min().unwrap();
459    let max_epoch = *rates.iter().map(|(e, _)| e).max().unwrap();
460    let mut filled_rates = Vec::new();
461    let mut prev_rate = None;
462
463    for epoch in min_epoch..=max_epoch {
464        match rates.iter().find(|(e, _)| *e == epoch) {
465            Some((e, rate)) => {
466                prev_rate = Some(rate.clone());
467                filled_rates.push((*e, rate.clone()));
468            }
469            None => {
470                if let Some(rate) = prev_rate.clone() {
471                    filled_rates.push((epoch, rate));
472                }
473            }
474        }
475    }
476    filled_rates.reverse();
477    filled_rates
478}
479
/// Registers the governance read API as a JSON-RPC module.
impl SuiRpcModule for GovernanceReadApi {
    /// Consumes the API and turns it into a `jsonrpsee` module via `into_rpc`.
    fn rpc(self) -> RpcModule<Self> {
        self.into_rpc()
    }

    /// Returns the OpenRPC documentation module for this API.
    fn rpc_doc_module() -> Module {
        GovernanceReadApiOpenRpc::module_doc()
    }
}
489
#[cfg(test)]
mod tests {
    use super::*;
    use sui_types::sui_system_state::PoolTokenExchangeRate;

    /// An empty input stays empty.
    #[test]
    fn test_backfill_rates_empty() {
        let backfilled = backfill_rates(vec![]);
        assert_eq!(backfilled, vec![]);
    }

    /// Without gaps the rates are only reordered to descending epochs.
    #[test]
    fn test_backfill_rates_no_gaps() {
        let rate1 = PoolTokenExchangeRate::new(100, 100);
        let rate2 = PoolTokenExchangeRate::new(200, 220);
        let rate3 = PoolTokenExchangeRate::new(300, 330);
        let unordered = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];

        let backfilled = backfill_rates(unordered);

        let expected: Vec<(u64, PoolTokenExchangeRate)> = vec![(3, rate3), (2, rate2), (1, rate1)];
        assert_eq!(backfilled, expected);
    }

    /// Missing epochs reuse the rate of the closest earlier epoch.
    #[test]
    fn test_backfill_rates_with_gaps() {
        let rate1 = PoolTokenExchangeRate::new(100, 100);
        let rate3 = PoolTokenExchangeRate::new(300, 330);
        let rate5 = PoolTokenExchangeRate::new(500, 550);
        let sparse = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];

        let backfilled = backfill_rates(sparse);

        let expected = vec![
            (5, rate5),
            (4, rate3.clone()),
            (3, rate3),
            (2, rate1.clone()),
            (1, rate1),
        ];
        assert_eq!(backfilled, expected);
    }
}
529}