sui_indexer_alt_jsonrpc/api/
governance.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::convert::Infallible;
7use std::num::NonZeroUsize;
8use std::sync::Mutex;
9
10use anyhow::Context as _;
11use diesel::ExpressionMethods;
12use diesel::QueryDsl;
13use futures::future;
14use jsonrpsee::core::RpcResult;
15use jsonrpsee::proc_macros::rpc;
16use lru::LruCache;
17use move_core_types::language_storage::StructTag;
18use sui_indexer_alt_reader::consistent_reader::proto::owner::OwnerKind;
19use sui_indexer_alt_reader::governance::RewardsKey;
20use sui_indexer_alt_reader::governance::ValidatorAddressKey;
21use sui_indexer_alt_schema::epochs::StoredEpochStart;
22use sui_indexer_alt_schema::schema::kv_epoch_starts;
23use sui_json_rpc_types::DelegatedStake;
24use sui_json_rpc_types::Stake;
25use sui_json_rpc_types::StakeStatus;
26use sui_json_rpc_types::ValidatorApy;
27use sui_json_rpc_types::ValidatorApys;
28use sui_open_rpc::Module;
29use sui_open_rpc_macros::open_rpc;
30use sui_types::SUI_SYSTEM_ADDRESS;
31use sui_types::SUI_SYSTEM_STATE_OBJECT_ID;
32use sui_types::TypeTag;
33use sui_types::base_types::ObjectID;
34use sui_types::base_types::SuiAddress;
35use sui_types::dynamic_field::Field;
36use sui_types::dynamic_field::derive_dynamic_field_id;
37use sui_types::governance::STAKED_SUI_STRUCT_NAME;
38use sui_types::governance::STAKING_POOL_MODULE_NAME;
39use sui_types::governance::StakedSui;
40use sui_types::sui_serde::BigInt;
41use sui_types::sui_system_state::PoolTokenExchangeRate;
42use sui_types::sui_system_state::SuiSystemState;
43use sui_types::sui_system_state::SuiSystemStateTrait;
44use sui_types::sui_system_state::SuiSystemStateWrapper;
45use sui_types::sui_system_state::sui_system_state_inner_v1::SuiSystemStateInnerV1;
46use sui_types::sui_system_state::sui_system_state_inner_v2::SuiSystemStateInnerV2;
47use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
48use tokio::try_join;
49
50use crate::api::rpc_module::RpcModule;
51use crate::context::Context;
52use crate::data::latest_epoch;
53use crate::data::load_live;
54use crate::data::load_live_deserialized;
55use crate::error::RpcError;
56use crate::error::rpc_bail;
57
/// How many of the newest `kv_epoch_starts` rows are read when computing validator APYs.
///
/// `compute_apy` samples adjacent pairs of rows and averages at most 30 samples, so 31 rows are
/// enough to fill that budget.
const APY_EPOCH_WINDOW: i64 = 31;
60
// JSON-RPC surface of the governance module. Every method is registered under the `suix`
// namespace (e.g. `suix_getStakes`); the server-side implementation lives in
// `impl GovernanceApiServer for Governance` in this file.
//
// NOTE(review): the `///` comments on the methods below are presumably picked up by `open_rpc`
// for the generated schema descriptions — confirm before rewording them.
#[open_rpc(namespace = "suix", tag = "Governance API")]
#[rpc(server, namespace = "suix")]
trait GovernanceApi {
    /// Return the reference gas price for the network as of the latest epoch.
    #[method(name = "getReferenceGasPrice")]
    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>>;

    /// Return a summary of the latest version of the Sui System State object (0x5), on-chain.
    #[method(name = "getLatestSuiSystemState")]
    async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary>;

    // NOTE(review): `delegated_stakes_response` in this file drops stakes that are no longer live
    // instead of reporting them as `Unstaked` — confirm which behavior the description below
    // should promise.
    /// Return one or more [DelegatedStake]. If a Stake was withdrawn its status will be Unstaked.
    #[method(name = "getStakesByIds")]
    async fn get_stakes_by_ids(
        &self,
        staked_sui_ids: Vec<ObjectID>,
    ) -> RpcResult<Vec<DelegatedStake>>;

    /// Return all [DelegatedStake].
    #[method(name = "getStakes")]
    async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>>;

