1use std::collections::BTreeMap;
5
6use crate::{errors::IndexerError, indexer_reader::IndexerReader};
7use async_trait::async_trait;
8use jsonrpsee::{RpcModule, core::RpcResult};
9
10use cached::{SizedCache, proc_macro::cached};
11use sui_json_rpc::{SuiRpcModule, governance_api::ValidatorExchangeRates};
12use sui_json_rpc_api::GovernanceReadApiServer;
13use sui_json_rpc_types::{
14 DelegatedStake, EpochInfo, StakeStatus, SuiCommittee, SuiObjectDataFilter, ValidatorApys,
15};
16use sui_open_rpc::Module;
17use sui_types::{
18 base_types::{MoveObjectType, ObjectID, SuiAddress},
19 committee::EpochId,
20 governance::StakedSui,
21 sui_serde::BigInt,
22 sui_system_state::{PoolTokenExchangeRate, sui_system_state_summary::SuiSystemStateSummary},
23};
24
25#[derive(Clone)]
26pub struct GovernanceReadApi {
27 inner: IndexerReader,
28}
29
30impl GovernanceReadApi {
31 pub fn new(inner: IndexerReader) -> Self {
32 Self { inner }
33 }
34
35 pub async fn get_epoch_info(&self, epoch: Option<EpochId>) -> Result<EpochInfo, IndexerError> {
36 match self.inner.get_epoch_info(epoch).await {
37 Ok(Some(epoch_info)) => Ok(epoch_info),
38 Ok(None) => Err(IndexerError::InvalidArgumentError(format!(
39 "Missing epoch {epoch:?}"
40 ))),
41 Err(e) => Err(e),
42 }
43 }
44
45 async fn get_latest_sui_system_state(&self) -> Result<SuiSystemStateSummary, IndexerError> {
46 self.inner.get_latest_sui_system_state().await
47 }
48
49 async fn get_stakes_by_ids(
50 &self,
51 ids: Vec<ObjectID>,
52 ) -> Result<Vec<DelegatedStake>, IndexerError> {
53 let mut stakes = vec![];
54 for stored_object in self.inner.multi_get_objects(ids).await? {
55 let object = sui_types::object::Object::try_from(stored_object)?;
56 let stake_object = StakedSui::try_from(&object)?;
57 stakes.push(stake_object);
58 }
59
60 self.get_delegated_stakes(stakes).await
61 }
62
63 async fn get_staked_by_owner(
64 &self,
65 owner: SuiAddress,
66 ) -> Result<Vec<DelegatedStake>, IndexerError> {
67 let mut stakes = vec![];
68 for stored_object in self
69 .inner
70 .get_owned_objects(
71 owner,
72 Some(SuiObjectDataFilter::StructType(
73 MoveObjectType::staked_sui().into(),
74 )),
75 None,
76 1000,
78 )
79 .await?
80 {
81 let object = sui_types::object::Object::try_from(stored_object)?;
82 let stake_object = StakedSui::try_from(&object)?;
83 stakes.push(stake_object);
84 }
85
86 self.get_delegated_stakes(stakes).await
87 }
88
89 pub async fn get_delegated_stakes(
90 &self,
91 stakes: Vec<StakedSui>,
92 ) -> Result<Vec<DelegatedStake>, IndexerError> {
93 let pools = stakes
94 .into_iter()
95 .fold(BTreeMap::<_, Vec<_>>::new(), |mut pools, stake| {
96 pools.entry(stake.pool_id()).or_default().push(stake);
97 pools
98 });
99
100 let system_state_summary = self.get_latest_sui_system_state().await?;
101 let epoch = system_state_summary.epoch;
102
103 let rates = exchange_rates(self, &system_state_summary)
104 .await?
105 .into_iter()
106 .map(|rates| (rates.pool_id, rates))
107 .collect::<BTreeMap<_, _>>();
108
109 let mut delegated_stakes = vec![];
110 for (pool_id, stakes) in pools {
111 let rate_table = rates.get(&pool_id).ok_or_else(|| {
113 IndexerError::InvalidArgumentError(
114 "Cannot find rates for staking pool {pool_id}".to_string(),
115 )
116 })?;
117 let current_rate = rate_table.rates.first().map(|(_, rate)| rate);
118
119 let mut delegations = vec![];
120 for stake in stakes {
121 let status = if epoch >= stake.activation_epoch() {
122 let estimated_reward = if let Some(current_rate) = current_rate {
123 let stake_rate = rate_table
124 .rates
125 .iter()
126 .find_map(|(epoch, rate)| {
127 if *epoch == stake.activation_epoch() {
128 Some(rate.clone())
129 } else {
130 None
131 }
132 })
133 .unwrap_or_default();
134 let estimated_reward = ((stake_rate.rate() / current_rate.rate()) - 1.0)
135 * stake.principal() as f64;
136 std::cmp::max(0, estimated_reward.round() as u64)
137 } else {
138 0
139 };
140 StakeStatus::Active { estimated_reward }
141 } else {
142 StakeStatus::Pending
143 };
144 delegations.push(sui_json_rpc_types::Stake {
145 staked_sui_id: stake.id(),
146 stake_request_epoch: stake.activation_epoch().saturating_sub(1),
148 stake_active_epoch: stake.activation_epoch(),
149 principal: stake.principal(),
150 status,
151 })
152 }
153 delegated_stakes.push(DelegatedStake {
154 validator_address: rate_table.address,
155 staking_pool: pool_id,
156 stakes: delegations,
157 })
158 }
159 Ok(delegated_stakes)
160 }
161}
162
163#[cached(
166 type = "SizedCache<EpochId, Vec<ValidatorExchangeRates>>",
167 create = "{ SizedCache::with_size(1) }",
168 convert = " { system_state_summary.epoch } ",
169 result = true
170)]
171pub async fn exchange_rates(
172 state: &GovernanceReadApi,
173 system_state_summary: &SuiSystemStateSummary,
174) -> Result<Vec<ValidatorExchangeRates>, IndexerError> {
175 let mut tables = vec![];
177
178 for validator in &system_state_summary.active_validators {
179 tables.push((
180 validator.sui_address,
181 validator.staking_pool_id,
182 validator.exchange_rates_id,
183 validator.exchange_rates_size,
184 true,
185 ));
186 }
187
188 for df in state
190 .inner
191 .get_dynamic_fields(
192 system_state_summary.inactive_pools_id,
193 None,
194 system_state_summary.inactive_pools_size as usize,
195 )
196 .await?
197 {
198 let pool_id: sui_types::id::ID = bcs::from_bytes(&df.bcs_name).map_err(|e| {
199 sui_types::error::SuiErrorKind::ObjectDeserializationError {
200 error: e.to_string(),
201 }
202 })?;
203 let inactive_pools_id = system_state_summary.inactive_pools_id;
204 let validator = state
205 .inner
206 .get_validator_from_table(inactive_pools_id, pool_id)
207 .await?;
208 tables.push((
209 validator.sui_address,
210 validator.staking_pool_id,
211 validator.exchange_rates_id,
212 validator.exchange_rates_size,
213 false,
214 ));
215 }
216
217 let mut exchange_rates = vec![];
218 for (address, pool_id, exchange_rates_id, exchange_rates_size, active) in tables {
220 let mut rates = vec![];
221 for df in state
222 .inner
223 .get_dynamic_fields_raw(exchange_rates_id, None, exchange_rates_size as usize)
224 .await?
225 {
226 let dynamic_field = df
227 .to_dynamic_field::<EpochId, PoolTokenExchangeRate>()
228 .ok_or_else(
229 || sui_types::error::SuiErrorKind::ObjectDeserializationError {
230 error: "dynamic field malformed".to_owned(),
231 },
232 )?;
233
234 rates.push((dynamic_field.name, dynamic_field.value));
235 }
236
237 rates.sort_by(|(a, _), (b, _)| a.cmp(b).reverse());
238
239 exchange_rates.push(ValidatorExchangeRates {
240 address,
241 pool_id,
242 active,
243 rates,
244 });
245 }
246 Ok(exchange_rates)
247}
248
249#[async_trait]
250impl GovernanceReadApiServer for GovernanceReadApi {
251 async fn get_stakes_by_ids(
252 &self,
253 staked_sui_ids: Vec<ObjectID>,
254 ) -> RpcResult<Vec<DelegatedStake>> {
255 self.get_stakes_by_ids(staked_sui_ids)
256 .await
257 .map_err(Into::into)
258 }
259
260 async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>> {
261 self.get_staked_by_owner(owner).await.map_err(Into::into)
262 }
263
264 async fn get_committee_info(&self, epoch: Option<BigInt<u64>>) -> RpcResult<SuiCommittee> {
265 let epoch = self.get_epoch_info(epoch.as_deref().copied()).await?;
266 Ok(epoch.committee().map_err(IndexerError::from)?.into())
267 }
268
269 async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary> {
270 self.get_latest_sui_system_state().await.map_err(Into::into)
271 }
272
273 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
274 let epoch = self.get_epoch_info(None).await?;
275 Ok(BigInt::from(epoch.reference_gas_price.ok_or_else(
276 || {
277 IndexerError::PersistentStorageDataCorruptionError(
278 "missing latest reference gas price".to_owned(),
279 )
280 },
281 )?))
282 }
283
284 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
285 Ok(self.get_validators_apy().await?)
286 }
287}
288
289impl SuiRpcModule for GovernanceReadApi {
290 fn rpc(self) -> RpcModule<Self> {
291 self.into_rpc()
292 }
293
294 fn rpc_doc_module() -> Module {
295 sui_json_rpc_api::GovernanceReadApiOpenRpc::module_doc()
296 }
297}