1use std::collections::{BTreeMap, BTreeSet, HashMap};
5
6use super::balance::{self, Balance};
7use super::base64::Base64;
8use super::big_int::BigInt;
9use super::coin::Coin;
10use super::cursor::{BcsCursor, JsonCursor, Page, RawPaginated, ScanLimited, Target};
11use super::move_module::MoveModule;
12use super::move_object::MoveObject;
13use super::object::{self, Object, ObjectFilter, ObjectImpl, ObjectOwner, ObjectStatus};
14use super::owner::OwnerImpl;
15use super::stake::StakedSui;
16use super::sui_address::SuiAddress;
17use super::suins_registration::{DomainFormat, SuinsRegistration};
18use super::transaction_block::{self, TransactionBlock, TransactionBlockFilter};
19use super::type_filter::ExactTypeFilter;
20use super::uint53::UInt53;
21use crate::connection::ScanConnection;
22use crate::consistency::{Checkpointed, ConsistentNamedCursor};
23use crate::data::{DataLoader, Db, DbConnection, QueryExecutor};
24use crate::error::Error;
25use crate::raw_query::RawQuery;
26use crate::types::sui_address::addr;
27use crate::{filter, query};
28use async_graphql::connection::{Connection, CursorType, Edge};
29use async_graphql::dataloader::Loader;
30use async_graphql::*;
31use diesel::prelude::QueryableByName;
32use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Selectable};
33use diesel_async::scoped_futures::ScopedFutureExt;
34use serde::{Deserialize, Serialize};
35use sui_indexer::models::objects::StoredFullHistoryObject;
36use sui_indexer::schema::packages;
37use sui_package_resolver::{error::Error as PackageCacheError, Package as ParsedMovePackage};
38use sui_types::is_system_package;
39use sui_types::{move_package::MovePackage as NativeMovePackage, object::Data};
40
/// A Move package: a generic `Object` paired with the native package contents extracted from it.
#[derive(Clone)]
pub(crate) struct MovePackage {
    /// The package viewed as a generic `Object`; the owner- and object-level resolvers delegate
    /// to it.
    pub super_: Object,

    /// The native package contents (modules, linkage table, type origin table) taken from the
    /// object's `Data::Package` payload.
    pub native: NativeMovePackage,
}
50
/// Filter for paginating packages by the checkpoint sequence number recorded for them.
#[derive(InputObject, Debug, Default, Clone)]
pub(crate) struct MovePackageCheckpointFilter {
    /// Exclusive lower bound: only include packages whose checkpoint sequence number is strictly
    /// greater than this value.
    pub after_checkpoint: Option<UInt53>,

    /// Exclusive upper bound: only include packages whose checkpoint sequence number is strictly
    /// less than this value. The bound is additionally capped by the checkpoint being viewed.
    pub before_checkpoint: Option<UInt53>,
}
62
/// Filter for paginating the versions of a package.
#[derive(InputObject, Debug, Default, Clone)]
pub(crate) struct MovePackageVersionFilter {
    /// Exclusive lower bound: only include versions strictly greater than this value.
    pub after_version: Option<UInt53>,

    /// Exclusive upper bound: only include versions strictly less than this value.
    pub before_version: Option<UInt53>,
}
74
/// The ways a package can be looked up by `MovePackage::query`. Every variant carries the
/// checkpoint the lookup is performed at.
pub(crate) enum PackageLookup {
    /// Fetch the package stored at exactly the supplied address.
    ById { checkpoint_viewed_at: u64 },

    /// Fetch the package at `version` that is related to the supplied address. System packages
    /// are looked up at the same address and the given object version; other packages have the
    /// address translated through the `PackageVersionKey` data loader first.
    Versioned {
        version: u64,
        checkpoint_viewed_at: u64,
    },

    /// Fetch the latest version of the package related to the supplied address. System packages
    /// are looked up at the same address; other packages have the address translated through
    /// the `LatestKey` data loader first.
    Latest { checkpoint_viewed_at: u64 },
}
93
/// One entry of a package's linkage table, describing which version of a dependency it uses.
#[derive(SimpleObject)]
struct Linkage {
    /// The ID the dependency is keyed by in the linkage table (its runtime ID).
    original_id: SuiAddress,

    /// The ID of the dependency version recorded in the linkage table's upgrade info.
    upgraded_id: SuiAddress,

    /// The version recorded for the dependency in the linkage table's upgrade info.
    version: UInt53,
}
106
/// One entry of a package's type origin table.
#[derive(SimpleObject)]
struct TypeOrigin {
    /// Name of the module the type belongs to.
    module: String,