    /// Return the validator APY
    #[method(name = "getValidatorsApy")]
    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys>;
}
87
88pub(crate) struct Governance {
89    ctx: Context,
90    /// Caches the most recent `getValidatorsApy` response, keyed by epoch. APY inputs are fixed for
91    /// the duration of an epoch, so a capacity-1 cache evicts automatically on epoch advance.
92    apy_cache: Mutex<LruCache<u64, ValidatorApys>>,
93}
94
95impl Governance {
96    pub fn new(ctx: Context) -> Self {
97        Self {
98            ctx,
99            apy_cache: Mutex::new(LruCache::new(NonZeroUsize::new(1).unwrap())),
100        }
101    }
102}
103
104#[async_trait::async_trait]
105impl GovernanceApiServer for Governance {
106    async fn get_reference_gas_price(&self) -> RpcResult<BigInt<u64>> {
107        Ok(rgp_response(&self.ctx).await?)
108    }
109
110    async fn get_latest_sui_system_state(&self) -> RpcResult<SuiSystemStateSummary> {
111        Ok(latest_sui_system_state_response(&self.ctx).await?)
112    }
113
114    async fn get_stakes_by_ids(
115        &self,
116        staked_sui_ids: Vec<ObjectID>,
117    ) -> RpcResult<Vec<DelegatedStake>> {
118        Ok(delegated_stakes_response(&self.ctx, staked_sui_ids).await?)
119    }
120
121    async fn get_stakes(&self, owner: SuiAddress) -> RpcResult<Vec<DelegatedStake>> {
122        let ctx = &self.ctx;
123        let config = &ctx.config().objects;
124
125        let tag = StructTag {
126            address: SUI_SYSTEM_ADDRESS,
127            module: STAKING_POOL_MODULE_NAME.to_owned(),
128            name: STAKED_SUI_STRUCT_NAME.to_owned(),
129            type_params: vec![],
130        };
131
132        let mut all_stake_ids: Vec<ObjectID> = Vec::new();
133        let mut after_cursor = None;
134
135        loop {
136            let page = ctx
137                .consistent_reader()
138                .list_owned_objects(
139                    None,
140                    OwnerKind::Address,
141                    Some(owner.to_string()),
142                    Some(tag.to_canonical_string(true)),
143                    Some(config.max_page_size as u32),
144                    after_cursor,
145                    None,
146                    true,
147                )
148                .await
149                .context("Failed to fetch owned StakedSui objects")
150                .map_err(RpcError::<Infallible>::from)?;
151
152            all_stake_ids.extend(page.results.iter().map(|edge| edge.value.0));
153
154            if page.has_next_page {
155                after_cursor = page.results.last().map(|edge| edge.token.clone());
156            } else {
157                break;
158            }
159        }
160
161        Ok(delegated_stakes_response(ctx, all_stake_ids).await?)
162    }
163
164    async fn get_validators_apy(&self) -> RpcResult<ValidatorApys> {
165        let ctx = &self.ctx;
166        let epoch = latest_epoch(ctx)
167            .await
168            .context("Failed to fetch latest epoch for APY cache lookup")
169            .map_err(RpcError::<Infallible>::from)?;
170
171        if let Some(hit) = self.apy_cache.lock().unwrap().get(&epoch).cloned() {
172            return Ok(hit);
173        }
174
175        let apys = validators_apy_response(ctx).await?;
176        self.apy_cache.lock().unwrap().put(epoch, apys.clone());
177        Ok(apys)
178    }
179}
180
181impl RpcModule for Governance {
182    fn schema(&self) -> Module {
183        GovernanceApiOpenRpc::module_doc()
184    }
185
186    fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
187        self.into_rpc()
188    }
189}
190
191/// Load data and generate response for `getReferenceGasPrice`.
192async fn rgp_response(ctx: &Context) -> Result<BigInt<u64>, RpcError> {
193    use kv_epoch_starts::dsl as e;
194
195    let mut conn = ctx
196        .pg_reader()
197        .connect()
198        .await
199        .context("Failed to connect to the database")?;
200
201    let rgp: i64 = conn
202        .first(
203            e::kv_epoch_starts
204                .select(e::reference_gas_price)
205                .order(e::epoch.desc()),
206        )
207        .await
208        .context("Failed to fetch the reference gas price")?;
209
210    Ok((rgp as u64).into())
211}
212
213/// Load data and generate response for `getLatestSuiSystemState`.
214async fn latest_sui_system_state_response(
215    ctx: &Context,
216) -> Result<SuiSystemStateSummary, RpcError> {
217    let wrapper: SuiSystemStateWrapper = load_live_deserialized(ctx, SUI_SYSTEM_STATE_OBJECT_ID)
218        .await
219        .context("Failed to fetch system state wrapper object")?;
220
221    let inner_id = derive_dynamic_field_id(
222        SUI_SYSTEM_STATE_OBJECT_ID,
223        &TypeTag::U64,
224        &bcs::to_bytes(&wrapper.version).context("Failed to serialize system state version")?,
225    )
226    .context("Failed to derive inner system state field ID")?;
227
228    Ok(match wrapper.version {
229        1 => load_live_deserialized::<Field<u64, SuiSystemStateInnerV1>>(ctx, inner_id)
230            .await
231            .context("Failed to fetch inner system state object")?
232            .value
233            .into_sui_system_state_summary(),
234        2 => load_live_deserialized::<Field<u64, SuiSystemStateInnerV2>>(ctx, inner_id)
235            .await
236            .context("Failed to fetch inner system state object")?
237            .value
238            .into_sui_system_state_summary(),
239        v => rpc_bail!("Unexpected inner system state version: {v}"),
240    })
241}
242
243/// Given a list of StakedSui object IDs, load them, fetch rewards and validator addresses, and
244/// return grouped DelegatedStake entries.
245///
246/// Returns only live staked objects. Stakes that have been withdrawn (or wrapped, deleted,
247/// never existed) will be omitted from the response.
248async fn delegated_stakes_response(
249    ctx: &Context,
250    stake_ids: Vec<ObjectID>,
251) -> Result<Vec<DelegatedStake>, RpcError> {
252    let execution_loader = ctx.execution_loader()?;
253
254    let staked_sui_futures = stake_ids.iter().map(|id| load_live(ctx, *id));
255    let maybe_objects = future::try_join_all(staked_sui_futures)
256        .await
257        .context("Failed to load StakedSui objects")?;
258
259    let staked_suis: Vec<StakedSui> = maybe_objects
260        .into_iter()
261        .flatten()
262        .map(|object| {
263            let move_object = object.data.try_as_move().context("Not a Move object")?;
264            bcs::from_bytes(move_object.contents()).context("Failed to deserialize StakedSui")
265        })
266        .collect::<anyhow::Result<Vec<_>>>()?;
267
268    let reward_keys: Vec<RewardsKey> = staked_suis
269        .iter()
270        .map(|s| RewardsKey(s.id().into()))
271        .collect();
272    let validator_keys: Vec<ValidatorAddressKey> = staked_suis
273        .iter()
274        .map(|s| ValidatorAddressKey(s.pool_id().into()))
275        .collect();
276
277    let (rewards, validator_addresses, current_epoch) = try_join!(
278        async {
279            execution_loader
280                .load_many(reward_keys)
281                .await
282                .context("Failed to dry run rewards calculation")
283        },
284        async {
285            execution_loader
286                .load_many(validator_keys)
287                .await
288                .context("Failed to dry run validator address lookup")
289        },
290        latest_epoch(ctx),
291    )?;
292
293    let mut grouped: BTreeMap<(SuiAddress, ObjectID), Vec<Stake>> = BTreeMap::new();
294
295    // Clients can at most control which stake ids to query. Only live stakes are loaded. Valid
296    // stakes should return a reward (could be 0 for pending stakes) and validator (pools are looked
297    // up against active and inactive validators.)
298    for staked_sui in &staked_suis {
299        let reward_key = RewardsKey(staked_sui.id().into());
300        let validator_key = ValidatorAddressKey(staked_sui.pool_id().into());
301
302        let estimated_reward = *rewards
303            .get(&reward_key)
304            .with_context(|| format!("Missing reward for StakedSui {}", staked_sui.id()))?;
305        let validator_address = validator_addresses
306            .get(&validator_key)
307            .map(|addr| SuiAddress::from(ObjectID::from(*addr)))
308            .with_context(|| {
309                format!(
310                    "Missing validator address for staking pool {}",
311                    staked_sui.pool_id()
312                )
313            })?;
314
315        let status = if current_epoch >= staked_sui.activation_epoch() {
316            StakeStatus::Active { estimated_reward }
317        } else {
318            StakeStatus::Pending
319        };
320
321        grouped
322            .entry((validator_address, staked_sui.pool_id()))
323            .or_default()
324            .push(Stake {
325                staked_sui_id: staked_sui.id(),
326                stake_request_epoch: staked_sui.request_epoch(),
327                stake_active_epoch: staked_sui.activation_epoch(),
328                principal: staked_sui.principal(),
329                status,
330            });
331    }
332
333    Ok(grouped
334        .into_iter()
335        .map(
336            |((validator_address, staking_pool), stakes)| DelegatedStake {
337                validator_address,
338                staking_pool,
339                stakes,
340            },
341        )
342        .collect())
343}
344
345/// Load data and generate response for `getValidatorsApy`.
346///
347/// Rates are derived from `staking_pool_sui_balance` and `pool_token_balance` of each active
348/// validator in the latest `APY_EPOCH_WINDOW` rows of `kv_epoch_starts`. These are the same numbers
349/// that `advance_epoch` writes into each staking pool's `exchange_rates` table. APYs are calculated
350/// from adjacent pairs of rates, and then filtered and averaged to produce each validator's APY.
351/// This mirrors the legacy fullnode jsonrpc's `backfill_rates` implementation.
352///
353/// In safe mode, a pool's sui and token balance carry over unchanged, which produces 0% APY and
354/// also gets filtered out.
355async fn validators_apy_response(ctx: &Context) -> Result<ValidatorApys, RpcError> {
356    use kv_epoch_starts::dsl as e;
357
358    let mut conn = ctx
359        .pg_reader()
360        .connect()
361        .await
362        .context("Failed to connect to the database")?;
363
364    let rows: Vec<StoredEpochStart> = conn
365        .results(
366            e::kv_epoch_starts
367                .order(e::epoch.desc())
368                .limit(APY_EPOCH_WINDOW),
369        )
370        .await
371        .context("Failed to fetch epoch starts for APY calculation")?;
372
373    let latest = rows
374        .first()
375        .context("No epoch start rows available for APY calculation")?;
376
377    let latest_summary = decode_summary(&latest.system_state)?;
378    let current_epoch = latest_summary.epoch;
379    let stake_subsidy_start_epoch = latest_summary.stake_subsidy_start_epoch;
380
381    // Map pool to exchange rates history (newest to oldest)
382    let mut by_pool: HashMap<ObjectID, Vec<PoolTokenExchangeRate>> = HashMap::new();
383    for row in &rows {
384        if (row.epoch as u64) < stake_subsidy_start_epoch {
385            continue;
386        }
387        let summary = decode_summary(&row.system_state)?;
388        for v in summary.active_validators {
389            by_pool
390                .entry(v.staking_pool_id)
391                .or_default()
392                .push(PoolTokenExchangeRate::new(
393                    v.staking_pool_sui_balance,
394                    v.pool_token_balance,
395                ));
396        }
397    }
398
399    let apys = latest_summary
400        .active_validators
401        .into_iter()
402        .map(|v| ValidatorApy {
403            address: v.sui_address,
404            apy: by_pool
405                .get(&v.staking_pool_id)
406                .map_or(0.0, |rates| compute_apy(rates)),
407        })
408        .collect();
409
410    Ok(ValidatorApys {
411        apys,
412        epoch: current_epoch,
413    })
414}
415
416fn decode_summary(bytes: &[u8]) -> Result<SuiSystemStateSummary, RpcError> {
417    Ok(bcs::from_bytes::<SuiSystemState>(bytes)
418        .context("Failed to deserialize SuiSystemState from kv_epoch_starts")?
419        .into_sui_system_state_summary())
420}
421
422/// Compute the average APY from a descending-epoch list of exchange rates.
423///
424/// Iterates adjacent pairs (newer, older), converts each pair into an annualized return
425/// `(older / newer) ^ 365 - 1`, discards outliers outside `(0.0, 0.1)`, and averages up to 30 of
426/// the remaining samples. This mirrors the legacy `calculate_apys` logic.
427fn compute_apy(rates: &[PoolTokenExchangeRate]) -> f64 {
428    let samples: Vec<f64> = rates
429        .windows(2)
430        .map(|w| (w[1].rate() / w[0].rate()).powf(365.0) - 1.0)
431        .filter(|apy| *apy > 0.0 && *apy < 0.1)
432        .take(30)
433        .collect();
434
435    if samples.is_empty() {
436        0.0
437    } else {
438        samples.iter().sum::<f64>() / samples.len() as f64
439    }
440}