1use std::collections::{BTreeMap, BTreeSet, HashMap};
5
6use crate::connection::ScanConnection;
7use crate::consistency::Checkpointed;
8use crate::context_data::db_data_provider::{convert_to_validators, PgManager};
9use crate::data::{self, DataLoader, Db, DbConnection, QueryExecutor};
10use crate::error::Error;
11use crate::server::watermark_task::Watermark;
12
13use super::big_int::BigInt;
14use super::checkpoint::{self, Checkpoint};
15use super::cursor::{self, Page, Paginated, ScanLimited, Target};
16use super::date_time::DateTime;
17use super::protocol_config::ProtocolConfigs;
18use super::system_state_summary::SystemStateSummary;
19use super::transaction_block::{self, TransactionBlock, TransactionBlockFilter};
20use super::uint53::UInt53;
21use super::validator_set::ValidatorSet;
22use async_graphql::connection::Connection;
23use async_graphql::dataloader::Loader;
24use async_graphql::*;
25use connection::{CursorType, Edge};
26use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
27use diesel_async::scoped_futures::ScopedFutureExt;
28use fastcrypto::encoding::{Base58, Encoding};
29use serde::{Deserialize, Serialize};
30use sui_indexer::models::epoch::QueryableEpochInfo;
31use sui_indexer::schema::epochs;
32use sui_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
33
/// An epoch as handled by this module: the stored row plus the checkpoint it is viewed at.
#[derive(Clone)]
pub(crate) struct Epoch {
    /// Row read from the `epochs` table for this epoch.
    pub stored: QueryableEpochInfo,
    /// Checkpoint sequence number this epoch was queried at. It is passed on to the nested
    /// queries issued from this epoch's resolvers (validators, checkpoints, transaction blocks).
    pub checkpoint_viewed_at: u64,
}
39
/// Key used to batch epoch lookups through the `Loader<EpochKey>` implementation for `Db`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct EpochKey {
    /// Id of the epoch to fetch.
    pub epoch_id: u64,
    /// Checkpoint the lookup is viewed at; the loader omits epochs whose first checkpoint is
    /// after it.
    pub checkpoint_viewed_at: u64,
}
47
/// Pagination cursor for epochs: an [`EpochCursor`] wrapped in the shared JSON cursor type.
pub(crate) type Cursor = cursor::JsonCursor<EpochCursor>;
/// Shorthand for a query over the `epochs` table, used by the `Paginated` implementation below.
type Query<ST, GB> = data::Query<ST, epochs::table, GB>;
50
/// Contents of an epoch pagination cursor.
///
/// The serde renames fix the serialized field names to `c` and `e`, so they are part of the
/// cursor format: changing them would change how cursors are encoded and decoded.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
pub(crate) struct EpochCursor {
    /// Checkpoint the page containing this cursor was viewed at.
    #[serde(rename = "c")]
    pub checkpoint_viewed_at: u64,
    /// Id of the epoch this cursor points at.
    #[serde(rename = "e")]
    pub epoch_id: u64,
}
62
63#[Object]
71impl Epoch {
72 async fn epoch_id(&self) -> UInt53 {
74 UInt53::from(self.stored.epoch as u64)
75 }
76
77 async fn reference_gas_price(&self) -> Option<BigInt> {
79 Some(BigInt::from(self.stored.reference_gas_price as u64))
80 }
81
82 async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
84 let system_state = ctx
85 .data_unchecked::<PgManager>()
86 .fetch_sui_system_state(Some(self.stored.epoch as u64))
87 .await?;
88
89 let active_validators = convert_to_validators(
90 system_state.clone(),
91 self.checkpoint_viewed_at,
92 self.stored.epoch as u64,
93 );
94 let validator_set = ValidatorSet {
95 total_stake: Some(BigInt::from(self.stored.total_stake)),
96 active_validators: Some(active_validators),
97 pending_removals: Some(system_state.pending_removals),
98 pending_active_validators_id: Some(system_state.pending_active_validators_id.into()),
99 pending_active_validators_size: Some(system_state.pending_active_validators_size),
100 staking_pool_mappings_id: Some(system_state.staking_pool_mappings_id.into()),
101 staking_pool_mappings_size: Some(system_state.staking_pool_mappings_size),
102 inactive_pools_id: Some(system_state.inactive_pools_id.into()),
103 inactive_pools_size: Some(system_state.inactive_pools_size),
104 validator_candidates_id: Some(system_state.validator_candidates_id.into()),
105 validator_candidates_size: Some(system_state.validator_candidates_size),
106 checkpoint_viewed_at: self.checkpoint_viewed_at,
107 };
108 Ok(Some(validator_set))
109 }
110
111 async fn start_timestamp(&self) -> Result<DateTime, Error> {
113 DateTime::from_ms(self.stored.epoch_start_timestamp)
114 }
115
116 async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
118 self.stored
119 .epoch_end_timestamp
120 .map(DateTime::from_ms)
121 .transpose()
122 }
123
124 async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
126 let last = match self.stored.last_checkpoint_id {
127 Some(last) => last as u64,
128 None => {
129 let Watermark { hi_cp, .. } = *ctx.data_unchecked();
130 hi_cp
131 }
132 };
133
134 Ok(Some(UInt53::from(
135 last - self.stored.first_checkpoint_id as u64,
136 )))
137 }
138
139 async fn total_transactions(&self) -> Result<Option<UInt53>> {
141 Ok(self
143 .stored
144 .epoch_total_transactions
145 .map(|v| UInt53::from(v as u64)))
146 }
147
148 async fn total_gas_fees(&self) -> Option<BigInt> {
150 self.stored.total_gas_fees.map(BigInt::from)
151 }
152
153 async fn total_stake_rewards(&self) -> Option<BigInt> {
155 self.stored
156 .total_stake_rewards_distributed
157 .map(BigInt::from)
158 }
159
160 async fn total_stake_subsidies(&self) -> Option<BigInt> {
162 self.stored.stake_subsidy_amount.map(BigInt::from)
163 }
164
165 async fn fund_size(&self) -> Option<BigInt> {
169 Some(BigInt::from(self.stored.storage_fund_balance))
170 }
171
172 async fn net_inflow(&self) -> Option<BigInt> {
175 if let (Some(fund_inflow), Some(fund_outflow)) =
176 (self.stored.storage_charge, self.stored.storage_rebate)
177 {
178 Some(BigInt::from(fund_inflow - fund_outflow))
179 } else {
180 None
181 }
182 }
183
184 async fn fund_inflow(&self) -> Option<BigInt> {
186 self.stored.storage_charge.map(BigInt::from)
187 }
188
189 async fn fund_outflow(&self) -> Option<BigInt> {
192 self.stored.storage_rebate.map(BigInt::from)
193 }
194
195 async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
198 ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
199 .await
200 .extend()
201 }
202
203 #[graphql(flatten)]
204 async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
205 let state = ctx
206 .data_unchecked::<PgManager>()
207 .fetch_sui_system_state(Some(self.stored.epoch as u64))
208 .await?;
209 Ok(SystemStateSummary { native: state })
210 }
211
212 async fn live_object_set_digest(&self) -> Result<Option<String>> {
215 let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
216 return Ok(None);
217 };
218 let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
219 Error::Internal(format!("Error deserializing commitments: {e}")).extend()
220 })?;
221
222 for commitment in commitments {
223 if let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment {
224 return Ok(Some(Base58::encode(digest.digest.into_inner())));
225 }
226 }
227 Ok(None)
228 }
229
230 async fn checkpoints(
232 &self,
233 ctx: &Context<'_>,
234 first: Option<u64>,
235 after: Option<checkpoint::Cursor>,
236 last: Option<u64>,
237 before: Option<checkpoint::Cursor>,
238 ) -> Result<Connection<String, Checkpoint>> {
239 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
240 let epoch = self.stored.epoch as u64;
241 Checkpoint::paginate(
242 ctx.data_unchecked(),
243 page,
244 Some(epoch),
245 self.checkpoint_viewed_at,
246 )
247 .await
248 .extend()
249 }
250
251 async fn transaction_blocks(
270 &self,
271 ctx: &Context<'_>,
272 first: Option<u64>,
273 after: Option<transaction_block::Cursor>,
274 last: Option<u64>,
275 before: Option<transaction_block::Cursor>,
276 filter: Option<TransactionBlockFilter>,
277 scan_limit: Option<u64>,
278 ) -> Result<ScanConnection<String, TransactionBlock>> {
279 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
280
281 #[allow(clippy::unnecessary_lazy_evaluations)] let Some(filter) = filter
283 .unwrap_or_default()
284 .intersect(TransactionBlockFilter {
285 after_checkpoint: (self.stored.first_checkpoint_id > 0)
287 .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
288 before_checkpoint: self
289 .stored
290 .last_checkpoint_id
291 .map(|id| UInt53::from(id as u64 + 1)),
292 ..Default::default()
293 })
294 else {
295 return Ok(ScanConnection::new(false, false));
296 };
297
298 TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
299 .await
300 .extend()
301 }
302}
303
304impl Epoch {
305 pub(crate) fn protocol_version(&self) -> u64 {
307 self.stored.protocol_version as u64
308 }
309
310 pub(crate) async fn query(
313 ctx: &Context<'_>,
314 filter: Option<u64>,
315 checkpoint_viewed_at: u64,
316 ) -> Result<Option<Self>, Error> {
317 if let Some(epoch_id) = filter {
318 let DataLoader(dl) = ctx.data_unchecked();
319 dl.load_one(EpochKey {
320 epoch_id,
321 checkpoint_viewed_at,
322 })
323 .await
324 } else {
325 Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
326 }
327 }
328
329 pub(crate) async fn query_latest_at(
333 db: &Db,
334 checkpoint_viewed_at: u64,
335 ) -> Result<Option<Self>, Error> {
336 use epochs::dsl;
337
338 let stored: Option<QueryableEpochInfo> = db
339 .execute(move |conn| {
340 async move {
341 conn.first(move || {
342 dsl::epochs
346 .select(QueryableEpochInfo::as_select())
347 .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
348 .order_by(dsl::first_checkpoint_id.desc())
349 })
350 .await
351 .optional()
352 }
353 .scope_boxed()
354 })
355 .await
356 .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
357
358 Ok(stored.map(|stored| Epoch {
359 stored,
360 checkpoint_viewed_at,
361 }))
362 }
363
364 pub(crate) async fn paginate(
365 db: &Db,
366 page: Page<Cursor>,
367 checkpoint_viewed_at: u64,
368 ) -> Result<Connection<String, Epoch>, Error> {
369 use epochs::dsl;
370 let cursor_viewed_at = page.validate_cursor_consistency()?;
371 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
372
373 let (prev, next, results) = db
374 .execute(move |conn| {
375 async move {
376 page.paginate_query::<QueryableEpochInfo, _, _, _>(
377 conn,
378 checkpoint_viewed_at,
379 move || {
380 dsl::epochs
381 .select(QueryableEpochInfo::as_select())
382 .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
383 .into_boxed()
384 },
385 )
386 .await
387 }
388 .scope_boxed()
389 })
390 .await?;
391
392 let mut conn = Connection::new(prev, next);
394 for stored in results {
395 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
396 conn.edges.push(Edge::new(
397 cursor,
398 Epoch {
399 stored,
400 checkpoint_viewed_at,
401 },
402 ));
403 }
404
405 Ok(conn)
406 }
407}
408
409impl Paginated<Cursor> for QueryableEpochInfo {
410 type Source = epochs::table;
411
412 fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
413 query.filter(epochs::dsl::epoch.ge(cursor.epoch_id as i64))
414 }
415
416 fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
417 query.filter(epochs::dsl::epoch.le(cursor.epoch_id as i64))
418 }
419
420 fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
421 use epochs::dsl;
422 if asc {
423 query.order(dsl::epoch)
424 } else {
425 query.order(dsl::epoch.desc())
426 }
427 }
428}
429
430impl Target<Cursor> for QueryableEpochInfo {
431 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
432 Cursor::new(EpochCursor {
433 checkpoint_viewed_at,
434 epoch_id: self.epoch as u64,
435 })
436 }
437}
438
/// Exposes the checkpoint recorded in an epoch cursor.
impl Checkpointed for Cursor {
    fn checkpoint_viewed_at(&self) -> u64 {
        // Returns the `checkpoint_viewed_at` value carried by the cursor.
        self.checkpoint_viewed_at
    }
}
444
// Epoch cursors implement `ScanLimited` without overriding anything: the empty body means
// only the trait's default items apply.
impl ScanLimited for Cursor {}
446
447#[async_trait::async_trait]
448impl Loader<EpochKey> for Db {
449 type Value = Epoch;
450 type Error = Error;
451
452 async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
453 use epochs::dsl;
454
455 let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
456 let epochs: Vec<QueryableEpochInfo> = self
457 .execute_repeatable(move |conn| {
458 async move {
459 conn.results(move || {
460 dsl::epochs
461 .select(QueryableEpochInfo::as_select())
462 .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
463 })
464 .await
465 }
466 .scope_boxed()
467 })
468 .await
469 .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
470
471 let epoch_id_to_stored: BTreeMap<_, _> = epochs
472 .into_iter()
473 .map(|stored| (stored.epoch as u64, stored))
474 .collect();
475
476 Ok(keys
477 .iter()
478 .filter_map(|key| {
479 let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
480 let epoch = Epoch {
481 stored,
482 checkpoint_viewed_at: key.checkpoint_viewed_at,
483 };
484
485 let start = epoch.stored.first_checkpoint_id as u64;
490 (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
491 })
492 .collect())
493 }
494}