sui_indexer_alt_jsonrpc/api/
governance.rs1use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::convert::Infallible;
7use std::num::NonZeroUsize;
8use std::sync::Mutex;
9
10use anyhow::Context as _;
11use diesel::ExpressionMethods;
12use diesel::QueryDsl;
13use futures::future;
14use jsonrpsee::core::RpcResult;
15use jsonrpsee::proc_macros::rpc;
16use lru::LruCache;
17use move_core_types::language_storage::StructTag;
18use sui_indexer_alt_reader::consistent_reader::proto::owner::OwnerKind;
19use sui_indexer_alt_reader::governance::RewardsKey;
20use sui_indexer_alt_reader::governance::ValidatorAddressKey;
21use sui_indexer_alt_schema::epochs::StoredEpochStart;
22use sui_indexer_alt_schema::schema::kv_epoch_starts;
23use sui_json_rpc_types::DelegatedStake;
24use sui_json_rpc_types::Stake;
25use sui_json_rpc_types::StakeStatus;
26use sui_json_rpc_types::ValidatorApy;
27use sui_json_rpc_types::ValidatorApys;
28use sui_open_rpc::Module;
29use sui_open_rpc_macros::open_rpc;
30use sui_types::SUI_SYSTEM_ADDRESS;
31use sui_types::SUI_SYSTEM_STATE_OBJECT_ID;
32use sui_types::TypeTag;
33use sui_types::base_types::ObjectID;
34use sui_types::base_types::SuiAddress;
35use sui_types::dynamic_field::Field;
36use sui_types::dynamic_field::derive_dynamic_field_id;
37use sui_types::governance::STAKED_SUI_STRUCT_NAME;
38use sui_types::governance::STAKING_POOL_MODULE_NAME;
39use sui_types::governance::StakedSui;
40use sui_types::sui_serde::BigInt;
41use sui_types::sui_system_state::PoolTokenExchangeRate;
42use sui_types::sui_system_state::SuiSystemState;
43use sui_types::sui_system_state::SuiSystemStateTrait;
44use sui_types::sui_system_state::SuiSystemStateWrapper;
45use sui_types::sui_system_state::sui_system_state_inner_v1::SuiSystemStateInnerV1;
46use sui_types::sui_system_state::sui_system_state_inner_v2::SuiSystemStateInnerV2;
47use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
48use tokio::try_join;
49
50use crate::api::rpc_module::RpcModule;
51use crate::context::Context;
52use crate::data::latest_epoch;
53use crate::data::load_live;
54use crate::data::load_live_deserialized;
55use crate::error::RpcError;
56use crate::error::rpc_bail;
57
/// Number of most recent `kv_epoch_starts` rows fetched when estimating validator APYs.
///
/// The APY estimate is built from ratios between consecutive exchange-rate samples, so 31
/// samples yield at most 30 ratios. The value is passed as the query's row limit.
const APY_EPOCH_WINDOW: i64 = 31;
60
61#[open_rpc(namespace = "suix", tag = "Governance API")]
62#[rpc(server, namespace = "suix")]
63trait GovernanceApi {
64 #[method(name = "getReferenceGasPrice")]
66 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>>;
67
68 #[method(name = "getLatestSuiSystemState")]
70 async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary>;
71
72 #[method(name = "getStakesByIds")]
74 async fn get_stakes_by_ids(
75 &self,
76 staked_sui_ids: Vec<ObjectID>,
77 ) -> RpcResult<Vec<DelegatedStake>>;
78
79 #[method(name = "getStakes")]
81 async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>>;
82
83 #[method(name = "getValidatorsApy")]
85 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys>;
86}
87
88pub(crate) struct Governance {
89 ctx: Context,
90 apy_cache: Mutex<LruCache<u64, ValidatorApys>>,
93}
94
95impl Governance {
96 pub fn new(ctx: Context) -> Self {
97 Self {
98 ctx,
99 apy_cache: Mutex::new(LruCache::new(NonZeroUsize::new(1).unwrap())),
100 }
101 }
102}
103
104#[async_trait::async_trait]
105impl GovernanceApiServer for Governance {
106 async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
107 Ok(rgp_response(&self.ctx).await?)
108 }
109
110 async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary> {
111 Ok(latest_sui_system_state_response(&self.ctx).await?)
112 }
113
114 async fn get_stakes_by_ids(
115 &self,
116 staked_sui_ids: Vec<ObjectID>,
117 ) -> RpcResult<Vec<DelegatedStake>> {
118 Ok(delegated_stakes_response(&self.ctx, staked_sui_ids).await?)
119 }
120
121 async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>> {
122 let ctx = &self.ctx;
123 let config = &ctx.config().objects;
124
125 let tag = StructTag {
126 address: SUI_SYSTEM_ADDRESS,
127 module: STAKING_POOL_MODULE_NAME.to_owned(),
128 name: STAKED_SUI_STRUCT_NAME.to_owned(),
129 type_params: vec![],
130 };
131
132 let mut all_stake_ids: Vec<ObjectID> = Vec::new();
133 let mut after_cursor = None;
134
135 loop {
136 let page = ctx
137 .consistent_reader()
138 .list_owned_objects(
139 None,
140 OwnerKind::Address,
141 Some(owner.to_string()),
142 Some(tag.to_canonical_string(true)),
143 Some(config.max_page_size as u32),
144 after_cursor,
145 None,
146 true,
147 )
148 .await
149 .context("Failed to fetch owned StakedSui objects")
150 .map_err(RpcError::<Infallible>::from)?;
151
152 all_stake_ids.extend(page.results.iter().map(|edge| edge.value.0));
153
154 if page.has_next_page {
155 after_cursor = page.results.last().map(|edge| edge.token.clone());
156 } else {
157 break;
158 }
159 }
160
161 Ok(delegated_stakes_response(ctx, all_stake_ids).await?)
162 }
163
164 async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
165 let ctx = &self.ctx;
166 let epoch = latest_epoch(ctx)
167 .await
168 .context("Failed to fetch latest epoch for APY cache lookup")
169 .map_err(RpcError::<Infallible>::from)?;
170
171 if let Some(hit) = self.apy_cache.lock().unwrap().get(&epoch).cloned() {
172 return Ok(hit);
173 }
174
175 let apys = validators_apy_response(ctx).await?;
176 self.apy_cache.lock().unwrap().put(epoch, apys.clone());
177 Ok(apys)
178 }
179}
180
181impl RpcModule for Governance {
182 fn schema(&self) -> Module {
183 GovernanceApiOpenRpc::module_doc()
184 }
185
186 fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
187 self.into_rpc()
188 }
189}
190
191async fn rgp_response(ctx: &Context) -> Result<BigInt<u64>, RpcError> {
193 use kv_epoch_starts::dsl as e;
194
195 let mut conn = ctx
196 .pg_reader()
197 .connect()
198 .await
199 .context("Failed to connect to the database")?;
200
201 let rgp: i64 = conn
202 .first(
203 e::kv_epoch_starts
204 .select(e::reference_gas_price)
205 .order(e::epoch.desc()),
206 )
207 .await
208 .context("Failed to fetch the reference gas price")?;
209
210 Ok((rgp as u64).into())
211}
212
213async fn latest_sui_system_state_response(
215 ctx: &Context,
216) -> Result<SuiSystemStateSummary, RpcError> {
217 let wrapper: SuiSystemStateWrapper = load_live_deserialized(ctx, SUI_SYSTEM_STATE_OBJECT_ID)
218 .await
219 .context("Failed to fetch system state wrapper object")?;
220
221 let inner_id = derive_dynamic_field_id(
222 SUI_SYSTEM_STATE_OBJECT_ID,
223 &TypeTag::U64,
224 &bcs::to_bytes(&wrapper.version).context("Failed to serialize system state version")?,
225 )
226 .context("Failed to derive inner system state field ID")?;
227
228 Ok(match wrapper.version {
229 1 => load_live_deserialized::<Field<u64, SuiSystemStateInnerV1>>(ctx, inner_id)
230 .await
231 .context("Failed to fetch inner system state object")?
232 .value
233 .into_sui_system_state_summary(),
234 2 => load_live_deserialized::<Field<u64, SuiSystemStateInnerV2>>(ctx, inner_id)
235 .await
236 .context("Failed to fetch inner system state object")?
237 .value
238 .into_sui_system_state_summary(),
239 v => rpc_bail!("Unexpected inner system state version: {v}"),
240 })
241}
242
243async fn delegated_stakes_response(
249 ctx: &Context,
250 stake_ids: Vec<ObjectID>,
251) -> Result<Vec<DelegatedStake>, RpcError> {
252 let execution_loader = ctx.execution_loader()?;
253
254 let staked_sui_futures = stake_ids.iter().map(|id| load_live(ctx, *id));
255 let maybe_objects = future::try_join_all(staked_sui_futures)
256 .await
257 .context("Failed to load StakedSui objects")?;
258
259 let staked_suis: Vec<StakedSui> = maybe_objects
260 .into_iter()
261 .flatten()
262 .map(|object| {
263 let move_object = object.data.try_as_move().context("Not a Move object")?;
264 bcs::from_bytes(move_object.contents()).context("Failed to deserialize StakedSui")
265 })
266 .collect::<anyhow::Result<Vec<_>>>()?;
267
268 let reward_keys: Vec<RewardsKey> = staked_suis
269 .iter()
270 .map(|s| RewardsKey(s.id().into()))
271 .collect();
272 let validator_keys: Vec<ValidatorAddressKey> = staked_suis
273 .iter()
274 .map(|s| ValidatorAddressKey(s.pool_id().into()))
275 .collect();
276
277 let (rewards, validator_addresses, current_epoch) = try_join!(
278 async {
279 execution_loader
280 .load_many(reward_keys)
281 .await
282 .context("Failed to dry run rewards calculation")
283 },
284 async {
285 execution_loader
286 .load_many(validator_keys)
287 .await
288 .context("Failed to dry run validator address lookup")
289 },
290 latest_epoch(ctx),
291 )?;
292
293 let mut grouped: BTreeMap<(SuiAddress, ObjectID), Vec<Stake>> = BTreeMap::new();
294
295 for staked_sui in &staked_suis {
299 let reward_key = RewardsKey(staked_sui.id().into());
300 let validator_key = ValidatorAddressKey(staked_sui.pool_id().into());
301
302 let estimated_reward = *rewards
303 .get(&reward_key)
304 .with_context(|| format!("Missing reward for StakedSui {}", staked_sui.id()))?;
305 let validator_address = validator_addresses
306 .get(&validator_key)
307 .map(|addr| SuiAddress::from(ObjectID::from(*addr)))
308 .with_context(|| {
309 format!(
310 "Missing validator address for staking pool {}",
311 staked_sui.pool_id()
312 )
313 })?;
314
315 let status = if current_epoch >= staked_sui.activation_epoch() {
316 StakeStatus::Active { estimated_reward }
317 } else {
318 StakeStatus::Pending
319 };
320
321 grouped
322 .entry((validator_address, staked_sui.pool_id()))
323 .or_default()
324 .push(Stake {
325 staked_sui_id: staked_sui.id(),
326 stake_request_epoch: staked_sui.request_epoch(),
327 stake_active_epoch: staked_sui.activation_epoch(),
328 principal: staked_sui.principal(),
329 status,
330 });
331 }
332
333 Ok(grouped
334 .into_iter()
335 .map(
336 |((validator_address, staking_pool), stakes)| DelegatedStake {
337 validator_address,
338 staking_pool,
339 stakes,
340 },
341 )
342 .collect())
343}
344
345async fn validators_apy_response(ctx: &Context) -> Result<ValidatorApys, RpcError> {
356 use kv_epoch_starts::dsl as e;
357
358 let mut conn = ctx
359 .pg_reader()
360 .connect()
361 .await
362 .context("Failed to connect to the database")?;
363
364 let rows: Vec<StoredEpochStart> = conn
365 .results(
366 e::kv_epoch_starts
367 .order(e::epoch.desc())
368 .limit(APY_EPOCH_WINDOW),
369 )
370 .await
371 .context("Failed to fetch epoch starts for APY calculation")?;
372
373 let latest = rows
374 .first()
375 .context("No epoch start rows available for APY calculation")?;
376
377 let latest_summary = decode_summary(&latest.system_state)?;
378 let current_epoch = latest_summary.epoch;
379 let stake_subsidy_start_epoch = latest_summary.stake_subsidy_start_epoch;
380
381 let mut by_pool: HashMap<ObjectID, Vec<PoolTokenExchangeRate>> = HashMap::new();
383 for row in &rows {
384 if (row.epoch as u64) < stake_subsidy_start_epoch {
385 continue;
386 }
387 let summary = decode_summary(&row.system_state)?;
388 for v in summary.active_validators {
389 by_pool
390 .entry(v.staking_pool_id)
391 .or_default()
392 .push(PoolTokenExchangeRate::new(
393 v.staking_pool_sui_balance,
394 v.pool_token_balance,
395 ));
396 }
397 }
398
399 let apys = latest_summary
400 .active_validators
401 .into_iter()
402 .map(|v| ValidatorApy {
403 address: v.sui_address,
404 apy: by_pool
405 .get(&v.staking_pool_id)
406 .map_or(0.0, |rates| compute_apy(rates)),
407 })
408 .collect();
409
410 Ok(ValidatorApys {
411 apys,
412 epoch: current_epoch,
413 })
414}
415
416fn decode_summary(bytes: &[u8]) -> Result<SuiSystemStateSummary, RpcError> {
417 Ok(bcs::from_bytes::<SuiSystemState>(bytes)
418 .context("Failed to deserialize SuiSystemState from kv_epoch_starts")?
419 .into_sui_system_state_summary())
420}
421
422fn compute_apy(rates: &[PoolTokenExchangeRate]) -> f64 {
428 let samples: Vec<f64> = rates
429 .windows(2)
430 .map(|w| (w[1].rate() / w[0].rate()).powf(365.0) - 1.0)
431 .filter(|apy| *apy > 0.0 && *apy < 0.1)
432 .take(30)
433 .collect();
434
435 if samples.is_empty() {
436 0.0
437 } else {
438 samples.iter().sum::<f64>() / samples.len() as f64
439 }
440}