1use std::cmp::max;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use cached::SizedCache;
10use cached::proc_macro::cached;
11use itertools::Itertools;
12use jsonrpsee::RpcModule;
13use jsonrpsee::core::RpcResult;
14use tracing::{info, instrument};
15
16use sui_core::authority::AuthorityState;
17use sui_json_rpc_api::{GovernanceReadApiOpenRpc, GovernanceReadApiServer, JsonRpcMetrics};
18use sui_json_rpc_types::{DelegatedStake, Stake, StakeStatus};
19use sui_json_rpc_types::{SuiCommittee, ValidatorApy, ValidatorApys};
20use sui_open_rpc::Module;
21use sui_types::base_types::{ObjectID, SuiAddress};
22use sui_types::committee::EpochId;
23use sui_types::dynamic_field::get_dynamic_field_from_store;
24use sui_types::error::{SuiError, SuiErrorKind, UserInputError};
25use sui_types::governance::StakedSui;
26use sui_types::id::ID;
27use sui_types::object::ObjectRead;
28use sui_types::sui_serde::BigInt;
29use sui_types::sui_system_state::PoolTokenExchangeRate;
30use sui_types::sui_system_state::SuiSystemStateTrait;
31use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
32use sui_types::sui_system_state::{SuiSystemState, get_validator_from_table};
33
34use crate::authority_state::StateRead;
35use crate::error::{Error, RpcInterimResult, SuiRpcInputError};
36use crate::{ObjectProvider, SuiRpcModule, with_tracing};
37
38#[derive(Clone)]
39pub struct GovernanceReadApi {
40 state: Arc<dyn StateRead>,
41 pub metrics: Arc<JsonRpcMetrics>,
42}
43
44impl GovernanceReadApi {
45 pub fn new(state: Arc<AuthorityState>, metrics: Arc<JsonRpcMetrics>) -> Self {
46 Self { state, metrics }
47 }
48
49 async fn get_staked_sui(&self, owner: SuiAddress) -> Result<Vec<StakedSui>, Error> {
50 let state = self.state.clone();
51 let result = state.get_staked_sui(owner).await?;
52
53 self.metrics
54 .get_stake_sui_result_size
55 .observe(result.len() as f64);
56 self.metrics
57 .get_stake_sui_result_size_total
58 .inc_by(result.len() as u64);
59 Ok(result)
60 }
61
62 async fn get_stakes_by_ids(
63 &self,
64 staked_sui_ids: Vec<ObjectID>,
65 ) -> Result<Vec<DelegatedStake>, Error> {
66 let state = self.state.clone();
67 let stakes_read: Vec<_> = staked_sui_ids
68 .iter()
69 .map(|id| state.get_object_read(id))
70 .collect::<Result<Vec<_>, _>>()?;
71
72 if stakes_read.is_empty() {
73 return Ok(vec![]);
74 }
75
76 let mut stakes: Vec<(StakedSui, bool)> = vec![];
77 for stake in stakes_read.into_iter() {
78 match stake {
79 ObjectRead::Exists(_, o, _) => stakes.push((StakedSui::try_from(&o)?, true)),
80 ObjectRead::Deleted(oref) => {
81 match self
82 .state
83 .find_object_lt_or_eq_version(&oref.0, &oref.1.one_before().unwrap())
84 .await?
85 {
86 Some(o) => stakes.push((StakedSui::try_from(&o)?, false)),
87 None => Err(SuiRpcInputError::UserInputError(
88 UserInputError::ObjectNotFound {
89 object_id: oref.0,
90 version: None,
91 },
92 ))?,
93 }
94 }
95 ObjectRead::NotExists(id) => Err(SuiRpcInputError::UserInputError(
96 UserInputError::ObjectNotFound {
97 object_id: id,
98 version: None,
99 },
100 ))?,
101 }
102 }
103
104 self.get_delegated_stakes(stakes).await
105 }
106
107 async fn get_stakes(&self, owner: SuiAddress) -> Result<Vec<DelegatedStake>, Error> {
108 let timer = self.metrics.get_stake_sui_latency.start_timer();
109 let stakes = self.get_staked_sui(owner).await?;
110 if stakes.is_empty() {
111 return Ok(vec![]);
112 }
113 drop(timer);
114
115 let _timer = self.metrics.get_delegated_sui_latency.start_timer();
116
117 self.get_delegated_stakes(stakes.into_iter().map(|s| (s, true)).collect())
118 .await
119 }
120
121 async fn get_delegated_stakes(
122 &self,
123 stakes: Vec<(StakedSui, bool)>,
124 ) -> Result<Vec<DelegatedStake>, Error> {
125 let pools = stakes.into_iter().fold(
126 BTreeMap::<_, Vec<_>>::new(),
127 |mut pools, (stake, exists)| {
128 pools
129 .entry(stake.pool_id())
130 .or_default()
131 .push((stake, exists));
132 pools
133 },
134 );
135
136 let system_state = self.get_system_state()?;
137 let system_state_summary: SuiSystemStateSummary =
138 system_state.clone().into_sui_system_state_summary();
139
140 let rates = exchange_rates(&self.state, system_state_summary.epoch)
141 .await?
142 .into_iter()
143 .map(|rates| (rates.pool_id, rates))
144 .collect::<BTreeMap<_, _>>();
145
146 let mut delegated_stakes = vec![];
147 for (pool_id, stakes) in pools {
148 let rate_table = rates.get(&pool_id).ok_or_else(|| {
150 SuiRpcInputError::GenericNotFound(
151 "Cannot find rates for staking pool {pool_id}".to_string(),
152 )
153 })?;
154 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
155
156 let mut delegations = vec![];
157 for (stake, exists) in stakes {
158 let status = if !exists {
159 StakeStatus::Unstaked
160 } else if system_state_summary.epoch >= stake.activation_epoch() {
161 let estimated_reward = if let Some(current_rate) = current_rate {
163 let stake_rate = rate_table
164 .rates
165 .iter()
166 .find_map(|(epoch, rate)| {
167 if *epoch == stake.activation_epoch() {
168 Some(rate.clone())
169 } else {
170 None
171 }
172 })
173 .unwrap_or_default();
174 let estimated_reward = ((stake_rate.rate() / current_rate.rate()) - 1.0)
175 * stake.principal() as f64;
176 max(0, estimated_reward.round() as u64)
177 } else {
178 0
179 };
180 StakeStatus::Active { estimated_reward }
181 } else {
182 StakeStatus::Pending
183 };
184 delegations.push(Stake {
185 staked_sui_id: stake.id(),
186 stake_request_epoch: stake.activation_epoch() - 1,
188 stake_active_epoch: stake.activation_epoch(),
189 principal: stake.principal(),
190 status,
191 })
192 }
193 delegated_stakes.push(DelegatedStake {
194 validator_address: rate_table.address,
195 staking_pool: pool_id,
196 stakes: delegations,
197 })
198 }
199 Ok(delegated_stakes)
200 }
201
202 fn get_system_state(&self) -> Result<SuiSystemState, Error> {
203 Ok(self.state.get_system_state()?)
204 }
205}
206
207#[async_trait]
208impl GovernanceReadApiServer for GovernanceReadApi {
209 #[instrument(skip(self))]
210 async fn get_stakes_by_ids(
211 &self,
212 staked_sui_ids: Vec<ObjectID>,
213 ) -> RpcResult<Vec<DelegatedStake>> {
214 with_tracing!(async move { self.get_stakes_by_ids(staked_sui_ids).await })
215 }
216
217 #[instrument(skip(self))]
218 async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>> {
219 with_tracing!(async move { self.get_stakes(owner).await })
220 }
221
222 #[instrument(skip(self))]
223 async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<SuiCommittee> {
224 with_tracing!(async move {
225 self.state
226 .get_or_latest_committee(epoch)
227 .map(|committee| committee.into())
228 .map_err(Error::from)
229 })
230 }
231
232 #[instrument(skip(self))]
233 async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary> {
234 with_tracing!(async move {
235 Ok(self
236 .state
237 .get_system_state()
238 .map_err(Error::from)?
239 .into_sui_system_state_summary())
240 })
241 }
242
243 #[instrument(skip(self))]
244 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
245 with_tracing!(async move {
246 let epoch_store = self.state.load_epoch_store_one_call_per_task();
247 Ok(epoch_store.reference_gas_price().into())
248 })
249 }
250
251 #[instrument(skip(self))]
252 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
253 info!("get_validator_apy");
254 let system_state_summary: SuiSystemStateSummary =
255 self.get_latest_sui_system_state().await?;
256
257 let exchange_rate_table = exchange_rates(&self.state, system_state_summary.epoch).await?;
258
259 let apys = calculate_apys(
260 system_state_summary.stake_subsidy_start_epoch,
261 exchange_rate_table,
262 );
263
264 Ok(ValidatorApys {
265 apys,
266 epoch: system_state_summary.epoch,
267 })
268 }
269}
270
271pub fn calculate_apys(
272 stake_subsidy_start_epoch: u64,
273 exchange_rate_table: Vec<ValidatorExchangeRates>,
274) -> Vec<ValidatorApy> {
275 let mut apys = vec![];
276
277 for rates in exchange_rate_table.into_iter().filter(|r| r.active) {
278 let exchange_rates = rates.rates.into_iter().filter_map(|(epoch, rate)| {
280 if epoch >= stake_subsidy_start_epoch {
281 Some(rate)
282 } else {
283 None
284 }
285 });
286
287 let average_apy = if exchange_rates.clone().count() >= 2 {
289 let er_e = exchange_rates.clone().dropping(1);
291 let er_e_1 = exchange_rates.dropping_back(1);
293 let apys = er_e
294 .zip(er_e_1)
295 .map(calculate_apy)
296 .filter(|apy| *apy > 0.0 && *apy < 0.1)
297 .take(30)
298 .collect::<Vec<_>>();
299
300 let apy_counts = apys.len() as f64;
301 apys.iter().sum::<f64>() / apy_counts
302 } else {
303 0.0
304 };
305 apys.push(ValidatorApy {
306 address: rates.address,
307 apy: average_apy,
308 });
309 }
310 apys
311}
312
/// Runs `calculate_apys` over the recorded exchange-rate fixture and checks that every
/// validator's averaged APY stays below 7%.
#[test]
fn test_apys_calculation_filter_outliers() {
    let fixture =
        std::fs::File::open("src/unit_tests/data/validator_exchange_rate/rates.json").unwrap();
    let rates_by_validator: BTreeMap<String, Vec<(u64, PoolTokenExchangeRate)>> =
        serde_json::from_reader(fixture).unwrap();

    // Maps the random address given to each validator back to its name in the fixture.
    let mut names_by_address = BTreeMap::new();
    let mut exchange_rates = Vec::new();
    for (validator, rates) in rates_by_validator {
        let address = SuiAddress::random_for_testing_only();
        names_by_address.insert(address, validator);
        exchange_rates.push(ValidatorExchangeRates {
            address,
            pool_id: ObjectID::random(),
            active: true,
            rates,
        });
    }

    for entry in calculate_apys(20, exchange_rates) {
        println!("{}: {}", names_by_address[&entry.address], entry.apy);
        assert!(entry.apy < 0.07)
    }
}
344
345fn calculate_apy((rate_e, rate_e_1): (PoolTokenExchangeRate, PoolTokenExchangeRate)) -> f64 {
347 (rate_e.rate() / rate_e_1.rate()).powf(365.0) - 1.0
348}
349
350#[cached(
353 type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
354 create = "{ SizedCache::with_size(1) }",
355 convert = "{ _current_epoch }",
356 result = true
357)]
358async fn exchange_rates(
359 state: &Arc<dyn StateRead>,
360 _current_epoch: EpochId,
361) -> RpcInterimResult<Vec<ValidatorExchangeRates>> {
362 let system_state = state.get_system_state()?;
363 let system_state_summary: SuiSystemStateSummary = system_state.into_sui_system_state_summary();
364
365 let mut tables = vec![];
367
368 for validator in system_state_summary.active_validators {
369 tables.push((
370 validator.sui_address,
371 validator.staking_pool_id,
372 validator.exchange_rates_id,
373 validator.exchange_rates_size,
374 true,
375 ));
376 }
377
378 for df in state.get_dynamic_fields(
380 system_state_summary.inactive_pools_id,
381 None,
382 system_state_summary.inactive_pools_size as usize,
383 )? {
384 let pool_id: ID = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
385 SuiErrorKind::ObjectDeserializationError {
386 error: e.to_string(),
387 }
388 })?;
389 let validator = get_validator_from_table(
390 state.get_object_store().as_ref(),
391 system_state_summary.inactive_pools_id,
392 &pool_id,
393 )?; tables.push((
395 validator.sui_address,
396 validator.staking_pool_id,
397 validator.exchange_rates_id,
398 validator.exchange_rates_size,
399 false,
400 ));
401 }
402
403 let mut exchange_rates = vec![];
404 for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
406 let mut rates = state
407 .get_dynamic_fields(exchange_rates_id, None, exchange_rates_size as usize)?
408 .into_iter()
409 .map(|df| {
410 let epoch: EpochId = bcs::from_bytes(&df.1.bcs_name).map_err(|e| {
411 SuiErrorKind::ObjectDeserializationError {
412 error: e.to_string(),
413 }
414 })?;
415
416 let exchange_rate: PoolTokenExchangeRate = get_dynamic_field_from_store(
417 &state.get_object_store().as_ref(),
418 exchange_rates_id,
419 &epoch,
420 )?;
421
422 Ok::<_, SuiError>((epoch, exchange_rate))
423 })
424 .collect::<Result<Vec<_>, _>>()?;
425
426 rates = backfill_rates(rates);
428
429 exchange_rates.push(ValidatorExchangeRates {
430 address,
431 pool_id,
432 active,
433 rates,
434 });
435 }
436 Ok(exchange_rates)
437}
438
439#[derive(Clone, Debug)]
440pub struct ValidatorExchangeRates {
441 pub address: SuiAddress,
442 pub pool_id: ObjectID,
443 pub active: bool,
444 pub rates: Vec<(EpochId, PoolTokenExchangeRate)>,
445}
446
447fn backfill_rates(
451 rates: Vec<(EpochId, PoolTokenExchangeRate)>,
452) -> Vec<(EpochId, PoolTokenExchangeRate)> {
453 if rates.is_empty() {
454 return rates;
455 }
456
457 let min_epoch = *rates.iter().map(|(e, _)| e).min().unwrap();
458 let max_epoch = *rates.iter().map(|(e, _)| e).max().unwrap();
459 let mut filled_rates = Vec::new();
460 let mut prev_rate = None;
461
462 for epoch in min_epoch..=max_epoch {
463 match rates.iter().find(|(e, _)| *e == epoch) {
464 Some((e, rate)) => {
465 prev_rate = Some(rate.clone());
466 filled_rates.push((*e, rate.clone()));
467 }
468 None => {
469 if let Some(rate) = prev_rate.clone() {
470 filled_rates.push((epoch, rate));
471 }
472 }
473 }
474 }
475 filled_rates.reverse();
476 filled_rates
477}
478
479impl SuiRpcModule for GovernanceReadApi {
480 fn rpc(self) -> RpcModule<Self> {
481 self.into_rpc()
482 }
483
484 fn rpc_doc_module() -> Module {
485 GovernanceReadApiOpenRpc::module_doc()
486 }
487}
488
/// Unit tests for `backfill_rates`.
#[cfg(test)]
mod tests {
    use super::*;
    use sui_types::sui_system_state::PoolTokenExchangeRate;

