use std::collections::{BTreeMap, BTreeSet, HashMap};

use super::{
    base64::Base64,
    cursor::{self, Page, Paginated, ScanLimited, Target},
    date_time::DateTime,
    digest::Digest,
    epoch::Epoch,
    gas::GasCostSummary,
    transaction_block::{self, TransactionBlock, TransactionBlockFilter},
    uint53::UInt53,
};
use crate::{connection::ScanConnection, consistency::Checkpointed};
use crate::{
    data::{self, Conn, DataLoader, Db, DbConnection, QueryExecutor},
    error::Error,
};
use async_graphql::{
    connection::{Connection, CursorType, Edge},
    dataloader::Loader,
    *,
};
use diesel::{ExpressionMethods, OptionalExtension, QueryDsl};
use diesel_async::scoped_futures::ScopedFutureExt;
use fastcrypto::encoding::{Base58, Encoding};
use serde::{Deserialize, Serialize};
use sui_indexer::{
    models::{checkpoints::StoredCheckpoint, raw_checkpoints::StoredRawCheckpoint},
    schema::{checkpoints, raw_checkpoints},
};
use sui_types::messages_checkpoint::{
    CertifiedCheckpointSummary, CheckpointCommitment, CheckpointDigest,
};

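/// Filter for selecting a checkpoint: by digest, by sequence number, or neither, in which case
/// the latest checkpoint visible to the query is used.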
#[derive(Default, InputObject)]
pub(crate) struct CheckpointId {
    pub digest: Option<Digest>,
    pub sequence_number: Option<UInt53>,
}

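/// DataLoader key for fetching the raw, BCS-serialized contents of a checkpoint by its sequence
/// number.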
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct RawSeqNumKey {
    pub sequence_number: i64,
}

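/// DataLoader key for fetching a `Checkpoint` by its sequence number, optionally checking it
/// against an expected digest, constrained by the checkpoint the query is being viewed at.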
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct SeqNumKey {
    pub sequence_number: u64,
    pub digest: Option<Digest>,
    pub checkpoint_viewed_at: u64,
}

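/// DataLoader key for fetching a `Checkpoint` by its digest, constrained by the checkpoint the
/// query is being viewed at.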
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct DigestKey {
    pub digest: Digest,
    pub checkpoint_viewed_at: u64,
}

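/// A checkpoint loaded from the indexer's store, together with the checkpoint sequence number
/// the query was viewed at (used as a consistent upper bound for nested queries).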
#[derive(Clone)]
pub(crate) struct Checkpoint {
    /// Representation of the checkpoint's row in the indexer's `checkpoints` table.
    pub stored: StoredCheckpoint,
    /// The checkpoint sequence number this `Checkpoint` was viewed at.
    pub checkpoint_viewed_at: u64,
}

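/// Cursor used to paginate checkpoints, wrapping a `CheckpointCursor`.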
pub(crate) type Cursor = cursor::JsonCursor<CheckpointCursor>;
type Query<ST, GB> = data::Query<ST, checkpoints::table, GB>;

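/// The inner contents of a checkpoint cursor: the checkpoint the cursor was created at and the
/// sequence number of the checkpoint it points at. Field names are kept short so the serialized
/// cursor stays compact.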
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
pub(crate) struct CheckpointCursor {
    /// The checkpoint sequence number this cursor was viewed at.
    #[serde(rename = "c")]
    pub checkpoint_viewed_at: u64,
    /// The sequence number of the checkpoint this cursor points at.
    #[serde(rename = "s")]
    pub sequence_number: u64,
}

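/// Checkpoints contain finalized transactions and are used for node synchronization and global
/// transaction ordering.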
#[Object]
impl Checkpoint {
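    /// A 32-byte hash that uniquely identifies this checkpoint, encoded in Base58.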
    async fn digest(&self) -> Result<String> {
        Ok(self.digest_impl().extend()?.base58_encode())
    }

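    /// This checkpoint's position in the total order of checkpoints (its sequence number).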
    async fn sequence_number(&self) -> UInt53 {
        self.sequence_number_impl().into()
    }

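    /// The checkpoint's timestamp, converted from the stored millisecond value.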
    async fn timestamp(&self) -> Result<DateTime> {
        DateTime::from_ms(self.stored.timestamp_ms).extend()
    }

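    /// The aggregate signature from validators over this checkpoint, Base64-encoded.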
    async fn validator_signatures(&self) -> Base64 {
        Base64::from(&self.stored.validator_signature)
    }

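    /// The digest of the checkpoint that came before this one, Base58-encoded, if there is one.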
    async fn previous_checkpoint_digest(&self) -> Option<String> {
        self.stored
            .previous_checkpoint_digest
            .as_ref()
            .map(Base58::encode)
    }

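    /// The total number of transactions in the network by the end of this checkpoint.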
    async fn network_total_transactions(&self) -> Option<UInt53> {
        Some(self.network_total_transactions_impl().into())
    }

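    /// The rolling totals of computation cost, storage cost, storage rebate, and non-refundable
    /// storage fee recorded on this checkpoint.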
    async fn rolling_gas_summary(&self) -> Option<GasCostSummary> {
        Some(GasCostSummary {
            computation_cost: self.stored.computation_cost as u64,
            storage_cost: self.stored.storage_cost as u64,
            storage_rebate: self.stored.storage_rebate as u64,
            non_refundable_storage_fee: self.stored.non_refundable_storage_fee as u64,
        })
    }

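    /// The epoch that this checkpoint is part of.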
    async fn epoch(&self, ctx: &Context<'_>) -> Result<Option<Epoch>> {
        Epoch::query(
            ctx,
            Some(self.stored.epoch as u64),
            self.checkpoint_viewed_at,
        )
        .await
        .extend()
    }

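    /// Transactions in this checkpoint, paginated. Any supplied `filter` is intersected with a
    /// filter pinning results to this checkpoint; if the two cannot be combined, an empty
    /// connection is returned. `scan_limit` caps how many candidate transactions are scanned
    /// while filling the page.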
    async fn transaction_blocks(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<transaction_block::Cursor>,
        last: Option<u64>,
        before: Option<transaction_block::Cursor>,
        filter: Option<TransactionBlockFilter>,
        scan_limit: Option<u64>,
    ) -> Result<ScanConnection<String, TransactionBlock>> {
        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;

        let Some(filter) = filter
            .unwrap_or_default()
            .intersect(TransactionBlockFilter {
                at_checkpoint: Some(UInt53::from(self.stored.sequence_number as u64)),
                ..Default::default()
            })
        else {
            return Ok(ScanConnection::new(false, false));
        };

        TransactionBlock::paginate(ctx, page, filter, self.checkpoint_viewed_at, scan_limit)
            .await
            .extend()
    }

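    /// The Base64-encoded BCS serialization of this checkpoint's summary, reconstructed from the
    /// certified checkpoint stored by the indexer, or `null` if the raw checkpoint is
    /// unavailable.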
    async fn bcs(&self, ctx: &Context<'_>) -> Result<Option<Base64>> {
        let DataLoader(dl) = ctx.data_unchecked();
        let Some(raw_checkpoint) = dl
            .load_one(RawSeqNumKey {
                sequence_number: self.stored.sequence_number,
            })
            .await?
        else {
            return Ok(None);
        };

        // Decode the certified checkpoint stored by the indexer, then re-serialize just its
        // summary. Serialization failures are reported as internal errors rather than panics.
        let certified: CertifiedCheckpointSummary =
            bcs::from_bytes(&raw_checkpoint.certified_checkpoint).map_err(|e| {
                Error::Internal(format!("Error deserializing certified checkpoint: {e}")).extend()
            })?;

        let summary = certified.into_summary_and_sequence().1;
        let bytes = bcs::to_bytes(&summary).map_err(|e| {
            Error::Internal(format!("Error serializing checkpoint summary: {e}")).extend()
        })?;

        Ok(Some(Base64::from(bytes)))
    }

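    /// The Base58-encoded digest of the checkpoint artifacts, if this checkpoint's commitments
    /// include one.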
    async fn artifacts_digest(&self) -> Result<Option<String>> {
        let commitments: Vec<CheckpointCommitment> =
            bcs::from_bytes(&self.stored.checkpoint_commitments).map_err(|e| {
                Error::Internal(format!("Error deserializing commitments: {e}")).extend()
            })?;

        for commitment in commitments {
            if let CheckpointCommitment::CheckpointArtifactsDigest(digest) = commitment {
                return Ok(Some(digest.base58_encode()));
            }
        }
        Ok(None)
    }
}

impl CheckpointId {
    pub(crate) fn by_seq_num(seq_num: u64) -> Self {
        CheckpointId {
            sequence_number: Some(seq_num.into()),
            digest: None,
        }
    }
}

impl Checkpoint {
    pub(crate) fn sequence_number_impl(&self) -> u64 {
        self.stored.sequence_number as u64
    }

    pub(crate) fn network_total_transactions_impl(&self) -> u64 {
        self.stored.network_total_transactions as u64
    }

    pub(crate) fn digest_impl(&self) -> Result<CheckpointDigest, Error> {
        CheckpointDigest::try_from(self.stored.checkpoint_digest.clone())
            .map_err(|e| Error::Internal(format!("Failed to deserialize checkpoint digest: {e}")))
    }

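    /// Look up a `Checkpoint` in the database, identified by `filter`: by sequence number
    /// (optionally cross-checked against a digest), by digest alone, or, when neither is
    /// supplied, the latest checkpoint visible at `checkpoint_viewed_at`.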
    pub(crate) async fn query(
        ctx: &Context<'_>,
        filter: CheckpointId,
        checkpoint_viewed_at: u64,
    ) -> Result<Option<Self>, Error> {
        match filter {
            CheckpointId {
                sequence_number: Some(sequence_number),
                digest,
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(SeqNumKey {
                    sequence_number: sequence_number.into(),
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            CheckpointId {
                sequence_number: None,
                digest: Some(digest),
            } => {
                let DataLoader(dl) = ctx.data_unchecked();
                dl.load_one(DigestKey {
                    digest,
                    checkpoint_viewed_at,
                })
                .await
            }

            CheckpointId {
                sequence_number: None,
                digest: None,
            } => Checkpoint::query_latest_at(ctx.data_unchecked(), checkpoint_viewed_at).await,
        }
    }

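    /// Look up the latest checkpoint whose sequence number is less than or equal to
    /// `checkpoint_viewed_at`.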
    async fn query_latest_at(db: &Db, checkpoint_viewed_at: u64) -> Result<Option<Self>, Error> {
        use checkpoints::dsl;

        let stored: Option<StoredCheckpoint> = db
            .execute(move |conn| {
                async move {
                    conn.first(move || {
                        dsl::checkpoints
                            .filter(dsl::sequence_number.le(checkpoint_viewed_at as i64))
                            .order_by(dsl::sequence_number.desc())
                    })
                    .await
                    .optional()
                }
                .scope_boxed()
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))?;

        Ok(stored.map(|stored| Checkpoint {
            stored,
            checkpoint_viewed_at,
        }))
    }

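    /// Look up the timestamp (in milliseconds) of the checkpoint with sequence number `seq_num`,
    /// using an existing connection.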
    pub(crate) async fn query_timestamp(
        conn: &mut Conn<'_>,
        seq_num: u64,
    ) -> Result<u64, diesel::result::Error> {
        use checkpoints::dsl;

        let stored: i64 = conn
            .first(move || {
                dsl::checkpoints
                    .select(dsl::timestamp_ms)
                    .filter(dsl::sequence_number.eq(seq_num as i64))
            })
            .await?;

        Ok(stored as u64)
    }

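    /// Query the database for a `page` of checkpoints, optionally restricted to the epoch given
    /// in `filter`. Results are bounded above by `checkpoint_viewed_at` (or by the checkpoint
    /// recorded in the page's cursors, if they carry one), so the page stays consistent with the
    /// rest of the query.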
    pub(crate) async fn paginate(
        db: &Db,
        page: Page<Cursor>,
        filter: Option<u64>,
        checkpoint_viewed_at: u64,
    ) -> Result<Connection<String, Checkpoint>, Error> {
        use checkpoints::dsl;
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);

        let (prev, next, results) = db
            .execute(move |conn| {
                async move {
                    page.paginate_query::<StoredCheckpoint, _, _, _>(
                        conn,
                        checkpoint_viewed_at,
                        move || {
                            let mut query = dsl::checkpoints.into_boxed();
                            query =
                                query.filter(dsl::sequence_number.le(checkpoint_viewed_at as i64));
                            if let Some(epoch) = filter {
                                query = query.filter(dsl::epoch.eq(epoch as i64));
                            }
                            query
                        },
                    )
                    .await
                }
                .scope_boxed()
            })
            .await?;

        let mut conn = Connection::new(prev, next);
        for stored in results {
            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
            conn.edges.push(Edge::new(
                cursor,
                Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                },
            ));
        }

        Ok(conn)
    }
}

impl Paginated<Cursor> for StoredCheckpoint {
    type Source = checkpoints::table;

    fn filter_ge<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
        query.filter(checkpoints::dsl::sequence_number.ge(cursor.sequence_number as i64))
    }

    fn filter_le<ST, GB>(cursor: &Cursor, query: Query<ST, GB>) -> Query<ST, GB> {
        query.filter(checkpoints::dsl::sequence_number.le(cursor.sequence_number as i64))
    }

    fn order<ST, GB>(asc: bool, query: Query<ST, GB>) -> Query<ST, GB> {
        use checkpoints::dsl;
        if asc {
            query.order(dsl::sequence_number)
        } else {
            query.order(dsl::sequence_number.desc())
        }
    }
}

impl Target<Cursor> for StoredCheckpoint {
    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
        Cursor::new(CheckpointCursor {
            checkpoint_viewed_at,
            sequence_number: self.sequence_number as u64,
        })
    }
}

impl Checkpointed for Cursor {
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}

impl ScanLimited for Cursor {}

#[async_trait::async_trait]
impl Loader<SeqNumKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    async fn load(&self, keys: &[SeqNumKey]) -> Result<HashMap<SeqNumKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        // Only fetch sequence numbers that are visible at the checkpoint each key was viewed at.
        let checkpoint_ids: BTreeSet<_> = keys
            .iter()
            .filter_map(|key| {
                (key.checkpoint_viewed_at >= key.sequence_number)
                    .then_some(key.sequence_number as i64)
            })
            .collect();

        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                async move {
                    conn.results(move || {
                        dsl::checkpoints
                            .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
                    })
                    .await
                }
                .scope_boxed()
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.sequence_number as u64, stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let stored = checkpoint_id_to_stored.get(&key.sequence_number).cloned()?;
                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at: key.checkpoint_viewed_at,
                };

                // If the key also carries a digest, only return the checkpoint when it matches.
                let digest = &checkpoint.stored.checkpoint_digest;
                if matches!(key.digest, Some(d) if d.as_slice() != digest) {
                    None
                } else {
                    Some((*key, checkpoint))
                }
            })
            .collect())
    }
}

#[async_trait::async_trait]
impl Loader<DigestKey> for Db {
    type Value = Checkpoint;
    type Error = Error;

    async fn load(&self, keys: &[DigestKey]) -> Result<HashMap<DigestKey, Checkpoint>, Error> {
        use checkpoints::dsl;

        let digests: BTreeSet<_> = keys.iter().map(|key| key.digest.to_vec()).collect();

        let checkpoints: Vec<StoredCheckpoint> = self
            .execute(move |conn| {
                async move {
                    conn.results(move || {
                        dsl::checkpoints
                            .filter(dsl::checkpoint_digest.eq_any(digests.iter().cloned()))
                    })
                    .await
                }
                .scope_boxed()
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch checkpoints: {e}")))?;

        let checkpoint_id_to_stored: BTreeMap<_, _> = checkpoints
            .into_iter()
            .map(|stored| (stored.checkpoint_digest.clone(), stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let DigestKey {
                    digest,
                    checkpoint_viewed_at,
                } = *key;

                let stored = checkpoint_id_to_stored.get(digest.as_slice()).cloned()?;
                let checkpoint = Checkpoint {
                    stored,
                    checkpoint_viewed_at,
                };

                // Only return checkpoints that are visible at the checkpoint the key was
                // viewed at.
                let seq_num = checkpoint.stored.sequence_number as u64;
                (checkpoint_viewed_at >= seq_num).then_some((*key, checkpoint))
            })
            .collect())
    }
}

#[async_trait::async_trait]
impl Loader<RawSeqNumKey> for Db {
    type Value = StoredRawCheckpoint;
    type Error = Error;

    async fn load(
        &self,
        keys: &[RawSeqNumKey],
    ) -> Result<HashMap<RawSeqNumKey, StoredRawCheckpoint>, Error> {
        use raw_checkpoints::dsl;

        let checkpoint_ids = keys
            .iter()
            .map(|key| key.sequence_number)
            .collect::<Vec<_>>();

        let raw_checkpoints: Vec<StoredRawCheckpoint> = self
            .execute(move |conn| {
                async move {
                    conn.results(move || {
                        dsl::raw_checkpoints
                            .filter(dsl::sequence_number.eq_any(checkpoint_ids.iter().cloned()))
                    })
                    .await
                }
                .scope_boxed()
            })
            .await
            .map_err(|e| Error::Internal(format!("Failed to fetch raw checkpoints: {e}")))?;

        Ok(raw_checkpoints
            .into_iter()
            .map(|raw| {
                (
                    RawSeqNumKey {
                        sequence_number: raw.sequence_number,
                    },
                    raw,
                )
            })
            .collect())
    }
}