sui_graphql_rpc/types/
checkpoint.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, BTreeSet, HashMap};
5
6use super::{
7    base64::Base64,
8    cursor::{self, Page, Paginated, ScanLimited, Target},
9    date_time::DateTime,
10    digest::Digest,
11    epoch::Epoch,
12    gas::GasCostSummary,
13    transaction_block::{self, TransactionBlock, TransactionBlockFilter},
14    uint53::UInt53,
15};
16use crate::{connection::ScanConnection, consistency::Checkpointed};
17use crate::{
18    data::{self, Conn, DataLoader, Db, DbConnection, QueryExecutor},
19    error::Error,
20};
21use async_graphql::{
22    connection::{Connection, CursorType, Edge},
23    dataloader::Loader,
24    *,
25};
26use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
27use diesel_async::scoped_futures::ScopedFutureExt;
28use fastcrypto::encoding::{Base58, Encoding};
29use serde::{Deserialize, Serialize};
30use sui_indexer::{
31    models::{checkpoints::StoredCheckpoint, raw_checkpoints::StoredRawCheckpoint},
32    schema::checkpoints,
33    schema::raw_checkpoints,
34};
35use sui_types::messages_checkpoint::{
36    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointDigest,
37};
38
39/// Filter either by the digest, or the sequence number, or neither, to get the latest checkpoint.
40#[derive(Default, InputObject)]
41pub(crate) struct CheckpointId {
42    pub digest: Option<Digest>,
43    pub sequence_number: Option<UInt53>,
44}
45
/// Key handed to the `DataLoader` to batch look-ups of `StoredRawCheckpoint` rows, one key per
/// checkpoint sequence number.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct RawSeqNumKey {
    /// Sequence number of the raw checkpoint to load. Kept as `i64` so it can be copied straight
    /// from (and compared against) the `sequence_number` field of the stored models.
    pub sequence_number: i64,
}
51
52/// `DataLoader` key for fetching a `Checkpoint` by its sequence number, constrained by a consistency
53/// cursor.
54#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
55struct SeqNumKey {
56    pub sequence_number: u64,
57    /// The digest is not used for fetching, but is used as an additional filter, to correctly
58    /// implement a request that sets both a sequence number and a digest.
59    pub digest: Option<Digest>,
60    pub checkpoint_viewed_at: u64,
61}
62
63/// `DataLoader` key for fetching a `Checkpoint` by its digest, constrained by a consistency cursor.
64#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
65struct DigestKey {
66    pub digest: Digest,
67    pub checkpoint_viewed_at: u64,
68}
69
/// A checkpoint row read from the indexer, paired with the consistency cursor it was read under.
#[derive(Clone)]
pub(crate) struct Checkpoint {
    /// Representation of the checkpoint in the Indexer's Store: the `StoredCheckpoint` row that
    /// every resolver on this type reads its data from.
    pub stored: StoredCheckpoint,
    /// The checkpoint sequence number at which this checkpoint was viewed. It is passed on to
    /// nested queries (epoch, transaction blocks) as their consistency bound.
    pub checkpoint_viewed_at: u64,
}
78
/// Pagination cursor for checkpoints: a `CheckpointCursor` wrapped in `cursor::JsonCursor`.
pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
/// Shorthand for a `data::Query` whose source is the `checkpoints` table; `ST` and `GB` are
/// forwarded to `data::Query` unchanged.
type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;
81
82/// The cursor returned for each `Checkpoint` in a connection's page of results. The
83/// `checkpoint_viewed_at` will set the consistent upper bound for subsequent queries made on this
84/// cursor.
85#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
86pub(crate) struct CheckpointCursor {
87    /// The checkpoint sequence number this was viewed at.
88    #[serde(rename = "c")]
89    pub checkpoint_viewed_at: u64,
90    #[serde(rename = "s")]
91    pub sequence_number: u64,
92}
93
94/// Checkpoints contain finalized transactions and are used for node synchronization
95/// and global transaction ordering.
96#[Object]
97impl Checkpoint {
98    /// A 32-byte hash that uniquely identifies the checkpoint contents, encoded in Base58. This
99    /// hash can be used to verify checkpoint contents by checking signatures against the committee,
100    /// Hashing contents to match digest, and checking that the previous checkpoint digest matches.
101    async fn digest(&self) -> Result<String> {
102        Ok(self.digest_impl().extend()?.base58_encode())
103    }
104
105    /// This checkpoint's position in the total order of finalized checkpoints, agreed upon by
106    /// consensus.
107    async fn sequence_number(&self) -> UInt53 {
108        self.sequence_number_impl().into()
109    }
110
111    /// The timestamp at which the checkpoint is agreed to have happened according to consensus.
112    /// Transactions that access time in this checkpoint will observe this timestamp.
113    async fn timestamp(&self) -> Result<DateTime> {
114        DateTime::from_ms(self.stored.timestamp_ms).extend()
115    }
116
117    /// This is an aggregation of signatures from a quorum of validators for the checkpoint
118    /// proposal.
119    async fn validator_signatures(&self) -> Base64 {
120        Base64::from(&self.stored.validator_signature)
121    }
122
123    /// The digest of the checkpoint at the previous sequence number.
124    async fn previous_checkpoint_digest(&self) -> Option<String> {
125        self.stored
126            .previous_checkpoint_digest
127            .as_ref()
128            .map(Base58::encode)
129    }
130
131    /// The total number of transaction blocks in the network by the end of this checkpoint.
132    async fn network_total_transactions(&self) -> Option<UInt53> {
133        Some(self.network_total_transactions_impl().into())
134    }
135
136    /// The computation cost, storage cost, storage rebate, and non-refundable storage fee
137    /// accumulated during this epoch, up to and including this checkpoint. These values increase
138    /// monotonically across checkpoints in the same epoch, and reset on epoch boundaries.
139    async fn rolling_gas_summary(&self) -> Option<GasCostSummary> {
140        Some(GasCostSummary {
141            computation_cost: self.stored.computation_cost as u64,
142            storage_cost: self.stored.storage_cost as u64,
143            storage_rebate: self.stored.storage_rebate as u64,
144            non_refundable_storage_fee: self.stored.non_refundable_storage_fee as u64,
145        })
146    }
147
148    /// The epoch this checkpoint is part of.
149    async fn epoch(&self, ctx: &Context<'_>) -> Result<Option<Epoch>> {
150        Epoch::query(
151            ctx,
152            Some(self.stored.epoch as u64),
153            self.checkpoint_viewed_at,
154        )
155        .await
156        .extend()
157    }
158
159    /// Transactions in this checkpoint.
160    ///
161    /// `scanLimit` restricts the number of candidate transactions scanned when gathering a page of
162    /// results. It is required for queries that apply more than two complex filters (on function,
163    /// kind, sender, recipient, input object, changed object, or ids), and can be at most
164    /// `serviceConfig.maxScanLimit`.
165    ///
166    /// When the scan limit is reached the page will be returned even if it has fewer than `first`
167    /// results when paginating forward (`last` when paginating backwards). If there are more
168    /// transactions to scan, `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
169    /// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set to the last
170    /// transaction that was scanned as opposed to the last (or first) transaction in the page.
171    ///
172    /// Requesting the next (or previous) page after this cursor will resume the search, scanning
173    /// the next `scanLimit` many transactions in the direction of pagination, and so on until all
174    /// transactions in the scanning range have been visited.
175    ///
176    /// By default, the scanning range consists of all transactions in this checkpoint.
177    async fn transaction_blocks(
178        &self,
179        ctx: &Context<'_>,
180        first: Option<u64>,
181        after: Option<transaction_block::Cursor>,
182        last: Option<u64>,
183        before: Option<transaction_block::Cursor>,
184        filter: Option<TransactionBlockFilter>,
185        scan_limit: Option<u64>,
186    ) -> Result<ScanConnection<String, TransactionBlock>> {
187        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
188
189        let Some(filter) = filter
190            .unwrap_or_default()
191            .intersect(TransactionBlockFilter {
192                at_checkpoint: Some(UInt53::from(self.stored.sequence_number as u64)),
193                ..Default::default()
194            })
195        else {
196            return Ok(ScanConnection::new(false, false));
197        };
198
199        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
200            .await
201            .extend()
202    }
203
204    /// The Base64 serialized BCS bytes of CheckpointSummary for this checkpoint.
205    async fn bcs(&self, ctx: &Context<'_>) -> Result<Option<Base64>> {
206        let DataLoader(dl) = ctx.data_unchecked();
207        let raw_checkpoint = dl
208            .load_one(RawSeqNumKey {
209                sequence_number: self.stored.sequence_number,
210            })
211            .await?;
212
213        let summary = raw_checkpoint.map(|raw_checkpoint| {
214            bcs::from_bytes::<CertifiedCheckpointSummary>(&raw_checkpoint.certified_checkpoint)
215                .unwrap()
216        });
217
218        let checkpoint_bcs = summary
219            .map(|c| c.into_summary_and_sequence().1)
220            .map(|c| bcs::to_bytes(&c).unwrap());
221
222        Ok(checkpoint_bcs.map(Base64::from))
223    }
224
225    /// A commitment by the committee on the artifacts of the checkpoint.
226    /// e.g., object checkpoint states
227    async fn artifacts_digest(&self) -> Result<Option<String>> {
228        let commitments: Vec<CheckpointCommitment> =
229            bcs::from_bytes(&self.stored.checkpoint_commitments).map_err(|e| {
230                Error::Internal(format!("Error deserializing commitments: {e}")).extend()
231            })?;
232
233        for commitment in commitments {
234            if let CheckpointCommitment::CheckpointArtifactsDigest(digest) = commitment {
235                return Ok(Some(digest.base58_encode()));
236            }
237        }
238        Ok(None)
239    }
240}
241
242impl CheckpointId {
243    pub(crate) fn by_seq_num(seq_num: u64) -> Self {
244        CheckpointId {
245            sequence_number: Some(seq_num.into()),
246            digest: None,
247        }
248    }
249}
250
251impl Checkpoint {
252    pub(crate) fn sequence_number_impl(&self) -> u64 {
253        self.stored.sequence_number as u64
254    }
255
256    pub(crate) fn network_total_transactions_impl(&self) -> u64 {
257        self.stored.network_total_transactions as u64
258    }
259
260    pub(crate) fn digest_impl(&self) -> Result<CheckpointDigest, Error> {
261        CheckpointDigest::try_from(self.stored.checkpoint_digest.clone())
262            .map_err(|e| Error::Internal(format!("Failed to deserialize checkpoint digest: {e}")))
263    }
264
265    /// Look up a `Checkpoint` in the database, filtered by either sequence number or digest. If
266    /// both filters are supplied they will both be applied. If none are supplied, the latest
267    /// checkpoint is fetched.
268    pub(crate) async fn query(
269        ctx: &Context<'_>,
270        filter: CheckpointId,
271        checkpoint_viewed_at: u64,
272    ) -> Result<Option<Self>, Error> {
273        match filter {
274            CheckpointId {
275                sequence_number: Some(sequence_number),
276                digest,
277            } => {
278                let DataLoader(dl) = ctx.data_unchecked();
279                dl.load_one(SeqNumKey {
280                    sequence_number: sequence_number.into(),
281                    digest,
282                    checkpoint_viewed_at,
283                })
284                .await
285            }
286
287            CheckpointId {
288                sequence_number: None,
289                digest: Some(digest),
290            } => {
291                let DataLoader(dl) = ctx.data_unchecked();
292                dl.load_one(DigestKey {
293                    digest,
294                    checkpoint_viewed_at,
295                })
296                .await
297            }
298
299            CheckpointId {
300                sequence_number: None,
301                digest: None,
302            } => Checkpoint::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await,
303        }
304    }
305
306    /// Look up the latest `Checkpoint` from the database, optionally filtered by a consistency
307    /// cursor (querying for a consistency cursor in the past looks for the latest checkpoint as of
308    /// that cursor).
309    async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
310        use checkpoints::dsl;
311
312        let stored: Option<StoredCheckpoint> = db
313            .execute(move |conn| {
314                async move {
315                    conn.first(move || {
316                        dsl::checkpoints
317                            .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64))
318                            .order_by(dsl::sequence_number.desc())
319                    })
320                    .await
321                    .optional()
322                }
323                .scope_boxed()
324            })
325            .await
326            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;
327
328        Ok(stored.map(|stored| Checkpoint {
329            stored,
330            checkpoint_viewed_at,
331        }))
332    }
333
334    /// Look up a `Checkpoint` in the database and retrieve its `timestamp_ms` field. This method
335    /// takes a connection, so that it can be used within a transaction.
336    pub(crate) async fn query_timestamp(
337        conn: &mut Conn<'_>,
338        seq_num: u64,
339    ) -> Result<u64, diesel::result::Error> {
340        use checkpoints::dsl;
341
342        let stored: i64 = conn
343            .first(move || {
344                dsl::checkpoints
345                    .select(dsl::timestamp_ms)
346                    .filter(dsl::sequence_number.eq(seq_num as i64))
347            })
348            .await?;
349
350        Ok(stored as u64)
351    }
352
353    /// Query the database for a `page` of checkpoints. The Page uses the checkpoint sequence number
354    /// of the stored checkpoint and the checkpoint at which this was viewed at as the cursor, and
355    /// can optionally be further `filter`-ed by an epoch number (to only return checkpoints within
356    /// that epoch).
357    ///
358    /// The `checkpoint_viewed_at` parameter represents the checkpoint sequence number at which this
359    /// page was queried for. Each entity returned in the connection will inherit this checkpoint,
360    /// so that when viewing that entity's state, it will be from the reference of this
361    /// checkpoint_viewed_at parameter.
362    ///
363    /// If the `Page<Cursor>` is set, then this function will defer to the `checkpoint_viewed_at` in
364    /// the cursor if they are consistent.
365    pub(crate) async fn paginate(
366        db: &Db,
367        page: Page<Cursor>,
368        filter: Option<u64>,
369        checkpoint_viewed_at: u64,
370    ) -> Result<Connection<String, Checkpoint>, Error> {
371        use checkpoints::dsl;
372        let cursor_viewed_at = page.validate_cursor_consistency()?;
373        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
374
375        let (prev, next, results) = db
376            .execute(move |conn| {
377                async move {
378                    page.paginate_query::<StoredCheckpoint, _, _, _>(
379                        conn,
380                        checkpoint_viewed_at,
381                        move || {
382                            let mut query = dsl::checkpoints.into_boxed();
383                            query =
384                                query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
385                            if let Some(epoch) = filter {
386                                query = query.filter(dsl::epoch.eq(epoch as i64));
387                            }
388                            query
389                        },
390                    )
391                    .await
392                }
393                .scope_boxed()
394            })
395            .await?;
396
397        // The "checkpoint viewed at" sets a consistent upper bound for the nested queries.
398        let mut conn = Connection::new(prev, next);
399        for stored in results {
400            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
401            conn.edges.push(Edge::new(
402                cursor,
403                Checkpoint {
404                    stored,
405                    checkpoint_viewed_at,
406                },
407            ));
408        }
409
410        Ok(conn)
411    }
412}
413
414impl Paginated<Cursor> for StoredCheckpoint {
415    type Source = checkpoints::table;
416
417    fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
418        query.filter(checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
419    }
420
421    fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
422        query.filter(checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
423    }
424
425    fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
426        use checkpoints::dsl;
427        if asc {
428            query.order(dsl::sequence_number)
429        } else {
430            query.order(dsl::sequence_number.desc())
431        }
432    }
433}
434
435impl Target<Cursor> for StoredCheckpoint {
436    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
437        Cursor::new(CheckpointCursor {
438            checkpoint_viewed_at,
439            sequence_number: self.sequence_number as u64,
440        })
441    }
442}
443
impl Checkpointed for Cursor {
    /// The checkpoint sequence number recorded in this cursor's `CheckpointCursor` payload.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
449
// Nothing is overridden here, so checkpoint cursors get whatever default behaviour the
// `ScanLimited` trait (from `super::cursor`) provides.
impl ScanLimited for Cursor {}
451
452#[async_trait::async_trait]
453impl Loader<SeqNumKey> for Db {
454    type Value = Checkpoint;
455    type Error = Error;
456
457    async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
458        use checkpoints::dsl;
459
460        let checkpoint_ids: BTreeSet<_> = keys
461            .iter()
462            .filter_map(|key| {
463                // Filter out keys querying for checkpoints after their own consistency cursor.
464                (key.checkpoint_viewed_at >= key.sequence_number)
465                    .then_some(key.sequence_number as i64)
466            })
467            .collect();
468
469        let checkpoints: Vec<StoredCheckpoint> = self
470            .execute(move |conn| {
471                async move {
472                    conn.results(move || {
473                        dsl::checkpoints
474                            .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
475                    })
476                    .await
477                }
478                .scope_boxed()
479            })
480            .await
481            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;
482
483        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
484            .into_iter()
485            .map(|stored| (stored.sequence_number as u64, stored))
486            .collect();
487
488        Ok(keys
489            .iter()
490            .filter_map(|key| {
491                let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
492                let checkpoint = Checkpoint {
493                    stored,
494                    checkpoint_viewed_at: key.checkpoint_viewed_at,
495                };
496
497                let digest = &checkpoint.stored.checkpoint_digest;
498                if matches!(key.digest, Some(d) if d.as_slice() != digest) {
499                    None
500                } else {
501                    Some((*key, checkpoint))
502                }
503            })
504            .collect())
505    }
506}
507
508#[async_trait::async_trait]
509impl Loader<DigestKey> for Db {
510    type Value = Checkpoint;
511    type Error = Error;
512
513    async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
514        use checkpoints::dsl;
515
516        let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();
517
518        let checkpoints: Vec<StoredCheckpoint> = self
519            .execute(move |conn| {
520                async move {
521                    conn.results(move || {
522                        dsl::checkpoints
523                            .filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned()))
524                    })
525                    .await
526                }
527                .scope_boxed()
528            })
529            .await
530            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;
531
532        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
533            .into_iter()
534            .map(|stored| (stored.checkpoint_digest.clone(), stored))
535            .collect();
536
537        Ok(keys
538            .iter()
539            .filter_map(|key| {
540                let DigestKey {
541                    digest,
542                    checkpoint_viewed_at,
543                } = *key;
544
545                let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;
546                let checkpoint = Checkpoint {
547                    stored,
548                    checkpoint_viewed_at,
549                };
550
551                // Filter by key's checkpoint viewed at here. Doing this in memory because it should
552                // be quite rare that this query actually filters something, but encoding it in SQL
553                // is complicated.
554                let seq_num = checkpoint.stored.sequence_number as u64;
555                (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
556            })
557            .collect())
558    }
559}
560
561#[async_trait::async_trait]
562impl Loader<RawSeqNumKey> for Db {
563    type Value = StoredRawCheckpoint;
564    type Error = Error;
565
566    async fn load(
567        &self,
568        keys: &[RawSeqNumKey],
569    ) -> Result<HashMap<RawSeqNumKey, StoredRawCheckpoint>, Error> {
570        use raw_checkpoints::dsl;
571
572        let checkpoint_ids = keys
573            .iter()
574            .map(|key| key.sequence_number)
575            .collect::<Vec<_>>();
576
577        let raw_checkpoints: Vec<StoredRawCheckpoint> = self
578            .execute(move |conn| {
579                async move {
580                    conn.results(move || {
581                        dsl::raw_checkpoints
582                            .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
583                    })
584                    .await
585                }
586                .scope_boxed()
587            })
588            .await
589            .map_err(|e| Error::Internal(format!("Failed to fetch raw checkpoints: {e}")))?;
590
591        Ok(raw_checkpoints
592            .into_iter()
593            .map(|raw| {
594                (
595                    RawSeqNumKey {
596                        sequence_number: raw.sequence_number,
597                    },
598                    raw,
599                )
600            })
601            .collect())
602    }
603}