    /// Name of the datatype. Exposed as `struct` in the schema because `struct` is a Rust
    /// keyword.
    #[graphql(name = "struct")]
    struct_: String,

    /// The package ID recorded as the origin of the type.
    defining_id: SuiAddress,
}
120
/// Row shape produced by the raw package pagination queries in this file: two columns from the
/// `packages` side of the join, followed by the full matching history object row.
#[derive(Selectable, QueryableByName)]
#[diesel(table_name = packages)]
struct StoredHistoryPackage {
    // Selected as `p.checkpoint_sequence_number`; also the first component of the cursor.
    checkpoint_sequence_number: i64,
    // Selected as `p.original_id`; also the second component of the cursor.
    original_id: Vec<u8>,
    // The `o.*` columns of the joined `full_objects_history` row.
    #[diesel(embed)]
    object: StoredFullHistoryObject,
}
131
/// Error returned by `TryFrom<&Object> for MovePackage` when the object has no native
/// representation or its contents are not a package.
pub(crate) struct MovePackageDowncastError;
133
/// Cursor used to paginate the modules of a single package (JSON-encoded, keyed by module name).
pub(crate) type CModule = JsonCursor<ConsistentNamedCursor>;
/// Cursor used to paginate packages and package versions (BCS-encoded `PackageCursor`).
pub(crate) type Cursor = BcsCursor<PackageCursor>;
136
/// Contents of a package pagination cursor. Rows are ordered by checkpoint sequence number, then
/// original ID, then version (see the `RawPaginated` implementation for `StoredHistoryPackage`).
///
/// NOTE(review): the field order is presumably part of the serialized cursor format, so
/// reordering fields would likely invalidate existing cursors -- confirm before changing.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct PackageCursor {
    /// Checkpoint sequence number of the row the cursor points at.
    pub checkpoint_sequence_number: u64,
    /// Raw bytes of the `original_id` of the row the cursor points at.
    pub original_id: Vec<u8>,
    /// Object version of the row the cursor points at.
    pub package_version: u64,
    /// The checkpoint the paginated query was viewed at, carried so later pages stay consistent.
    pub checkpoint_viewed_at: u64,
}
151
/// Data loader key that resolves to the address of the package at `version` sharing an
/// `original_id` with the package stored at `address`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct PackageVersionKey {
    // Address of any package in the family being queried.
    address: SuiAddress,
    // The version being looked for within that family.
    version: u64,
}
163
/// Data loader key that resolves to the address of the highest-versioned package sharing an
/// `original_id` with the package stored at `address`, among those recorded at or before
/// `checkpoint_viewed_at`.
#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)]
struct LatestKey {
    // Address of any package in the family being queried.
    address: SuiAddress,
    // Upper bound (inclusive) on the checkpoint sequence number of candidate versions.
    checkpoint_viewed_at: u64,
}
171
172#[Object]
175impl MovePackage {
176 pub(crate) async fn address(&self) -> SuiAddress {
177 OwnerImpl::from(&self.super_).address().await
178 }
179
180 pub(crate) async fn objects(
185 &self,
186 ctx: &Context<'_>,
187 first: Option<u64>,
188 after: Option<object::Cursor>,
189 last: Option<u64>,
190 before: Option<object::Cursor>,
191 filter: Option<ObjectFilter>,
192 ) -> Result<Connection<String, MoveObject>> {
193 OwnerImpl::from(&self.super_)
194 .objects(ctx, first, after, last, before, filter)
195 .await
196 }
197
198 pub(crate) async fn balance(
204 &self,
205 ctx: &Context<'_>,
206 type_: Option<ExactTypeFilter>,
207 ) -> Result<Option<Balance>> {
208 OwnerImpl::from(&self.super_).balance(ctx, type_).await
209 }
210
211 pub(crate) async fn balances(
216 &self,
217 ctx: &Context<'_>,
218 first: Option<u64>,
219 after: Option<balance::Cursor>,
220 last: Option<u64>,
221 before: Option<balance::Cursor>,
222 ) -> Result<Connection<String, Balance>> {
223 OwnerImpl::from(&self.super_)
224 .balances(ctx, first, after, last, before)
225 .await
226 }
227
228 pub(crate) async fn coins(
235 &self,
236 ctx: &Context<'_>,
237 first: Option<u64>,
238 after: Option<object::Cursor>,
239 last: Option<u64>,
240 before: Option<object::Cursor>,
241 type_: Option<ExactTypeFilter>,
242 ) -> Result<Connection<String, Coin>> {
243 OwnerImpl::from(&self.super_)
244 .coins(ctx, first, after, last, before, type_)
245 .await
246 }
247
248 pub(crate) async fn staked_suis(
253 &self,
254 ctx: &Context<'_>,
255 first: Option<u64>,
256 after: Option<object::Cursor>,
257 last: Option<u64>,
258 before: Option<object::Cursor>,
259 ) -> Result<Connection<String, StakedSui>> {
260 OwnerImpl::from(&self.super_)
261 .staked_suis(ctx, first, after, last, before)
262 .await
263 }
264
265 pub(crate) async fn default_suins_name(
267 &self,
268 ctx: &Context<'_>,
269 format: Option<DomainFormat>,
270 ) -> Result<Option<String>> {
271 OwnerImpl::from(&self.super_)
272 .default_suins_name(ctx, format)
273 .await
274 }
275
276 pub(crate) async fn suins_registrations(
282 &self,
283 ctx: &Context<'_>,
284 first: Option<u64>,
285 after: Option<object::Cursor>,
286 last: Option<u64>,
287 before: Option<object::Cursor>,
288 ) -> Result<Connection<String, SuinsRegistration>> {
289 OwnerImpl::from(&self.super_)
290 .suins_registrations(ctx, first, after, last, before)
291 .await
292 }
293
294 pub(crate) async fn version(&self) -> UInt53 {
295 ObjectImpl(&self.super_).version().await
296 }
297
298 pub(crate) async fn status(&self) -> ObjectStatus {
306 ObjectImpl(&self.super_).status().await
307 }
308
309 pub(crate) async fn digest(&self) -> Option<String> {
311 ObjectImpl(&self.super_).digest().await
312 }
313
314 pub(crate) async fn owner(&self) -> Option<ObjectOwner> {
317 ObjectImpl(&self.super_).owner().await
318 }
319
320 pub(crate) async fn previous_transaction_block(
322 &self,
323 ctx: &Context<'_>,
324 ) -> Result<Option<TransactionBlock>> {
325 ObjectImpl(&self.super_)
326 .previous_transaction_block(ctx)
327 .await
328 }
329
330 pub(crate) async fn storage_rebate(&self) -> Option<BigInt> {
336 ObjectImpl(&self.super_).storage_rebate().await
337 }
338
339 pub(crate) async fn received_transaction_blocks(
362 &self,
363 ctx: &Context<'_>,
364 first: Option<u64>,
365 after: Option<transaction_block::Cursor>,
366 last: Option<u64>,
367 before: Option<transaction_block::Cursor>,
368 filter: Option<TransactionBlockFilter>,
369 scan_limit: Option<u64>,
370 ) -> Result<ScanConnection<String, TransactionBlock>> {
371 ObjectImpl(&self.super_)
372 .received_transaction_blocks(ctx, first, after, last, before, filter, scan_limit)
373 .await
374 }
375
376 pub(crate) async fn bcs(&self) -> Result<Option<Base64>> {
378 ObjectImpl(&self.super_).bcs().await
379 }
380
381 async fn package_at_version(
384 &self,
385 ctx: &Context<'_>,
386 version: u64,
387 ) -> Result<Option<MovePackage>> {
388 MovePackage::query(
389 ctx,
390 self.super_.address,
391 MovePackage::by_version(version, self.checkpoint_viewed_at_impl()),
392 )
393 .await
394 .extend()
395 }
396
397 async fn package_versions(
401 &self,
402 ctx: &Context<'_>,
403 first: Option<u64>,
404 after: Option<Cursor>,
405 last: Option<u64>,
406 before: Option<Cursor>,
407 filter: Option<MovePackageVersionFilter>,
408 ) -> Result<Connection<String, MovePackage>> {
409 let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
410
411 MovePackage::paginate_by_version(
412 ctx.data_unchecked(),
413 page,
414 self.super_.address,
415 filter,
416 self.checkpoint_viewed_at_impl(),
417 )
418 .await
419 .extend()
420 }
421
422 async fn latest_package(&self, ctx: &Context<'_>) -> Result<MovePackage> {
425 Ok(MovePackage::query(
426 ctx,
427 self.super_.address,
428 MovePackage::latest_at(self.checkpoint_viewed_at_impl()),
429 )
430 .await
431 .extend()?
432 .ok_or_else(|| Error::Internal("No latest version found".to_string()))?)
433 }
434
435 async fn module(&self, name: String) -> Result<Option<MoveModule>> {
438 self.module_impl(&name).extend()
439 }
440
    /// Paginate through the modules of this package, in module name order.
    ///
    /// Returns `None` (rather than an empty connection) when the requested page contains no
    /// modules.
    pub async fn modules(
        &self,
        ctx: &Context<'_>,
        first: Option<u64>,
        after: Option<CModule>,
        last: Option<u64>,
        before: Option<CModule>,
    ) -> Result<Option<Connection<String, MoveModule>>> {
        use std::ops::Bound as B;

        let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?;
        // If the cursors carry a checkpoint, it takes precedence over the checkpoint this
        // package was viewed at, so that later pages stay consistent with the first one.
        let cursor_viewed_at = page.validate_cursor_consistency()?;
        let checkpoint_viewed_at =
            cursor_viewed_at.unwrap_or_else(|| self.checkpoint_viewed_at_impl());

        let parsed = self.parsed_package()?;
        // Both cursors are exclusive bounds on the module name.
        let module_range = parsed.modules().range::<String, _>((
            page.after().map_or(B::Unbounded, |a| B::Excluded(&a.name)),
            page.before().map_or(B::Unbounded, |b| B::Excluded(&b.name)),
        ));

        let mut connection = Connection::new(false, false);
        let modules = if page.is_from_front() {
            module_range.take(page.limit()).collect()
        } else {
            // Paginating from the back: take the last `limit` modules, then restore ascending
            // order.
            let mut ms: Vec<_> = module_range.rev().take(page.limit()).collect();
            ms.reverse();
            ms
        };

        // There is a previous page if any module sorts strictly before the first one returned.
        connection.has_previous_page = modules.first().is_some_and(|(fst, _)| {
            parsed
                .modules()
                .range::<String, _>((B::Unbounded, B::Excluded(*fst)))
                .next()
                .is_some()
        });

        // There is a next page if any module sorts strictly after the last one returned.
        connection.has_next_page = modules.last().is_some_and(|(lst, _)| {
            parsed
                .modules()
                .range::<String, _>((B::Excluded(*lst), B::Unbounded))
                .next()
                .is_some()
        });

        for (name, parsed) in modules {
            // Every parsed module is expected to have serialized bytes in the native package;
            // a mismatch is reported as an internal error.
            let Some(native) = self.native.serialized_module_map().get(name) else {
                return Err(Error::Internal(format!(
                    "Module '{name}' exists in PackageCache but not in serialized map.",
                ))
                .extend());
            };

            let cursor = JsonCursor::new(ConsistentNamedCursor {
                name: name.clone(),
                c: checkpoint_viewed_at,
            })
            .encode_cursor();
            connection.edges.push(Edge::new(
                cursor,
                MoveModule {
                    storage_id: self.super_.address,
                    native: native.clone(),
                    parsed: parsed.clone(),
                    checkpoint_viewed_at,
                },
            ))
        }

        if connection.edges.is_empty() {
            Ok(None)
        } else {
            Ok(Some(connection))
        }
    }
518
519 async fn linkage(&self) -> Option<Vec<Linkage>> {
521 let linkage = self
522 .native
523 .linkage_table()
524 .iter()
525 .map(|(&runtime_id, upgrade_info)| Linkage {
526 original_id: runtime_id.into(),
527 upgraded_id: upgrade_info.upgraded_id.into(),
528 version: upgrade_info.upgraded_version.value().into(),
529 })
530 .collect();
531
532 Some(linkage)
533 }
534
535 async fn type_origins(&self) -> Option<Vec<TypeOrigin>> {
537 let type_origins = self
538 .native
539 .type_origin_table()
540 .iter()
541 .map(|origin| TypeOrigin {
542 module: origin.module_name.clone(),
543 struct_: origin.datatype_name.clone(),
544 defining_id: origin.package.into(),
545 })
546 .collect();
547
548 Some(type_origins)
549 }
550
551 async fn package_bcs(&self) -> Result<Option<Base64>> {
553 let bcs = bcs::to_bytes(&self.native)
554 .map_err(|_| {
555 Error::Internal(format!("Failed to serialize package {}", self.native.id()))
556 })
557 .extend()?;
558
559 Ok(Some(bcs.into()))
560 }
561
562 async fn module_bcs(&self) -> Result<Option<Base64>> {
565 let bcs = bcs::to_bytes(self.native.serialized_module_map())
566 .map_err(|_| {
567 Error::Internal(format!("Failed to serialize package {}", self.native.id()))
568 })
569 .extend()?;
570
571 Ok(Some(bcs.into()))
572 }
573}
574
575impl MovePackage {
576 fn parsed_package(&self) -> Result<ParsedMovePackage, Error> {
577 ParsedMovePackage::read_from_package(&self.native)
578 .map_err(|e| Error::Internal(format!("Error reading package: {e}")))
579 }
580
581 fn checkpoint_viewed_at_impl(&self) -> u64 {
584 self.super_.checkpoint_viewed_at
585 }
586
587 pub(crate) fn module_impl(&self, name: &str) -> Result<Option<MoveModule>, Error> {
588 use PackageCacheError as E;
589 match (
590 self.native.serialized_module_map().get(name),
591 self.parsed_package()?.module(name),
592 ) {
593 (Some(native), Ok(parsed)) => Ok(Some(MoveModule {
594 storage_id: self.super_.address,
595 native: native.clone(),
596 parsed: parsed.clone(),
597 checkpoint_viewed_at: self.checkpoint_viewed_at_impl(),
598 })),
599
600 (None, _) | (_, Err(E::ModuleNotFound(_, _))) => Ok(None),
601 (_, Err(e)) => Err(Error::Internal(format!(
602 "Unexpected error fetching module: {e}"
603 ))),
604 }
605 }
606
607 pub(crate) fn by_id_at(checkpoint_viewed_at: u64) -> PackageLookup {
609 PackageLookup::ById {
610 checkpoint_viewed_at,
611 }
612 }
613
614 pub(crate) fn by_version(version: u64, checkpoint_viewed_at: u64) -> PackageLookup {
618 PackageLookup::Versioned {
619 version,
620 checkpoint_viewed_at,
621 }
622 }
623
624 pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> PackageLookup {
627 PackageLookup::Latest {
628 checkpoint_viewed_at,
629 }
630 }
631
632 pub(crate) async fn query(
633 ctx: &Context<'_>,
634 address: SuiAddress,
635 key: PackageLookup,
636 ) -> Result<Option<Self>, Error> {
637 let (address, key) = match key {
638 PackageLookup::ById {
639 checkpoint_viewed_at,
640 } => (address, Object::latest_at(checkpoint_viewed_at)),
641
642 PackageLookup::Versioned {
643 version,
644 checkpoint_viewed_at,
645 } => {
646 if is_system_package(address) {
647 (address, Object::at_version(version, checkpoint_viewed_at))
648 } else {
649 let DataLoader(loader) = &ctx.data_unchecked();
650 let Some(translation) = loader
651 .load_one(PackageVersionKey { address, version })
652 .await?
653 else {
654 return Ok(None);
655 };
656
657 (translation, Object::latest_at(checkpoint_viewed_at))
658 }
659 }
660
661 PackageLookup::Latest {
662 checkpoint_viewed_at,
663 } => {
664 if is_system_package(address) {
665 (address, Object::latest_at(checkpoint_viewed_at))
666 } else {
667 let DataLoader(loader) = &ctx.data_unchecked();
668 let Some(translation) = loader
669 .load_one(LatestKey {
670 address,
671 checkpoint_viewed_at,
672 })
673 .await?
674 else {
675 return Ok(None);
676 };
677
678 (translation, Object::latest_at(checkpoint_viewed_at))
679 }
680 }
681 };
682
683 let Some(object) = Object::query(ctx, address, key).await? else {
684 return Ok(None);
685 };
686
687 Ok(Some(MovePackage::try_from(&object).map_err(|_| {
688 Error::Internal(format!("{address} is not a package"))
689 })?))
690 }
691
692 pub(crate) async fn paginate_by_checkpoint(
704 db: &Db,
705 page: Page<Cursor>,
706 filter: Option<MovePackageCheckpointFilter>,
707 checkpoint_viewed_at: u64,
708 ) -> Result<Connection<String, MovePackage>, Error> {
709 let cursor_viewed_at = page.validate_cursor_consistency()?;
710 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
711
712 let after_checkpoint: Option<u64> = filter
713 .as_ref()
714 .and_then(|f| f.after_checkpoint)
715 .map(|v| v.into());
716
717 let before_checkpoint = filter
719 .as_ref()
720 .and_then(|f| f.before_checkpoint)
721 .map(|v| v.into())
722 .unwrap_or(u64::MAX)
723 .min(checkpoint_viewed_at + 1);
724
725 let (prev, next, results) = db
726 .execute(move |conn| {
727 async move {
728 let mut q = query!(
729 r#"
730 SELECT
731 p.checkpoint_sequence_number,
732 p.original_id,
733 o.*
734 FROM
735 packages p
736 INNER JOIN
737 full_objects_history o
738 ON
739 p.package_id = o.object_id
740 AND p.package_version = o.object_version
741 "#
742 );
743
744 q = filter!(
745 q,
746 format!("p.checkpoint_sequence_number < {before_checkpoint}")
747 );
748 if let Some(after) = after_checkpoint {
749 q = filter!(q, format!("{after} < p.checkpoint_sequence_number"));
750 }
751
752 page.paginate_raw_query::<StoredHistoryPackage>(conn, checkpoint_viewed_at, q)
753 .await
754 }
755 .scope_boxed()
756 })
757 .await?;
758
759 let mut conn = Connection::new(prev, next);
760
761 for stored in results {
763 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
764 let package = MovePackage::try_from_serialized(stored.object, checkpoint_viewed_at)?;
765 conn.edges.push(Edge::new(cursor, package));
766 }
767
768 Ok(conn)
769 }
770
771 pub(crate) async fn paginate_by_version(
785 db: &Db,
786 page: Page<Cursor>,
787 package: SuiAddress,
788 filter: Option<MovePackageVersionFilter>,
789 checkpoint_viewed_at: u64,
790 ) -> Result<Connection<String, MovePackage>, Error> {
791 let cursor_viewed_at = page.validate_cursor_consistency()?;
792 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
793 let (prev, next, results) = db
794 .execute(move |conn| {
795 async move {
796 page.paginate_raw_query::<StoredHistoryPackage>(
797 conn,
798 checkpoint_viewed_at,
799 if is_system_package(package) {
800 system_package_version_query(package, filter)
801 } else {
802 user_package_version_query(package, filter)
803 },
804 )
805 .await
806 }
807 .scope_boxed()
808 })
809 .await?;
810
811 let mut conn = Connection::new(prev, next);
812
813 for stored in results {
815 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
816 let package = MovePackage::try_from_serialized(stored.object, checkpoint_viewed_at)?;
817 conn.edges.push(Edge::new(cursor, package));
818 }
819
820 Ok(conn)
821 }
822
823 pub(crate) fn try_from_serialized(
827 history_object: StoredFullHistoryObject,
828 checkpoint_viewed_at: u64,
829 ) -> Result<Self, Error> {
830 let object = Object::new_serialized(
831 SuiAddress::from_bytes(&history_object.object_id)
832 .map_err(|_| Error::Internal("Invalid package ID".to_string()))?,
833 history_object.object_version as u64,
834 history_object.serialized_object,
835 checkpoint_viewed_at,
836 history_object.object_version as u64,
837 )
838 .ok_or_else(|| Error::Internal("Not a package!".to_string()))?;
839
840 Self::try_from(&object).map_err(|_| Error::Internal("Not a package!".to_string()))
841 }
842}
843
844impl Checkpointed for Cursor {
845 fn checkpoint_viewed_at(&self) -> u64 {
846 self.checkpoint_viewed_at
847 }
848}
849
/// Raw-SQL pagination for package rows. Rows are ordered by the triple (checkpoint sequence
/// number, original ID, object version), and cursor bounds are inclusive comparisons on that
/// triple.
impl RawPaginated<Cursor> for StoredHistoryPackage {
    /// Restrict `query` to rows at or after `cursor` in pagination order.
    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
        // Lexicographic comparison, spelled out: a later checkpoint, or the same checkpoint
        // with a greater original ID, or the same of both with a version >= the cursor's.
        filter!(
            query,
            format!(
                "(p.checkpoint_sequence_number > {cp} OR (\
                 p.checkpoint_sequence_number = {cp} AND \
                 (original_id > '\\x{id}'::bytea OR (\
                 original_id = '\\x{id}'::bytea AND \
                 object_version >= {pv}\
                 ))))",
                cp = cursor.checkpoint_sequence_number,
                id = hex::encode(&cursor.original_id),
                pv = cursor.package_version,
            )
        )
    }

    /// Restrict `query` to rows at or before `cursor` in pagination order.
    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
        // Mirror image of `filter_ge`, with every comparison flipped.
        filter!(
            query,
            format!(
                "(p.checkpoint_sequence_number < {cp} OR (\
                 p.checkpoint_sequence_number = {cp} AND \
                 (original_id < '\\x{id}'::bytea OR (\
                 original_id = '\\x{id}'::bytea AND \
                 object_version <= {pv}\
                 ))))",
                cp = cursor.checkpoint_sequence_number,
                id = hex::encode(&cursor.original_id),
                pv = cursor.package_version,
            )
        )
    }

    /// Order `query` by the cursor triple, ascending if `asc` and descending otherwise.
    fn order(asc: bool, query: RawQuery) -> RawQuery {
        // `1` and `2` are select-list positions: every query in this file selects
        // `p.checkpoint_sequence_number` and `p.original_id` as its first two columns.
        if asc {
            query
                .order_by("1 ASC")
                .order_by("2 ASC")
                .order_by("object_version ASC")
        } else {
            query
                .order_by("1 DESC")
                .order_by("2 DESC")
                .order_by("object_version DESC")
        }
    }
}
899
900impl Target<Cursor> for StoredHistoryPackage {
901 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
902 Cursor::new(PackageCursor {
903 checkpoint_sequence_number: self.checkpoint_sequence_number as u64,
904 original_id: self.original_id.clone(),
905 package_version: self.object.object_version as u64,
906 checkpoint_viewed_at,
907 })
908 }
909}
910
// Package cursors opt into `ScanLimited` without overriding anything, so they get the trait's
// default behaviour.
impl ScanLimited for BcsCursor<PackageCursor> {}
912
913#[async_trait::async_trait]
914impl Loader<PackageVersionKey> for Db {
915 type Value = SuiAddress;
916 type Error = Error;
917
918 async fn load(
919 &self,
920 keys: &[PackageVersionKey],
921 ) -> Result<HashMap<PackageVersionKey, SuiAddress>, Error> {
922 use packages::dsl;
923 let other = diesel::alias!(packages as other);
924
925 let id_versions: BTreeSet<_> = keys
926 .iter()
927 .map(|k| (k.address.into_vec(), k.version as i64))
928 .collect();
929
930 let stored_packages: Vec<(Vec<u8>, i64, Vec<u8>)> = self
931 .execute(move |conn| {
932 async move {
933 conn.results(|| {
934 let mut query = dsl::packages
935 .inner_join(
936 other.on(dsl::original_id.eq(other.field(dsl::original_id))),
937 )
938 .select((
939 dsl::package_id,
940 other.field(dsl::package_version),
941 other.field(dsl::package_id),
942 ))
943 .into_boxed();
944
945 for (id, version) in id_versions.iter().cloned() {
946 query = query.or_filter(
947 dsl::package_id
948 .eq(id)
949 .and(other.field(dsl::package_version).eq(version)),
950 );
951 }
952
953 query
954 })
955 .await
956 }
957 .scope_boxed()
958 })
959 .await
960 .map_err(|e| Error::Internal(format!("Failed to load packages: {e}")))?;
961
962 let mut result = HashMap::new();
963 for (id, version, other_id) in stored_packages {
964 result.insert(
965 PackageVersionKey {
966 address: addr(&id)?,
967 version: version as u64,
968 },
969 addr(&other_id)?,
970 );
971 }
972
973 Ok(result)
974 }
975}
976
/// Data loader that translates a package address into the address of the latest version in
/// the same family (same `original_id`), as of the key's checkpoint.
#[async_trait::async_trait]
impl Loader<LatestKey> for Db {
    type Value = SuiAddress;
    type Error = Error;

