sui_graphql_rpc/context_data/
db_data_provider.rs1use crate::{
5 error::Error,
6 types::{address::Address, sui_address::SuiAddress, validator::Validator},
7};
8use std::{collections::BTreeMap, time::Duration};
9use sui_indexer::db::ConnectionPoolConfig;
10use sui_indexer::{apis::GovernanceReadApi, indexer_reader::IndexerReader};
11use sui_json_rpc_types::Stake as RpcStakedSui;
12use sui_types::{
13 governance::StakedSui as NativeStakedSui,
14 sui_system_state::sui_system_state_summary::SuiSystemStateSummary as NativeSuiSystemStateSummary,
15};
16
17pub(crate) struct PgManager {
18 pub inner: IndexerReader,
19}
20
21impl PgManager {
22 pub(crate) fn new(inner: IndexerReader) -> Self {
23 Self { inner }
24 }
25
26 pub(crate) async fn reader_with_config(
28 db_url: impl Into<String>,
29 pool_size: u32,
30 timeout_ms: u64,
31 ) -> Result<IndexerReader, Error> {
32 let mut config = ConnectionPoolConfig::default();
33 config.set_pool_size(pool_size);
34 config.set_statement_timeout(Duration::from_millis(timeout_ms));
35 IndexerReader::new_with_config(db_url, config)
36 .await
37 .map_err(|e| Error::Internal(format!("Failed to create reader: {e}")))
38 }
39}
40
41impl PgManager {
43 pub(crate) async fn fetch_sui_system_state(
46 &self,
47 epoch_id: Option<u64>,
48 ) -> Result<NativeSuiSystemStateSummary, Error> {
49 let latest_sui_system_state = self.inner.get_latest_sui_system_state().await?;
50
51 if let Some(epoch_id) = epoch_id {
52 if epoch_id == latest_sui_system_state.epoch {
53 Ok(latest_sui_system_state)
54 } else {
55 Ok(self
56 .inner
57 .get_epoch_sui_system_state(Some(epoch_id))
58 .await?)
59 }
60 } else {
61 Ok(latest_sui_system_state)
62 }
63 }
64
65 pub(crate) async fn fetch_rpc_staked_sui(
68 &self,
69 stake: NativeStakedSui,
70 ) -> Result<RpcStakedSui, Error> {
71 let governance_api = GovernanceReadApi::new(self.inner.clone());
72
73 let mut delegated_stakes = governance_api
74 .get_delegated_stakes(vec![stake])
75 .await
76 .map_err(|e| Error::Internal(format!("Error fetching delegated stake. {e}")))?;
77
78 let Some(mut delegated_stake) = delegated_stakes.pop() else {
79 return Err(Error::Internal(
80 "Error fetching delegated stake. No pools returned.".to_string(),
81 ));
82 };
83
84 let Some(stake) = delegated_stake.stakes.pop() else {
85 return Err(Error::Internal(
86 "Error fetching delegated stake. No stake in pool.".to_string(),
87 ));
88 };
89
90 Ok(stake)
91 }
92}
93
94pub(crate) fn convert_to_validators(
98 system_state_at_requested_epoch: NativeSuiSystemStateSummary,
99 checkpoint_viewed_at: u64,
100 requested_for_epoch: u64,
101) -> Vec<Validator> {
102 let at_risk = BTreeMap::from_iter(system_state_at_requested_epoch.at_risk_validators);
103 let reports = BTreeMap::from_iter(system_state_at_requested_epoch.validator_report_records);
104
105 system_state_at_requested_epoch
106 .active_validators
107 .into_iter()
108 .map(move |validator_summary| {
109 let at_risk = at_risk.get(&validator_summary.sui_address).copied();
110 let report_records = reports.get(&validator_summary.sui_address).map(|addrs| {
111 addrs
112 .iter()
113 .cloned()
114 .map(|a| Address {
115 address: SuiAddress::from(a),
116 checkpoint_viewed_at,
117 })
118 .collect()
119 });
120
121 Validator {
122 validator_summary,
123 at_risk,
124 report_records,
125 checkpoint_viewed_at,
126 requested_for_epoch,
127 }
128 })
129 .collect()
130}