sui_graphql_rpc/context_data/
db_data_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    error::Error,
6    types::{address::Address, sui_address::SuiAddress, validator::Validator},
7};
8use std::{collections::BTreeMap, time::Duration};
9use sui_indexer::db::ConnectionPoolConfig;
10use sui_indexer::{apis::GovernanceReadApi, indexer_reader::IndexerReader};
11use sui_json_rpc_types::Stake as RpcStakedSui;
12use sui_types::{
13    governance::StakedSui as NativeStakedSui,
14    sui_system_state::sui_system_state_summary::SuiSystemStateSummary as NativeSuiSystemStateSummary,
15};
16
17pub(crate) struct PgManager {
18    pub inner: IndexerReader,
19}
20
21impl PgManager {
22    pub(crate) fn new(inner: IndexerReader) -> Self {
23        Self { inner }
24    }
25
26    /// Create a new underlying reader, which is used by this type as well as other data providers.
27    pub(crate) async fn reader_with_config(
28        db_url: impl Into<String>,
29        pool_size: u32,
30        timeout_ms: u64,
31    ) -> Result<IndexerReader, Error> {
32        let mut config = ConnectionPoolConfig::default();
33        config.set_pool_size(pool_size);
34        config.set_statement_timeout(Duration::from_millis(timeout_ms));
35        IndexerReader::new_with_config(db_url, config)
36            .await
37            .map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
38    }
39}
40
41/// Implement methods to be used by graphql resolvers
42impl PgManager {
43    /// If no epoch was requested or if the epoch requested is in progress,
44    /// returns the latest sui system state.
45    pub(crate) async fn fetch_sui_system_state(
46        &self,
47        epoch_id: Option<u64>,
48    ) -> Result<NativeSuiSystemStateSummary, Error> {
49        let latest_sui_system_state = self.inner.get_latest_sui_system_state().await?;
50
51        if let Some(epoch_id) = epoch_id {
52            if epoch_id == latest_sui_system_state.epoch {
53                Ok(latest_sui_system_state)
54            } else {
55                Ok(self
56                    .inner
57                    .get_epoch_sui_system_state(Some(epoch_id))
58                    .await?)
59            }
60        } else {
61            Ok(latest_sui_system_state)
62        }
63    }
64
65    /// Make a request to the RPC for its representations of the staked sui we parsed out of the
66    /// object.  Used to implement fields that are implemented in JSON-RPC but not GraphQL (yet).
67    pub(crate) async fn fetch_rpc_staked_sui(
68        &self,
69        stake: NativeStakedSui,
70    ) -> Result<RpcStakedSui, Error> {
71        let governance_api = GovernanceReadApi::new(self.inner.clone());
72
73        let mut delegated_stakes = governance_api
74            .get_delegated_stakes(vec![stake])
75            .await
76            .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
77
78        let Some(mut delegated_stake) = delegated_stakes.pop() else {
79            return Err(Error::Internal(
80                "Error fetching delegated stake. No pools returned.".to_string(),
81            ));
82        };
83
84        let Some(stake) = delegated_stake.stakes.pop() else {
85            return Err(Error::Internal(
86                "Error fetching delegated stake. No stake in pool.".to_string(),
87            ));
88        };
89
90        Ok(stake)
91    }
92}
93
94/// `checkpoint_viewed_at` represents the checkpoint sequence number at which the set of
95/// `SuiValidatorSummary` was queried for. Each `Validator` will inherit this checkpoint, so that
96/// when viewing the `Validator`'s state, it will be as if it was read at the same checkpoint.
97pub(crate) fn convert_to_validators(
98    system_state_at_requested_epoch: NativeSuiSystemStateSummary,
99    checkpoint_viewed_at: u64,
100    requested_for_epoch: u64,
101) -> Vec<Validator> {
102    let at_risk = BTreeMap::from_iter(system_state_at_requested_epoch.at_risk_validators);
103    let reports = BTreeMap::from_iter(system_state_at_requested_epoch.validator_report_records);
104
105    system_state_at_requested_epoch
106        .active_validators
107        .into_iter()
108        .map(move |validator_summary| {
109            let at_risk = at_risk.get(&validator_summary.sui_address).copied();
110            let report_records = reports.get(&validator_summary.sui_address).map(|addrs| {
111                addrs
112                    .iter()
113                    .cloned()
114                    .map(|a| Address {
115                        address: SuiAddress::from(a),
116                        checkpoint_viewed_at,
117                    })
118                    .collect()
119            });
120
121            Validator {
122                validator_summary,
123                at_risk,
124                report_records,
125                checkpoint_viewed_at,
126                requested_for_epoch,
127            }
128        })
129        .collect()
130}