    /// Resolve every key in `keys`, issuing one query per distinct `checkpoint_viewed_at`.
    /// Keys with no matching package are absent from the returned map.
    async fn load(&self, keys: &[LatestKey]) -> Result<HashMap<LatestKey, SuiAddress>, Error> {
        use packages::dsl;
        let other = diesel::alias!(packages as other);

        // Group the requested addresses by checkpoint, so all keys viewed at the same
        // checkpoint share a query.
        let mut ids_by_cursor: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
        for key in keys {
            ids_by_cursor
                .entry(key.checkpoint_viewed_at)
                .or_default()
                .insert(key.address.into_vec());
        }

        // One future per checkpoint group; they are awaited together below.
        let futures = ids_by_cursor
            .into_iter()
            .map(|(checkpoint_viewed_at, ids)| {
                self.execute(move |conn| {
                    async move {
                        let results: Vec<(Vec<u8>, Vec<u8>)> = conn
                            .results(|| {
                                let o_original_id = other.field(dsl::original_id);
                                let o_package_id = other.field(dsl::package_id);
                                let o_cp_seq_num = other.field(dsl::checkpoint_sequence_number);
                                let o_version = other.field(dsl::package_version);

                                // For each requested package, keep only the related package
                                // with the highest version: `DISTINCT ON` keeps the first row
                                // per group, and rows are sorted by descending version.
                                let query = dsl::packages
                                    .inner_join(other.on(dsl::original_id.eq(o_original_id)))
                                    .select((dsl::package_id, o_package_id))
                                    .filter(dsl::package_id.eq_any(ids.iter().cloned()))
                                    .filter(o_cp_seq_num.le(checkpoint_viewed_at as i64))
                                    .order_by((dsl::package_id, dsl::original_id, o_version.desc()))
                                    .distinct_on((dsl::package_id, dsl::original_id));
                                query
                            })
                            .await?;

                        // Tag each row with its checkpoint so the key can be rebuilt later.
                        Ok::<_, diesel::result::Error>(
                            results
                                .into_iter()
                                .map(|(p, latest)| (checkpoint_viewed_at, p, latest))
                                .collect::<Vec<_>>(),
                        )
                    }
                    .scope_boxed()
                })
            });

        let groups = futures::future::join_all(futures).await;

        let mut results = HashMap::new();
        for group in groups {
            // A failure in any group fails the whole load.
            for (checkpoint_viewed_at, address, latest) in
                group.map_err(|e| Error::Internal(format!("Failed to fetch packages: {e}")))?
            {
                results.insert(
                    LatestKey {
                        address: addr(&address)?,
                        checkpoint_viewed_at,
                    },
                    addr(&latest)?,
                );
            }
        }

        Ok(results)
    }
}
1050
1051impl TryFrom<&Object> for MovePackage {
1052 type Error = MovePackageDowncastError;
1053
1054 fn try_from(object: &Object) -> Result<Self, MovePackageDowncastError> {
1055 let Some(native) = object.native_impl() else {
1056 return Err(MovePackageDowncastError);
1057 };
1058
1059 if let Data::Package(move_package) = &native.data {
1060 Ok(Self {
1061 super_: object.clone(),
1062 native: move_package.clone(),
1063 })
1064 } else {
1065 Err(MovePackageDowncastError)
1066 }
1067 }
1068}
1069
1070fn system_package_version_query(
1077 package: SuiAddress,
1078 filter: Option<MovePackageVersionFilter>,
1079) -> RawQuery {
1080 let mut q = query!(
1082 r#"
1083 SELECT
1084 p.checkpoint_sequence_number,
1085 p.original_id,
1086 o.*
1087 FROM (
1088 SELECT
1089 object_id AS package_id,
1090 object_id AS original_id,
1091 object_version AS package_version,
1092 cp_sequence_number AS checkpoint_sequence_number
1093 FROM
1094 objects_version
1095 ) p
1096 LEFT JOIN
1097 full_objects_history o
1098 ON
1099 p.package_id = o.object_id
1100 AND p.package_version = o.object_version
1101 "#
1102 );
1103
1104 q = filter!(
1105 q,
1106 format!(
1107 "original_id = '\\x{}'::bytea",
1108 hex::encode(package.into_vec())
1109 )
1110 );
1111
1112 if let Some(after) = filter.as_ref().and_then(|f| f.after_version) {
1113 let a: u64 = after.into();
1114 q = filter!(q, format!("object_version > {a}"));
1115 }
1116
1117 if let Some(before) = filter.as_ref().and_then(|f| f.before_version) {
1118 let b: u64 = before.into();
1119 q = filter!(q, format!("object_version < {b}"));
1120 }
1121
1122 q
1123}
1124
1125fn user_package_version_query(
1128 package: SuiAddress,
1129 filter: Option<MovePackageVersionFilter>,
1130) -> RawQuery {
1131 let mut q = query!(
1132 r#"
1133 SELECT
1134 p.checkpoint_sequence_number,
1135 p.original_id,
1136 o.*
1137 FROM
1138 packages q
1139 INNER JOIN
1140 packages p
1141 USING (original_id)
1142 INNER JOIN
1143 full_objects_history o
1144 ON
1145 p.package_id = o.object_id
1146 AND p.package_version = o.object_version
1147 "#
1148 );
1149
1150 q = filter!(
1151 q,
1152 format!(
1153 "q.package_id = '\\x{}'::bytea",
1154 hex::encode(package.into_vec())
1155 )
1156 );
1157
1158 if let Some(after) = filter.as_ref().and_then(|f| f.after_version) {
1159 let a: u64 = after.into();
1160 q = filter!(q, format!("p.package_version > {a}"));
1161 }
1162
1163 if let Some(before) = filter.as_ref().and_then(|f| f.before_version) {
1164 let b: u64 = before.into();
1165 q = filter!(q, format!("p.package_version < {b}"));
1166 }
1167
1168 q
1169}