1use std::cmp::max;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use cached::SizedCache;
10use cached::proc_macro::cached;
11use itertools::Itertools;
12use jsonrpsee::RpcModule;
13use jsonrpsee::core::RpcResult;
14use mysten_common::ZipDebugEqIteratorExt;
15use tracing::{info, instrument};
16
17use sui_core::authority::AuthorityState;
18use sui_json_rpc_api::{GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics};
19use sui_json_rpc_types::{DelegatedStake, Stake, StakeStatus};
20use sui_json_rpc_types::{SuiCommittee, ValidatorApy, ValidatorApys};
21use sui_open_rpc::Module;
22use sui_types::base_types::{ObjectID, SuiAddress};
23use sui_types::committee::EpochId;
24use sui_types::dynamic_field::get_dynamic_field_from_store;
25use sui_types::error::{SuiError, SuiErrorKind, UserInputError};
26use sui_types::governance::StakedSui;
27use sui_types::id::ID;
28use sui_types::object::ObjectRead;
29use sui_types::sui_serde::BigInt;
30use sui_types::sui_system_state::PoolTokenExchangeRate;
31use sui_types::sui_system_state::SuiSystemStateTrait;
32use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
33use sui_types::sui_system_state::{SuiSystemState, get_validator_from_table};
34
35use crate::authority_state::StateRead;
36use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
37use crate::{ObjectProvider, SuiRpcModule, with_tracing};
38
39#[derive(Clone)]
40pub struct GovernanceReadApi {
41 state: Arc<dyn StateRead>,
42 pub metrics: Arc<JsonRpcMetrics>,
43}
44
45impl GovernanceReadApi {
46 pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
47 Self { state, metrics }
48 }
49
50 async fn get_staked_sui(&self, owner: SuiAddress) -> Result<Vec<StakedSui>, Error> {
51 let state = self.state.clone();
52 let result = state.get_staked_sui(owner).await?;
53
54 self.metrics
55 .get_stake_sui_result_size
56 .observe(result.len() as f64);
57 self.metrics
58 .get_stake_sui_result_size_total
59 .inc_by(result.len() as u64);
60 Ok(result)
61 }
62
63 async fn get_stakes_by_ids(
64 &self,
65 staked_sui_ids: Vec<ObjectID>,
66 ) -> Result<Vec<DelegatedStake>, Error> {
67 let state = self.state.clone();
68 let stakes_read: Vec<_> = staked_sui_ids
69 .iter()
70 .map(|id| state.get_object_read(id))
71 .collect::<Result<Vec<_>, _>>()?;
72
73 if stakes_read.is_empty() {
74 return Ok(vec![]);
75 }
76
77 let mut stakes: Vec<(StakedSui, bool)> = vec![];
78 for stake in stakes_read.into_iter() {
79 match stake {
80 ObjectRead::Exists(_, o, _) => stakes.push((StakedSui::try_from(&o)?, true)),
81 ObjectRead::Deleted(oref) => {
82 match self
83 .state
84 .find_object_lt_or_eq_version(&oref.0, &oref.1.one_before().unwrap())
85 .await?
86 {
87 Some(o) => stakes.push((StakedSui::try_from(&o)?, false)),
88 None => Err(SuiRpcInputError::UserInputError(
89 UserInputError::ObjectNotFound {
90 object_id: oref.0,
91 version: None,
92 },
93 ))?,
94 }
95 }
96 ObjectRead::NotExists(id) => Err(SuiRpcInputError::UserInputError(
97 UserInputError::ObjectNotFound {
98 object_id: id,
99 version: None,
100 },
101 ))?,
102 }
103 }
104
105 self.get_delegated_stakes(stakes).await
106 }
107
108 async fn get_stakes(&self, owner: SuiAddress) -> Result<Vec<DelegatedStake>, Error> {
109 let timer = self.metrics.get_stake_sui_latency.start_timer();
110 let stakes = self.get_staked_sui(owner).await?;
111 if stakes.is_empty() {
112 return Ok(vec![]);
113 }
114 drop(timer);
115
116 let _timer = self.metrics.get_delegated_sui_latency.start_timer();
117
118 self.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
119 .await
120 }
121
122 async fn get_delegated_stakes(
123 &self,
124 stakes: Vec<(StakedSui, bool)>,
125 ) -> Result<Vec<DelegatedStake>, Error> {
126 let pools = stakes.into_iter().fold(
127 BTreeMap::<_, Vec<_>>::new(),
128 |mut pools, (stake, exists)| {
129 pools
130 .entry(stake.pool_id())
131 .or_default()
132 .push((stake, exists));
133 pools
134 },
135 );
136
137 let system_state = self.get_system_state()?;
138 let system_state_summary: SuiSystemStateSummary =
139 system_state.clone().into_sui_system_state_summary();
140
141 let rates = exchange_rates(&self.state, system_state_summary.epoch)
142 .await?
143 .into_iter()
144 .map(|rates| (rates.pool_id, rates))
145 .collect::<BTreeMap<_, _>>();
146
147 let mut delegated_stakes = vec![];
148 for (pool_id, stakes) in pools {
149 let rate_table = rates.get(&pool_id).ok_or_else(|| {
151 SuiRpcInputError::GenericNotFound(
152 "Cannot find rates for staking pool {pool_id}".to_string(),
153 )
154 })?;
155 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
156
157 let mut delegations = vec![];
158 for (stake, exists) in stakes {
159 let status = if !exists {
160 StakeStatus::Unstaked
161 } else if system_state_summary.epoch >= stake.activation_epoch() {
162 let estimated_reward = if let Some(current_rate) = current_rate {
164 let stake_rate = rate_table
165 .rates
166 .iter()
167 .find_map(|(epoch, rate)| {
168 if *epoch == stake.activation_epoch() {
169 Some(rate.clone())
170 } else {
171 None
172 }
173 })
174 .unwrap_or_default();
175 let estimated_reward = ((stake_rate.rate() / current_rate.rate()) - 1.0)
176 * stake.principal() as f64;
177 max(0, estimated_reward.round() as u64)
178 } else {
179 0
180 };
181 StakeStatus::Active { estimated_reward }
182 } else {
183 StakeStatus::Pending
184 };
185 delegations.push(Stake {
186 staked_sui_id: stake.id(),
187 stake_request_epoch: stake.activation_epoch() - 1,
189 stake_active_epoch: stake.activation_epoch(),
190 principal: stake.principal(),
191 status,
192 })
193 }
194 delegated_stakes.push(DelegatedStake {
195 validator_address: rate_table.address,
196 staking_pool: pool_id,
197 stakes: delegations,
198 })
199 }
200 Ok(delegated_stakes)
201 }
202
203 fn get_system_state(&self) -> Result<SuiSystemState, Error> {
204 Ok(self.state.get_system_state()?)
205 }
206}
207
208#[async_trait]
209impl GovernanceReadApiServer for GovernanceReadApi {
210 #[instrument(skip(self))]
211 async fn get_stakes_by_ids(
212 &self,
213 staked_sui_ids: Vec<ObjectID>,
214 ) -> RpcResult<Vec<DelegatedStake>> {
215 with_tracing!(async move { self.get_stakes_by_ids(staked_sui_ids).await })
216 }
217
218 #[instrument(skip(self))]
219 async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>> {
220 with_tracing!(async move { self.get_stakes(owner).await })
221 }
222
223 #[instrument(skip(self))]
224 async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<SuiCommittee> {
225 with_tracing!(async move {
226 self.state
227 .get_or_latest_committee(epoch)
228 .map(|committee| committee.into())
229 .map_err(Error::from)
230 })
231 }
232
233 #[instrument(skip(self))]
234 async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary> {
235 with_tracing!(async move {
236 Ok(self
237 .state
238 .get_system_state()
239 .map_err(Error::from)?
240 .into_sui_system_state_summary())
241 })
242 }
243
244 #[instrument(skip(self))]
245 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
246 with_tracing!(async move {
247 let epoch_store = self.state.load_epoch_store_one_call_per_task();
248 Ok(epoch_store.reference_gas_price().into())
249 })
250 }
251
252 #[instrument(skip(self))]
253 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
254 info!("get_validator_apy");
255 let system_state_summary: SuiSystemStateSummary =
256 self.get_latest_sui_system_state().await?;
257
258 let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch).await?;
259
260 let apys = calculate_apys(
261 system_state_summary.stake_subsidy_start_epoch,
262 exchange_rate_table,
263 );
264
265 Ok(ValidatorApys {
266 apys,
267 epoch: system_state_summary.epoch,
268 })
269 }
270}
271
272pub fn calculate_apys(
273 stake_subsidy_start_epoch: u64,
274 exchange_rate_table: Vec<ValidatorExchangeRates>,
275) -> Vec<ValidatorApy> {
276 let mut apys = vec![];
277
278 for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
279 let exchange_rates = rates.rates.into_iter().filter_map(|(epoch, rate)| {
281 if epoch >= stake_subsidy_start_epoch {
282 Some(rate)
283 } else {
284 None
285 }
286 });
287
288 let average_apy = if exchange_rates.clone().count() >= 2 {
290 let er_e = exchange_rates.clone().dropping(1);
292 let er_e_1 = exchange_rates.dropping_back(1);
294 let apys = er_e
295 .zip_debug_eq(er_e_1)
296 .map(calculate_apy)
297 .filter(|apy| *apy > 0.0 && *apy < 0.1)
298 .take(30)
299 .collect::<Vec<_>>();
300
301 let apy_counts = apys.len() as f64;
302 apys.iter().sum::<f64>() / apy_counts
303 } else {
304 0.0
305 };
306 apys.push(ValidatorApy {
307 address: rates.address,
308 apy: average_apy,
309 });
310 }
311 apys
312}
313
314#[test]
315fn test_apys_calculation_filter_outliers() {
316 let file =
318 std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates.json").unwrap();
319 let rates: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
320 serde_json::from_reader(file).unwrap();
321
322 let mut address_map = BTreeMap::new();
323
324 let exchange_rates = rates
325 .into_iter()
326 .map(|(validator, rates)| {
327 let address = SuiAddress::random_for_testing_only();
328 address_map.insert(address, validator);
329 ValidatorExchangeRates {
330 address,
331 pool_id: ObjectID::random(),
332 active: true,
333 rates,
334 }
335 })
336 .collect();
337
338 let apys = calculate_apys(20, exchange_rates);
339
340 for apy in apys {
341 println!("{}: {}", address_map[&apy.address], apy.apy);
342 assert!(apy.apy < 0.07)
343 }
344}
345
346fn calculate_apy((rate_e, rate_e_1): (PoolTokenExchangeRate, PoolTokenExchangeRate)) -> f64 {
348 (rate_e.rate() / rate_e_1.rate()).powf(365.0) - 1.0
349}
350
351#[cached(
354 type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
355 create = "{ SizedCache::with_size(1) }",
356 convert = "{ _current_epoch }",
357 result = true
358)]
359async fn exchange_rates(
360 state: &Arc<dyn StateRead>,
361 _current_epoch: EpochId,
362) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
363 let system_state = state.get_system_state()?;
364 let system_state_summary: SuiSystemStateSummary = system_state.into_sui_system_state_summary();
365
366 let mut tables = vec![];
368
369 for validator in system_state_summary.active_validators {
370 tables.push((
371 validator.sui_address,
372 validator.staking_pool_id,
373 validator.exchange_rates_id,
374 validator.exchange_rates_size,
375 true,
376 ));
377 }
378
379 for df in state.get_dynamic_fields(
381 system_state_summary.inactive_pools_id,
382 None,
383 system_state_summary.inactive_pools_size as usize,
384 )? {
385 let pool_id: ID = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
386 SuiErrorKind::ObjectDeserializationError {
387 error: e.to_string(),
388 }
389 })?;
390 let validator = get_validator_from_table(
391 state.get_object_store().as_ref(),
392 system_state_summary.inactive_pools_id,
393 &pool_id,
394 )?; tables.push((
396 validator.sui_address,
397 validator.staking_pool_id,
398 validator.exchange_rates_id,
399 validator.exchange_rates_size,
400 false,
401 ));
402 }
403
404 let mut exchange_rates = vec![];
405 for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
407 let mut rates = state
408 .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
409 .into_iter()
410 .map(|df| {
411 let epoch: EpochId = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
412 SuiErrorKind::ObjectDeserializationError {
413 error: e.to_string(),
414 }
415 })?;
416
417 let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
418 &state.get_object_store().as_ref(),
419 exchange_rates_id,
420 &epoch,
421 )?;
422
423 Ok::<_, SuiError>((epoch, exchange_rate))
424 })
425 .collect::<Result<Vec<_>, _>>()?;
426
427 rates = backfill_rates(rates);
429
430 exchange_rates.push(ValidatorExchangeRates {
431 address,
432 pool_id,
433 active,
434 rates,
435 });
436 }
437 Ok(exchange_rates)
438}
439
440#[derive(Clone, Debug)]
441pub struct ValidatorExchangeRates {
442 pub address: SuiAddress,
443 pub pool_id: ObjectID,
444 pub active: bool,
445 pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
446}
447
448fn backfill_rates(
452 rates: Vec<(EpochId, PoolTokenExchangeRate)>,
453) -> Vec<(EpochId, PoolTokenExchangeRate)> {
454 if rates.is_empty() {
455 return rates;
456 }
457
458 let min_epoch = *rates.iter().map(|(e, _)| e).min().unwrap();
459 let max_epoch = *rates.iter().map(|(e, _)| e).max().unwrap();
460 let mut filled_rates = Vec::new();
461 let mut prev_rate = None;
462
463 for epoch in min_epoch..=max_epoch {
464 match rates.iter().find(|(e, _)| *e == epoch) {
465 Some((e, rate)) => {
466 prev_rate = Some(rate.clone());
467 filled_rates.push((*e, rate.clone()));
468 }
469 None => {
470 if let Some(rate) = prev_rate.clone() {
471 filled_rates.push((epoch, rate));
472 }
473 }
474 }
475 }
476 filled_rates.reverse();
477 filled_rates
478}
479
480impl SuiRpcModule for GovernanceReadApi {
481 fn rpc(self) -> RpcModule<Self> {
482 self.into_rpc()
483 }
484
485 fn rpc_doc_module() -> Module {
486 GovernanceReadApiOpenRpc::module_doc()
487 }
488}
489
490#[cfg(test)]
491mod tests {
492 use super::*;
493 use sui_types::sui_system_state::PoolTokenExchangeRate;
494
495 #[test]
496 fn test_backfill_rates_empty() {
497 let rates = vec![];
498 assert_eq!(backfill_rates(rates), vec![]);
499 }
500
501 #[test]
502 fn test_backfill_rates_no_gaps() {
503 let rate1 = PoolTokenExchangeRate::new(100, 100);
504 let rate2 = PoolTokenExchangeRate::new(200, 220);
505 let rate3 = PoolTokenExchangeRate::new(300, 330);
506 let rates = vec![(2, rate2.clone()), (3, rate3.clone()), (1, rate1.clone())];
507
508 let expected: Vec<(u64, PoolTokenExchangeRate)> =
509 vec![(3, rate3.clone()), (2, rate2), (1, rate1)];
510 assert_eq!(backfill_rates(rates), expected);
511 }
512
513 #[test]
514 fn test_backfill_rates_with_gaps() {
515 let rate1 = PoolTokenExchangeRate::new(100, 100);
516 let rate3 = PoolTokenExchangeRate::new(300, 330);
517 let rate5 = PoolTokenExchangeRate::new(500, 550);
518 let rates = vec![(3, rate3.clone()), (1, rate1.clone()), (5, rate5.clone())];
519
520 let expected = vec![
521 (5, rate5.clone()),
522 (4, rate3.clone()),
523 (3, rate3.clone()),
524 (2, rate1.clone()),
525 (1, rate1),
526 ];
527 assert_eq!(backfill_rates(rates), expected);
528 }
529}