sui_graphql_rpc/types/
epoch.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5
6use crate::connection::ScanConnection;
7use crate::consistency::Checkpointed;
8use crate::context_data::db_data_provider::{convert_to_validators, PgManager};
9use crate::data::{self, DataLoader, Db, DbConnection, QueryExecutor};
10use crate::error::Error;
11use crate::server::watermark_task::Watermark;
12
13use super::big_int::BigInt;
14use super::checkpoint::{self, Checkpoint};
15use super::cursor::{self, Page, Paginated, ScanLimited, Target};
16use super::date_time::DateTime;
17use super::protocol_config::ProtocolConfigs;
18use super::system_state_summary::SystemStateSummary;
19use super::transaction_block::{self, TransactionBlock, TransactionBlockFilter};
20use super::uint53::UInt53;
21use super::validator_set::ValidatorSet;
22use async_graphql::connection::Connection;
23use async_graphql::dataloader::Loader;
24use async_graphql::*;
25use connection::{CursorType, Edge};
26use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, SelectableHelper};
27use diesel_async::scoped_futures::ScopedFutureExt;
28use fastcrypto::encoding::{Base58, Encoding};
29use serde::{Deserialize, Serialize};
30use sui_indexer::models::epoch::QueryableEpochInfo;
31use sui_indexer::schema::epochs;
32use sui_types::messages_checkpoint::CheckpointCommitment as EpochCommitment;
33
/// An epoch record together with the checkpoint it is being viewed at. The GraphQL fields for
/// this type are defined in the `#[Object] impl Epoch` block in this file.
34#[derive(Clone)]
35pub(crate) struct Epoch {
    /// The epoch row as selected from the `epochs` table (`QueryableEpochInfo::as_select()`).
36    pub stored: QueryableEpochInfo,
    /// Checkpoint sequence number forwarded to nested queries (checkpoints, transaction blocks,
    /// validators) made through this epoch.
37    pub checkpoint_viewed_at: u64,
38}
39
40/// `DataLoader` key for fetching an `Epoch` by its ID, optionally constrained by a consistency
41/// cursor.
42#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
43struct EpochKey {
    /// ID of the epoch to load; matched against the `epoch` column by the loader.
44    pub epoch_id: u64,
    /// Checkpoint the epoch is viewed at. The loader drops the result when this is lower than
    /// the epoch's `first_checkpoint_id`, and otherwise copies it onto the returned `Epoch`.
45    pub checkpoint_viewed_at: u64,
46}
47
/// Cursor type used when paginating epochs: an `EpochCursor` wrapped in `cursor::JsonCursor`.
48pub(crate) type Cursor = cursor::JsonCursor<EpochCursor>;
/// Shorthand for a `data::Query` over the `epochs` table, used by the `Paginated` impl in this
/// file.
49type Query<ST, GB> = data::Query<ST, epochs::table, GB>;
50
51/// The cursor returned for each `Epoch` in a connection's page of results. The
52/// `checkpoint_viewed_at` will set the consistent upper bound for subsequent queries made on this
53/// cursor.
// Field names are shortened to single letters by the `serde(rename)` attributes below, so the
// serialized form uses the keys `c` and `e`.
54#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
55pub(crate) struct EpochCursor {
56    /// The checkpoint sequence number this was viewed at.
57    #[serde(rename = "c")]
58    pub checkpoint_viewed_at: u64,
    /// ID of the epoch this cursor points at; pagination bounds compare it against the `epoch`
    /// column.
59    #[serde(rename = "e")]
60    pub epoch_id: u64,
61}
62
63/// Operation of the Sui network is temporally partitioned into non-overlapping epochs,
64/// and the network aims to keep epochs roughly the same duration as each other.
65/// During a particular epoch the following data is fixed:
66///
67/// - the protocol version
68/// - the reference gas price
69/// - the set of participating validators
70#[Object]
71impl Epoch {
72    /// The epoch's id as a sequence number that starts at 0 and is incremented by one at every epoch change.
73    async fn epoch_id(&self) -> UInt53 {
74        UInt53::from(self.stored.epoch as u64)
75    }
76
77    /// The minimum gas price that a quorum of validators are guaranteed to sign a transaction for.
78    async fn reference_gas_price(&self) -> Option<BigInt> {
79        Some(BigInt::from(self.stored.reference_gas_price as u64))
80    }
81
82    /// Validator related properties, including the active validators.
83    async fn validator_set(&self, ctx: &Context<'_>) -> Result<Option<ValidatorSet>> {
84        let system_state = ctx
85            .data_unchecked::<PgManager>()
86            .fetch_sui_system_state(Some(self.stored.epoch as u64))
87            .await?;
88
89        let active_validators = convert_to_validators(
90            system_state.clone(),
91            self.checkpoint_viewed_at,
92            self.stored.epoch as u64,
93        );
94        let validator_set = ValidatorSet {
95            total_stake: Some(BigInt::from(self.stored.total_stake)),
96            active_validators: Some(active_validators),
97            pending_removals: Some(system_state.pending_removals),
98            pending_active_validators_id: Some(system_state.pending_active_validators_id.into()),
99            pending_active_validators_size: Some(system_state.pending_active_validators_size),
100            staking_pool_mappings_id: Some(system_state.staking_pool_mappings_id.into()),
101            staking_pool_mappings_size: Some(system_state.staking_pool_mappings_size),
102            inactive_pools_id: Some(system_state.inactive_pools_id.into()),
103            inactive_pools_size: Some(system_state.inactive_pools_size),
104            validator_candidates_id: Some(system_state.validator_candidates_id.into()),
105            validator_candidates_size: Some(system_state.validator_candidates_size),
106            checkpoint_viewed_at: self.checkpoint_viewed_at,
107        };
108        Ok(Some(validator_set))
109    }
110
111    /// The epoch's starting timestamp.
112    async fn start_timestamp(&self) -> Result<DateTime, Error> {
113        DateTime::from_ms(self.stored.epoch_start_timestamp)
114    }
115
116    /// The epoch's ending timestamp.
117    async fn end_timestamp(&self) -> Result<Option<DateTime>, Error> {
118        self.stored
119            .epoch_end_timestamp
120            .map(DateTime::from_ms)
121            .transpose()
122    }
123
124    /// The total number of checkpoints in this epoch.
125    async fn total_checkpoints(&self, ctx: &Context<'_>) -> Result<Option<UInt53>> {
126        let last = match self.stored.last_checkpoint_id {
127            Some(last) => last as u64,
128            None => {
129                let Watermark { hi_cp, .. } = *ctx.data_unchecked();
130                hi_cp
131            }
132        };
133
134        Ok(Some(UInt53::from(
135            last - self.stored.first_checkpoint_id as u64,
136        )))
137    }
138
139    /// The total number of transaction blocks in this epoch.
140    async fn total_transactions(&self) -> Result<Option<UInt53>> {
141        // TODO: this currently returns None for the current epoch. Fix this.
142        Ok(self
143            .stored
144            .epoch_total_transactions
145            .map(|v| UInt53::from(v as u64)))
146    }
147
148    /// The total amount of gas fees (in MIST) that were paid in this epoch.
149    async fn total_gas_fees(&self) -> Option<BigInt> {
150        self.stored.total_gas_fees.map(BigInt::from)
151    }
152
153    /// The total MIST rewarded as stake.
154    async fn total_stake_rewards(&self) -> Option<BigInt> {
155        self.stored
156            .total_stake_rewards_distributed
157            .map(BigInt::from)
158    }
159
160    /// The amount added to total gas fees to make up the total stake rewards.
161    async fn total_stake_subsidies(&self) -> Option<BigInt> {
162        self.stored.stake_subsidy_amount.map(BigInt::from)
163    }
164
165    /// The storage fund available in this epoch.
166    /// This fund is used to redistribute storage fees from past transactions
167    /// to future validators.
168    async fn fund_size(&self) -> Option<BigInt> {
169        Some(BigInt::from(self.stored.storage_fund_balance))
170    }
171
172    /// The difference between the fund inflow and outflow, representing
173    /// the net amount of storage fees accumulated in this epoch.
174    async fn net_inflow(&self) -> Option<BigInt> {
175        if let (Some(fund_inflow), Some(fund_outflow)) =
176            (self.stored.storage_charge, self.stored.storage_rebate)
177        {
178            Some(BigInt::from(fund_inflow - fund_outflow))
179        } else {
180            None
181        }
182    }
183
184    /// The storage fees paid for transactions executed during the epoch.
185    async fn fund_inflow(&self) -> Option<BigInt> {
186        self.stored.storage_charge.map(BigInt::from)
187    }
188
189    /// The storage fee rebates paid to users who deleted the data associated with past
190    /// transactions.
191    async fn fund_outflow(&self) -> Option<BigInt> {
192        self.stored.storage_rebate.map(BigInt::from)
193    }
194
195    /// The epoch's corresponding protocol configuration, including the feature flags and the
196    /// configuration options.
197    async fn protocol_configs(&self, ctx: &Context<'_>) -> Result<ProtocolConfigs> {
198        ProtocolConfigs::query(ctx.data_unchecked(), Some(self.protocol_version()))
199            .await
200            .extend()
201    }
202
203    #[graphql(flatten)]
204    async fn system_state_summary(&self, ctx: &Context<'_>) -> Result<SystemStateSummary> {
205        let state = ctx
206            .data_unchecked::<PgManager>()
207            .fetch_sui_system_state(Some(self.stored.epoch as u64))
208            .await?;
209        Ok(SystemStateSummary { native: state })
210    }
211
212    /// A commitment by the committee at the end of epoch on the contents of the live object set at
213    /// that time. This can be used to verify state snapshots.
214    async fn live_object_set_digest(&self) -> Result<Option<String>> {
215        let Some(commitments) = self.stored.epoch_commitments.as_ref() else {
216            return Ok(None);
217        };
218        let commitments: Vec<EpochCommitment> = bcs::from_bytes(commitments).map_err(|e| {
219            Error::Internal(format!("Error deserializing commitments: {e}")).extend()
220        })?;
221
222        for commitment in commitments {
223            if let EpochCommitment::ECMHLiveObjectSetDigest(digest) = commitment {
224                return Ok(Some(Base58::encode(digest.digest.into_inner())));
225            }
226        }
227        Ok(None)
228    }
229
230    /// The epoch's corresponding checkpoints.
231    async fn checkpoints(
232        &self,
233        ctx: &Context<'_>,
234        first: Option<u64>,
235        after: Option<checkpoint::Cursor>,
236        last: Option<u64>,
237        before: Option<checkpoint::Cursor>,
238    ) -> Result<Connection<String, Checkpoint>> {
239        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
240        let epoch = self.stored.epoch as u64;
241        Checkpoint::paginate(
242            ctx.data_unchecked(),
243            page,
244            Some(epoch),
245            self.checkpoint_viewed_at,
246        )
247        .await
248        .extend()
249    }
250
251    /// The epoch's corresponding transaction blocks.
252    ///
253    /// `scanLimit` restricts the number of candidate transactions scanned when gathering a page of
254    /// results. It is required for queries that apply more than two complex filters (on function,
255    /// kind, sender, recipient, input object, changed object, or ids), and can be at most
256    /// `serviceConfig.maxScanLimit`.
257    ///
258    /// When the scan limit is reached the page will be returned even if it has fewer than `first`
259    /// results when paginating forward (`last` when paginating backwards). If there are more
260    /// transactions to scan, `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
261    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set to the last
262    /// transaction that was scanned as opposed to the last (or first) transaction in the page.
263    ///
264    /// Requesting the next (or previous) page after this cursor will resume the search, scanning
265    /// the next `scanLimit` many transactions in the direction of pagination, and so on until all
266    /// transactions in the scanning range have been visited.
267    ///
268    /// By default, the scanning range consists of all transactions in this epoch.
269    async fn transaction_blocks(
270        &self,
271        ctx: &Context<'_>,
272        first: Option<u64>,
273        after: Option<transaction_block::Cursor>,
274        last: Option<u64>,
275        before: Option<transaction_block::Cursor>,
276        filter: Option<TransactionBlockFilter>,
277        scan_limit: Option<u64>,
278    ) -> Result<ScanConnection<String, TransactionBlock>> {
279        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
280
281        #[allow(clippy::unnecessary_lazy_evaluations)] // rust-lang/rust-clippy#9422
282        let Some(filter) = filter
283            .unwrap_or_default()
284            .intersect(TransactionBlockFilter {
285                // If `first_checkpoint_id` is 0, we include the 0th checkpoint by leaving it None
286                after_checkpoint: (self.stored.first_checkpoint_id > 0)
287                    .then(|| UInt53::from(self.stored.first_checkpoint_id as u64 - 1)),
288                before_checkpoint: self
289                    .stored
290                    .last_checkpoint_id
291                    .map(|id| UInt53::from(id as u64 + 1)),
292                ..Default::default()
293            })
294        else {
295            return Ok(ScanConnection::new(false, false));
296        };
297
298        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
299            .await
300            .extend()
301    }
302}
303
304impl Epoch {
305    /// The epoch's protocol version.
306    pub(crate) fn protocol_version(&self) -> u64 {
307        self.stored.protocol_version as u64
308    }
309
310    /// Look up an `Epoch` in the database, optionally filtered by its Epoch ID. If no ID is
311    /// supplied, defaults to fetching the latest epoch.
312    pub(crate) async fn query(
313        ctx: &Context<'_>,
314        filter: Option<u64>,
315        checkpoint_viewed_at: u64,
316    ) -> Result<Option<Self>, Error> {
317        if let Some(epoch_id) = filter {
318            let DataLoader(dl) = ctx.data_unchecked();
319            dl.load_one(EpochKey {
320                epoch_id,
321                checkpoint_viewed_at,
322            })
323            .await
324        } else {
325            Self::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await
326        }
327    }
328
329    /// Look up the latest `Epoch` from the database, optionally filtered by a consistency cursor
330    /// (querying for a consistency cursor in the past looks for the latest epoch as of that
331    /// cursor).
332    pub(crate) async fn query_latest_at(
333        db: &Db,
334        checkpoint_viewed_at: u64,
335    ) -> Result<Option<Self>, Error> {
336        use epochs::dsl;
337
338        let stored: Option<QueryableEpochInfo> = db
339            .execute(move |conn| {
340                async move {
341                    conn.first(move || {
342                        // Bound the query on `checkpoint_viewed_at` by filtering for the epoch
343                        // whose `first_checkpoint_id <= checkpoint_viewed_at`, selecting the epoch
344                        // with the largest `first_checkpoint_id` among the filtered set.
345                        dsl::epochs
346                            .select(QueryableEpochInfo::as_select())
347                            .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
348                            .order_by(dsl::first_checkpoint_id.desc())
349                    })
350                    .await
351                    .optional()
352                }
353                .scope_boxed()
354            })
355            .await
356            .map_err(|e| Error::Internal(format!("Failed to fetch epoch: {e}")))?;
357
358        Ok(stored.map(|stored| Epoch {
359            stored,
360            checkpoint_viewed_at,
361        }))
362    }
363
364    pub(crate) async fn paginate(
365        db: &Db,
366        page: Page<Cursor>,
367        checkpoint_viewed_at: u64,
368    ) -> Result<Connection<String, Epoch>, Error> {
369        use epochs::dsl;
370        let cursor_viewed_at = page.validate_cursor_consistency()?;
371        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
372
373        let (prev, next, results) = db
374            .execute(move |conn| {
375                async move {
376                    page.paginate_query::<QueryableEpochInfo, _, _, _>(
377                        conn,
378                        checkpoint_viewed_at,
379                        move || {
380                            dsl::epochs
381                                .select(QueryableEpochInfo::as_select())
382                                .filter(dsl::first_checkpoint_id.le(checkpoint_viewed_at as i64))
383                                .into_boxed()
384                        },
385                    )
386                    .await
387                }
388                .scope_boxed()
389            })
390            .await?;
391
392        // The "checkpoint viewed at" sets a consistent upper bound for the nested queries.
393        let mut conn = Connection::new(prev, next);
394        for stored in results {
395            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
396            conn.edges.push(Edge::new(
397                cursor,
398                Epoch {
399                    stored,
400                    checkpoint_viewed_at,
401                },
402            ));
403        }
404
405        Ok(conn)
406    }
407}
408
409impl Paginated<Cursor> for QueryableEpochInfo {
410    type Source = epochs::table;
411
412    fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
413        query.filter(epochs::dsl::epoch.ge(cursor.epoch_id as i64))
414    }
415
416    fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
417        query.filter(epochs::dsl::epoch.le(cursor.epoch_id as i64))
418    }
419
420    fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
421        use epochs::dsl;
422        if asc {
423            query.order(dsl::epoch)
424        } else {
425            query.order(dsl::epoch.desc())
426        }
427    }
428}
429
430impl Target<Cursor> for QueryableEpochInfo {
431    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
432        Cursor::new(EpochCursor {
433            checkpoint_viewed_at,
434            epoch_id: self.epoch as u64,
435        })
436    }
437}
438
439impl Checkpointed for Cursor {
440    fn checkpoint_viewed_at(&self) -> u64 {
441        self.checkpoint_viewed_at
442    }
443}
444
// The impl body is empty: nothing from `ScanLimited` is overridden for epoch cursors, so any
// behavior comes from the trait's own defaults.
445impl ScanLimited for Cursor {}
446
447#[async_trait::async_trait]
448impl Loader<EpochKey> for Db {
449    type Value = Epoch;
450    type Error = Error;
451
452    async fn load(&self, keys: &[EpochKey]) -> Result<HashMap<EpochKey, Epoch>, Error> {
453        use epochs::dsl;
454
455        let epoch_ids: BTreeSet<_> = keys.iter().map(|key| key.epoch_id as i64).collect();
456        let epochs: Vec<QueryableEpochInfo> = self
457            .execute_repeatable(move |conn| {
458                async move {
459                    conn.results(move || {
460                        dsl::epochs
461                            .select(QueryableEpochInfo::as_select())
462                            .filter(dsl::epoch.eq_any(epoch_ids.iter().cloned()))
463                    })
464                    .await
465                }
466                .scope_boxed()
467            })
468            .await
469            .map_err(|e| Error::Internal(format!("Failed to fetch epochs: {e}")))?;
470
471        let epoch_id_to_stored: BTreeMap<_, _> = epochs
472            .into_iter()
473            .map(|stored| (stored.epoch as u64, stored))
474            .collect();
475
476        Ok(keys
477            .iter()
478            .filter_map(|key| {
479                let stored = epoch_id_to_stored.get(&key.epoch_id).cloned()?;
480                let epoch = Epoch {
481                    stored,
482                    checkpoint_viewed_at: key.checkpoint_viewed_at,
483                };
484
485                // We filter by checkpoint viewed at in memory because it should be quite rare that
486                // this query actually filters something (only in edge cases), and not trying to
487                // encode it in the SQL query makes the query much simpler and therefore easier for
488                // the DB to plan.
489                let start = epoch.stored.first_checkpoint_id as u64;
490                (key.checkpoint_viewed_at >= start).then_some((*key, epoch))
491            })
492            .collect())
493    }
494}