    /// An empty history has nothing to fill and stays empty.
    #[test]
    fn test_backfill_rates_empty() {
        let filled = backfill_rates(Vec::new());
        assert!(filled.is_empty());
    }

    /// A gap-free history is only reordered, newest epoch first.
    #[test]
    fn test_backfill_rates_no_gaps() {
        let first = PoolTokenExchangeRate::new(100, 100);
        let second = PoolTokenExchangeRate::new(200, 220);
        let third = PoolTokenExchangeRate::new(300, 330);

        let unordered = vec![
            (2, second.clone()),
            (3, third.clone()),
            (1, first.clone()),
        ];
        let newest_first: Vec<(u64, PoolTokenExchangeRate)> =
            vec![(3, third), (2, second), (1, first)];

        assert_eq!(backfill_rates(unordered), newest_first);
    }

    /// Missing epochs reuse the rate of the closest earlier epoch.
    #[test]
    fn test_backfill_rates_with_gaps() {
        let first = PoolTokenExchangeRate::new(100, 100);
        let third = PoolTokenExchangeRate::new(300, 330);
        let fifth = PoolTokenExchangeRate::new(500, 550);

        let sparse = vec![
            (3, third.clone()),
            (1, first.clone()),
            (5, fifth.clone()),
        ];
        let newest_first = vec![
            (5, fifth),
            (4, third.clone()),
            (3, third),
            (2, first.clone()),
            (1, first),
        ];

        assert_eq!(backfill_rates(sparse), newest_first);
    }
}