1use anyhow::Context as _;
5use anyhow::Result;
6use anyhow::anyhow;
7use diesel::{
8 ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, QueryDsl,
9 SelectableHelper, TextExpressionMethods, dsl::sql, sql_types::Bool,
10};
11use itertools::Itertools;
12use std::sync::Arc;
13use sui_types::dynamic_field::visitor as DFV;
14use sui_types::object::bounded_visitor::BoundedVisitor;
15use tap::{Pipe, TapFallible};
16use tracing::{debug, error, warn};
17
18use fastcrypto::encoding::Encoding;
19use fastcrypto::encoding::Hex;
20use move_core_types::language_storage::{StructTag, TypeTag};
21use sui_json_rpc_types::{Balance, Coin as SuiCoin, SuiCoinMetadata, SuiMoveValue};
22use sui_json_rpc_types::{
23 CheckpointId, EpochInfo, EventFilter, SuiEvent, SuiObjectDataFilter,
24 SuiTransactionBlockResponse, TransactionFilter,
25};
26use sui_package_resolver::Package;
27use sui_package_resolver::PackageStore;
28use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
29use sui_types::effects::TransactionEvents;
30use sui_types::{balance::Supply, coin::TreasuryCap, dynamic_field::DynamicFieldName};
31use sui_types::{
32 base_types::{ObjectID, SuiAddress, VersionNumber},
33 committee::EpochId,
34 digests::TransactionDigest,
35 dynamic_field::DynamicFieldInfo,
36 object::{Object, ObjectRead},
37 sui_system_state::{SuiSystemStateTrait, sui_system_state_summary::SuiSystemStateSummary},
38};
39use sui_types::{coin::CoinMetadata, event::EventID};
40
41use crate::database::ConnectionPool;
42use crate::db::ConnectionPoolConfig;
43use crate::models::objects::StoredHistoryObject;
44use crate::models::objects::StoredObjectSnapshot;
45use crate::models::transactions::{StoredTransactionEvents, stored_events_to_events};
46use crate::schema::objects_history;
47use crate::schema::objects_snapshot;
48use crate::schema::pruner_cp_watermark;
49use crate::schema::tx_digests;
50use crate::{
51 errors::IndexerError,
52 models::{
53 checkpoints::StoredCheckpoint,
54 epoch::StoredEpochInfo,
55 events::StoredEvent,
56 objects::{CoinBalance, StoredObject},
57 transactions::{StoredTransaction, tx_events_to_sui_tx_events},
58 tx_indices::TxSequenceNumber,
59 },
60 schema::{checkpoints, epochs, events, objects, transactions},
61 store::package_resolver::IndexerStorePackageResolver,
62 types::{IndexerResult, OwnerType},
63};
64
65pub const TX_SEQUENCE_NUMBER_STR: &str = "tx_sequence_number";
66pub const TRANSACTION_DIGEST_STR: &str = "transaction_digest";
67pub const EVENT_SEQUENCE_NUMBER_STR: &str = "event_sequence_number";
68
69#[derive(Clone)]
70pub struct IndexerReader {
71 pool: ConnectionPool,
72 package_resolver: PackageResolver,
73}
74
75pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePackageResolver>>>;
76
77impl IndexerReader {
79 pub fn new(pool: ConnectionPool) -> Self {
80 let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
81 let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
82 let package_resolver = Arc::new(Resolver::new(package_cache));
83 Self {
84 pool,
85 package_resolver,
86 }
87 }
88
89 pub async fn new_with_config<T: Into<String>>(
90 db_url: T,
91 config: ConnectionPoolConfig,
92 ) -> Result<Self> {
93 let db_url = db_url.into();
94
95 let pool = ConnectionPool::new(db_url.parse()?, config).await?;
96
97 let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
98 let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
99 let package_resolver = Arc::new(Resolver::new(package_cache));
100 Ok(Self {
101 pool,
102 package_resolver,
103 })
104 }
105
106 pub fn pool(&self) -> &ConnectionPool {
107 &self.pool
108 }
109}
110
111impl IndexerReader {
113 async fn get_object_from_db(
114 &self,
115 object_id: &ObjectID,
116 version: Option<VersionNumber>,
117 ) -> Result<Option<StoredObject>, IndexerError> {
118 use diesel_async::RunQueryDsl;
119
120 let mut connection = self.pool.get().await?;
121
122 let mut query = objects::table
123 .filter(objects::object_id.eq(object_id.to_vec()))
124 .into_boxed();
125 if let Some(version) = version {
126 query = query.filter(objects::object_version.eq(version.value() as i64))
127 }
128
129 query
130 .first::<StoredObject>(&mut connection)
131 .await
132 .optional()
133 .map_err(Into::into)
134 }
135
136 pub async fn get_object(
137 &self,
138 object_id: &ObjectID,
139 version: Option<VersionNumber>,
140 ) -> Result<Option<Object>, IndexerError> {
141 let Some(stored_package) = self.get_object_from_db(object_id, version).await? else {
142 return Ok(None);
143 };
144
145 let object = stored_package.try_into()?;
146 Ok(Some(object))
147 }
148
149 pub async fn get_object_read(&self, object_id: ObjectID) -> Result<ObjectRead, IndexerError> {
150 use diesel_async::RunQueryDsl;
151
152 let mut connection = self.pool.get().await?;
153
154 let stored_object = objects::table
155 .filter(objects::object_id.eq(object_id.to_vec()))
156 .first::<StoredObject>(&mut connection)
157 .await
158 .optional()?;
159
160 if let Some(object) = stored_object {
161 object
162 .try_into_object_read(self.package_resolver.clone())
163 .await
164 } else {
165 Ok(ObjectRead::NotExists(object_id))
166 }
167 }
168
169 pub async fn get_package(&self, package_id: ObjectID) -> Result<Package, IndexerError> {
170 let store = self.package_resolver.package_store();
171 let pkg = store
172 .fetch(package_id.into())
173 .await
174 .map_err(|e| {
175 IndexerError::PostgresReadError(format!(
176 "Fail to fetch package from package store with error {:?}",
177 e
178 ))
179 })?
180 .as_ref()
181 .clone();
182 Ok(pkg)
183 }
184
185 async fn get_epoch_info_from_db(
186 &self,
187 epoch: Option<EpochId>,
188 ) -> Result<Option<StoredEpochInfo>, IndexerError> {
189 use diesel_async::RunQueryDsl;
190
191 let mut connection = self.pool.get().await?;
192
193 let stored_epoch = epochs::table
194 .into_boxed()
195 .pipe(|query| {
196 if let Some(epoch) = epoch {
197 query.filter(epochs::epoch.eq(epoch as i64))
198 } else {
199 query.order_by(epochs::epoch.desc())
200 }
201 })
202 .first::<StoredEpochInfo>(&mut connection)
203 .await
204 .optional()?;
205
206 Ok(stored_epoch)
207 }
208
209 pub async fn get_latest_epoch_info_from_db(&self) -> Result<StoredEpochInfo, IndexerError> {
210 use diesel_async::RunQueryDsl;
211
212 let mut connection = self.pool.get().await?;
213
214 let stored_epoch = epochs::table
215 .order_by(epochs::epoch.desc())
216 .first::<StoredEpochInfo>(&mut connection)
217 .await?;
218
219 Ok(stored_epoch)
220 }
221
222 pub async fn get_epoch_info(
223 &self,
224 epoch: Option<EpochId>,
225 ) -> Result<Option<EpochInfo>, IndexerError> {
226 let stored_epoch = self.get_epoch_info_from_db(epoch).await?;
227
228 let stored_epoch = match stored_epoch {
229 Some(stored_epoch) => stored_epoch,
230 None => return Ok(None),
231 };
232
233 let epoch_info = EpochInfo::try_from(stored_epoch)?;
234 Ok(Some(epoch_info))
235 }
236
237 async fn get_epochs_from_db(
238 &self,
239 cursor: Option<u64>,
240 limit: usize,
241 descending_order: bool,
242 ) -> Result<Vec<StoredEpochInfo>, IndexerError> {
243 use diesel_async::RunQueryDsl;
244
245 let mut connection = self.pool.get().await?;
246
247 let mut query = epochs::table.into_boxed();
248
249 if let Some(cursor) = cursor {
250 if descending_order {
251 query = query.filter(epochs::epoch.lt(cursor as i64));
252 } else {
253 query = query.filter(epochs::epoch.gt(cursor as i64));
254 }
255 }
256
257 if descending_order {
258 query = query.order_by(epochs::epoch.desc());
259 } else {
260 query = query.order_by(epochs::epoch.asc());
261 }
262
263 query
264 .limit(limit as i64)
265 .load(&mut connection)
266 .await
267 .map_err(Into::into)
268 }
269
270 pub async fn get_epochs(
271 &self,
272 cursor: Option<u64>,
273 limit: usize,
274 descending_order: bool,
275 ) -> Result<Vec<EpochInfo>, IndexerError> {
276 self.get_epochs_from_db(cursor, limit, descending_order)
277 .await?
278 .into_iter()
279 .map(EpochInfo::try_from)
280 .collect::<Result<Vec<_>, _>>()
281 }
282
283 pub async fn get_latest_sui_system_state(&self) -> Result<SuiSystemStateSummary, IndexerError> {
284 let object_store = ConnectionAsObjectStore::from_pool(&self.pool)
285 .await
286 .map_err(|e| IndexerError::PgPoolConnectionError(e.to_string()))?;
287
288 let system_state = tokio::task::spawn_blocking(move || {
289 sui_types::sui_system_state::get_sui_system_state(&object_store)
290 })
291 .await
292 .unwrap()?
293 .into_sui_system_state_summary();
294
295 Ok(system_state)
296 }
297
298 pub async fn get_validator_from_table(
299 &self,
300 table_id: ObjectID,
301 pool_id: sui_types::id::ID,
302 ) -> Result<
303 sui_types::sui_system_state::sui_system_state_summary::SuiValidatorSummary,
304 IndexerError,
305 > {
306 let object_store = ConnectionAsObjectStore::from_pool(&self.pool)
307 .await
308 .map_err(|e| IndexerError::PgPoolConnectionError(e.to_string()))?;
309
310 let validator = tokio::task::spawn_blocking(move || {
311 sui_types::sui_system_state::get_validator_from_table(&object_store, table_id, &pool_id)
312 })
313 .await
314 .unwrap()?;
315 Ok(validator)
316 }
317
318 pub async fn get_epoch_sui_system_state(
324 &self,
325 epoch: Option<EpochId>,
326 ) -> Result<SuiSystemStateSummary, IndexerError> {
327 let stored_epoch = self.get_epoch_info_from_db(epoch).await?;
328 let stored_epoch = match stored_epoch {
329 Some(stored_epoch) => stored_epoch,
330 None => return Err(IndexerError::InvalidArgumentError("Invalid epoch".into())),
331 };
332 stored_epoch.get_json_system_state_summary()
333 }
334
335 async fn get_checkpoint_from_db(
336 &self,
337 checkpoint_id: CheckpointId,
338 ) -> Result<Option<StoredCheckpoint>, IndexerError> {
339 use diesel_async::RunQueryDsl;
340
341 let mut connection = self.pool.get().await?;
342 let stored_checkpoint = checkpoints::table
343 .into_boxed()
344 .pipe(|query| match checkpoint_id {
345 CheckpointId::SequenceNumber(seq) => {
346 query.filter(checkpoints::sequence_number.eq(seq as i64))
347 }
348 CheckpointId::Digest(digest) => {
349 query.filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
350 }
351 })
352 .first::<StoredCheckpoint>(&mut connection)
353 .await
354 .optional()?;
355
356 Ok(stored_checkpoint)
357 }
358
359 async fn get_latest_checkpoint_from_db(&self) -> Result<StoredCheckpoint, IndexerError> {
360 use diesel_async::RunQueryDsl;
361
362 let mut connection = self.pool.get().await?;
363
364 checkpoints::table
365 .order_by(checkpoints::sequence_number.desc())
366 .first::<StoredCheckpoint>(&mut connection)
367 .await
368 .map_err(Into::into)
369 }
370
371 pub async fn get_checkpoint(
372 &self,
373 checkpoint_id: CheckpointId,
374 ) -> Result<Option<sui_json_rpc_types::Checkpoint>, IndexerError> {
375 let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id).await? {
376 Some(stored_checkpoint) => stored_checkpoint,
377 None => return Ok(None),
378 };
379
380 let checkpoint = sui_json_rpc_types::Checkpoint::try_from(stored_checkpoint)?;
381 Ok(Some(checkpoint))
382 }
383
384 pub async fn get_latest_checkpoint(
385 &self,
386 ) -> Result<sui_json_rpc_types::Checkpoint, IndexerError> {
387 let stored_checkpoint = self.get_latest_checkpoint_from_db().await?;
388
389 sui_json_rpc_types::Checkpoint::try_from(stored_checkpoint)
390 }
391
392 async fn get_checkpoints_from_db(
393 &self,
394 cursor: Option<u64>,
395 limit: usize,
396 descending_order: bool,
397 ) -> Result<Vec<StoredCheckpoint>, IndexerError> {
398 use diesel_async::RunQueryDsl;
399
400 let mut connection = self.pool.get().await?;
401
402 let mut query = checkpoints::table.into_boxed();
403 if let Some(cursor) = cursor {
404 if descending_order {
405 query = query.filter(checkpoints::sequence_number.lt(cursor as i64));
406 } else {
407 query = query.filter(checkpoints::sequence_number.gt(cursor as i64));
408 }
409 }
410 if descending_order {
411 query = query.order_by(checkpoints::sequence_number.desc());
412 } else {
413 query = query.order_by(checkpoints::sequence_number.asc());
414 }
415
416 query
417 .limit(limit as i64)
418 .load::<StoredCheckpoint>(&mut connection)
419 .await
420 .map_err(Into::into)
421 }
422
423 pub async fn get_checkpoints(
424 &self,
425 cursor: Option<u64>,
426 limit: usize,
427 descending_order: bool,
428 ) -> Result<Vec<sui_json_rpc_types::Checkpoint>, IndexerError> {
429 self.get_checkpoints_from_db(cursor, limit, descending_order)
430 .await?
431 .into_iter()
432 .map(sui_json_rpc_types::Checkpoint::try_from)
433 .collect()
434 }
435
436 async fn multi_get_transactions(
437 &self,
438 digests: &[TransactionDigest],
439 ) -> Result<Vec<StoredTransaction>, IndexerError> {
440 use diesel_async::RunQueryDsl;
441
442 let mut connection = self.pool.get().await?;
443
444 let digests = digests
445 .iter()
446 .map(|digest| digest.inner().to_vec())
447 .collect::<Vec<_>>();
448
449 transactions::table
450 .inner_join(
451 tx_digests::table
452 .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
453 )
454 .filter(tx_digests::tx_digest.eq_any(digests))
455 .select(StoredTransaction::as_select())
456 .load::<StoredTransaction>(&mut connection)
457 .await
458 .map_err(Into::into)
459 }
460
461 async fn stored_transaction_to_transaction_block(
462 &self,
463 stored_txes: Vec<StoredTransaction>,
464 options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
465 ) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
466 let mut tx_block_responses_futures = vec![];
467 for stored_tx in stored_txes {
468 let package_resolver_clone = self.package_resolver();
469 let options_clone = options.clone();
470 tx_block_responses_futures.push(tokio::task::spawn(
471 stored_tx
472 .try_into_sui_transaction_block_response(options_clone, package_resolver_clone),
473 ));
474 }
475
476 let tx_blocks = futures::future::join_all(tx_block_responses_futures)
477 .await
478 .into_iter()
479 .collect::<Result<Vec<_>, _>>()
480 .tap_err(|e| error!("Failed to join all tx block futures: {}", e))?
481 .into_iter()
482 .collect::<Result<Vec<_>, _>>()
483 .tap_err(|e| error!("Failed to collect tx block futures: {}", e))?;
484 Ok(tx_blocks)
485 }
486
487 async fn multi_get_transactions_with_sequence_numbers(
488 &self,
489 tx_sequence_numbers: Vec<i64>,
490 is_descending: Option<bool>,
492 ) -> Result<Vec<StoredTransaction>, IndexerError> {
493 use diesel_async::RunQueryDsl;
494
495 let mut connection = self.pool.get().await?;
496
497 let mut query = transactions::table
498 .filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers))
499 .into_boxed();
500 match is_descending {
501 Some(true) => {
502 query = query.order(transactions::dsl::tx_sequence_number.desc());
503 }
504 Some(false) => {
505 query = query.order(transactions::dsl::tx_sequence_number.asc());
506 }
507 None => (),
508 }
509
510 query
511 .load::<StoredTransaction>(&mut connection)
512 .await
513 .map_err(Into::into)
514 }
515
516 pub async fn get_owned_objects(
517 &self,
518 address: SuiAddress,
519 filter: Option<SuiObjectDataFilter>,
520 cursor: Option<ObjectID>,
521 limit: usize,
522 ) -> Result<Vec<StoredObject>, IndexerError> {
523 use diesel_async::RunQueryDsl;
524
525 let mut connection = self.pool.get().await?;
526
527 let mut query = objects::table
528 .filter(objects::owner_type.eq(OwnerType::Address as i16))
529 .filter(objects::owner_id.eq(address.to_vec()))
530 .order(objects::object_id.asc())
531 .limit(limit as i64)
532 .into_boxed();
533 if let Some(filter) = filter {
534 match filter {
535 SuiObjectDataFilter::StructType(struct_tag) => {
536 let object_type = struct_tag.to_canonical_string(true);
537 query = query.filter(objects::object_type.like(format!("{}%", object_type)));
538 }
539 SuiObjectDataFilter::MatchAny(filters) => {
540 let mut condition = "(".to_string();
541 for (i, filter) in filters.iter().enumerate() {
542 if let SuiObjectDataFilter::StructType(struct_tag) = filter {
543 let object_type =
544 struct_tag.to_canonical_string(true);
545 if i == 0 {
546 condition +=
547 format!("objects.object_type LIKE '{}%'", object_type).as_str();
548 } else {
549 condition +=
550 format!(" OR objects.object_type LIKE '{}%'", object_type)
551 .as_str();
552 }
553 } else {
554 return Err(IndexerError::InvalidArgumentError(
555 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
556 ));
557 }
558 }
559 condition += ")";
560 query = query.filter(sql::<Bool>(&condition));
561 }
562 SuiObjectDataFilter::MatchNone(filters) => {
563 for filter in filters {
564 if let SuiObjectDataFilter::StructType(struct_tag) = filter {
565 let object_type =
566 struct_tag.to_canonical_string(true);
567 query = query
568 .filter(objects::object_type.not_like(format!("{}%", object_type)));
569 } else {
570 return Err(IndexerError::InvalidArgumentError(
571 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
572 ));
573 }
574 }
575 }
576 _ => {
577 return Err(IndexerError::InvalidArgumentError(
578 "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
579 ));
580 }
581 }
582 }
583
584 if let Some(object_cursor) = cursor {
585 query = query.filter(objects::object_id.gt(object_cursor.to_vec()));
586 }
587
588 query
589 .load::<StoredObject>(&mut connection)
590 .await
591 .map_err(|e| IndexerError::PostgresReadError(e.to_string()))
592 }
593
594 pub async fn multi_get_objects(
595 &self,
596 object_ids: Vec<ObjectID>,
597 ) -> Result<Vec<StoredObject>, IndexerError> {
598 use diesel_async::RunQueryDsl;
599
600 let mut connection = self.pool.get().await?;
601 let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
602
603 objects::table
604 .filter(objects::object_id.eq_any(object_ids))
605 .load::<StoredObject>(&mut connection)
606 .await
607 .map_err(Into::into)
608 }
609
610 async fn query_transaction_blocks_by_checkpoint(
611 &self,
612 checkpoint_seq: u64,
613 options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
614 cursor_tx_seq: Option<i64>,
615 limit: usize,
616 is_descending: bool,
617 ) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
618 use diesel_async::RunQueryDsl;
619
620 let mut connection = self.pool.get().await?;
621
622 let tx_range: (i64, i64) = pruner_cp_watermark::dsl::pruner_cp_watermark
623 .select((
624 pruner_cp_watermark::min_tx_sequence_number,
625 pruner_cp_watermark::max_tx_sequence_number,
626 ))
627 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
628 .first::<(i64, i64)>(&mut connection)
629 .await?;
630
631 let mut query = transactions::table
632 .filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
633 .into_boxed();
634
635 if let Some(cursor_tx_seq) = cursor_tx_seq {
636 if is_descending {
637 query = query.filter(transactions::tx_sequence_number.lt(cursor_tx_seq));
638 } else {
639 query = query.filter(transactions::tx_sequence_number.gt(cursor_tx_seq));
640 }
641 }
642 if is_descending {
643 query = query.order(transactions::tx_sequence_number.desc());
644 } else {
645 query = query.order(transactions::tx_sequence_number.asc());
646 }
647 let stored_txes = query
648 .limit(limit as i64)
649 .load::<StoredTransaction>(&mut connection)
650 .await?;
651 self.stored_transaction_to_transaction_block(stored_txes, options)
652 .await
653 }
654
655 pub async fn query_transaction_blocks(
656 &self,
657 filter: Option<TransactionFilter>,
658 options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
659 cursor: Option<TransactionDigest>,
660 limit: usize,
661 is_descending: bool,
662 ) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
663 use diesel_async::RunQueryDsl;
664
665 let mut connection = self.pool.get().await?;
666
667 let cursor_tx_seq = if let Some(cursor) = cursor {
668 let tx_seq = tx_digests::table
669 .select(tx_digests::tx_sequence_number)
670 .filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
671 .first::<i64>(&mut connection)
672 .await?;
673 Some(tx_seq)
674 } else {
675 None
676 };
677 let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
678 if is_descending {
679 format!("AND {TX_SEQUENCE_NUMBER_STR} < {}", cursor_tx_seq)
680 } else {
681 format!("AND {TX_SEQUENCE_NUMBER_STR} > {}", cursor_tx_seq)
682 }
683 } else {
684 "".to_string()
685 };
686 let order_str = if is_descending { "DESC" } else { "ASC" };
687 let (table_name, main_where_clause) = match filter {
688 Some(TransactionFilter::Checkpoint(seq)) => {
690 return self
691 .query_transaction_blocks_by_checkpoint(
692 seq,
693 options,
694 cursor_tx_seq,
695 limit,
696 is_descending,
697 )
698 .await
699 }
700 Some(TransactionFilter::MoveFunction {
702 package,
703 module,
704 function,
705 }) => {
706 let package = Hex::encode(package.to_vec());
707 match (module, function) {
708 (Some(module), Some(function)) => (
709 "tx_calls_fun".to_owned(),
710 format!(
711 "package = '\\x{package}'::bytea AND module = '{module}' AND func = '{function}'",
712 ),
713 ),
714 (Some(module), None) => (
715 "tx_calls_mod".to_owned(),
716 format!(
717 "package = '\\x{package}'::bytea AND module = '{module}'",
718 ),
719 ),
720 (None, Some(_)) => {
721 return Err(IndexerError::InvalidArgumentError(
722 "Function cannot be present without Module.".into(),
723 ));
724 }
725 (None, None) => (
726 "tx_calls_pkg".to_owned(),
727 format!("package = '\\x{package}'::bytea"),
728 ),
729 }
730 }
731 Some(TransactionFilter::AffectedObject(object_id)) => {
732 let object_id = Hex::encode(object_id.to_vec());
733 (
734 "tx_affected_objects".to_owned(),
735 format!("affected = '\\x{object_id}'::bytea"),
736 )
737 }
738 Some(TransactionFilter::FromAddress(from_address)) => {
739 let from_address = Hex::encode(from_address.to_vec());
740 (
741 "tx_affected_addresses".to_owned(),
742 format!("sender = '\\x{from_address}'::bytea AND affected = '\\x{from_address}'::bytea"),
743 )
744 }
745 Some(TransactionFilter::FromAndToAddress { from, to }) => {
746 let from_address = Hex::encode(from.to_vec());
747 let to_address = Hex::encode(to.to_vec());
748 (
749 "tx_affected_addresses".to_owned(),
750 format!("sender = '\\x{from_address}'::bytea AND affected = '\\x{to_address}'::bytea"),
751 )
752 }
753 Some(TransactionFilter::FromOrToAddress { addr }) => {
754 let address = Hex::encode(addr.to_vec());
755 (
756 "tx_affected_addresses".to_owned(),
757 format!("affected = '\\x{address}'::bytea"),
758 )
759 }
760 Some(
761 TransactionFilter::TransactionKind(_) | TransactionFilter::TransactionKindIn(_),
762 ) => {
763 return Err(IndexerError::NotSupportedError(
764 "TransactionKind filter is not supported.".into(),
765 ));
766 }
767 Some(TransactionFilter::InputObject(_) | TransactionFilter::ChangedObject(_)) => {
768 return Err(IndexerError::NotSupportedError(
769 "InputObject and OutputObject filters are not supported, please use AffectedObject instead.".into()
770 ))
771 }
772 Some(TransactionFilter::ToAddress(_)) => {
773 return Err(IndexerError::NotSupportedError(
774 "ToAddress filter is not supported, please use FromOrToAddress instead.".into()
775 ))
776 }
777 None => {
778 ("transactions".to_owned(), "1 = 1".into())
780 }
781 };
782
783 let query = format!(
784 "SELECT {TX_SEQUENCE_NUMBER_STR} FROM {} WHERE {} {} ORDER BY {TX_SEQUENCE_NUMBER_STR} {} LIMIT {}",
785 table_name, main_where_clause, cursor_clause, order_str, limit,
786 );
787
788 debug!("query transaction blocks: {}", query);
789 let tx_sequence_numbers = diesel::sql_query(query.clone())
790 .load::<TxSequenceNumber>(&mut connection)
791 .await?
792 .into_iter()
793 .map(|tsn| tsn.tx_sequence_number)
794 .collect::<Vec<i64>>();
795 self.multi_get_transaction_block_response_by_sequence_numbers(
796 tx_sequence_numbers,
797 options,
798 Some(is_descending),
799 )
800 .await
801 }
802
803 async fn multi_get_transaction_block_response_in_blocking_task_impl(
804 &self,
805 digests: &[TransactionDigest],
806 options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
807 ) -> Result<Vec<sui_json_rpc_types::SuiTransactionBlockResponse>, IndexerError> {
808 let stored_txes = self.multi_get_transactions(digests).await?;
809 self.stored_transaction_to_transaction_block(stored_txes, options)
810 .await
811 }
812
813 async fn multi_get_transaction_block_response_by_sequence_numbers(
814 &self,
815 tx_sequence_numbers: Vec<i64>,
816 options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
817 is_descending: Option<bool>,
819 ) -> Result<Vec<sui_json_rpc_types::SuiTransactionBlockResponse>, IndexerError> {
820 let stored_txes: Vec<StoredTransaction> = self
821 .multi_get_transactions_with_sequence_numbers(tx_sequence_numbers, is_descending)
822 .await?;
823 self.stored_transaction_to_transaction_block(stored_txes, options)
824 .await
825 }
826
827 pub async fn multi_get_transaction_block_response_in_blocking_task(
828 &self,
829 digests: Vec<TransactionDigest>,
830 options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
831 ) -> Result<Vec<sui_json_rpc_types::SuiTransactionBlockResponse>, IndexerError> {
832 self.multi_get_transaction_block_response_in_blocking_task_impl(&digests, options)
833 .await
834 }
835
836 pub async fn get_transaction_events(
837 &self,
838 digest: TransactionDigest,
839 ) -> Result<Vec<sui_json_rpc_types::SuiEvent>, IndexerError> {
840 use diesel_async::RunQueryDsl;
841
842 let mut connection = self.pool.get().await?;
843
844 let (timestamp_ms, serialized_events) = transactions::table
847 .filter(
848 transactions::tx_sequence_number
849 .nullable()
850 .eq(tx_digests::table
851 .select(tx_digests::tx_sequence_number)
852 .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
853 .single_value()),
854 )
855 .select((transactions::timestamp_ms, transactions::events))
856 .first::<(i64, StoredTransactionEvents)>(&mut connection)
857 .await?;
858
859 let events = stored_events_to_events(serialized_events)?;
860 let tx_events = TransactionEvents { data: events };
861
862 let sui_tx_events = tx_events_to_sui_tx_events(
863 tx_events,
864 self.package_resolver(),
865 digest,
866 timestamp_ms as u64,
867 )
868 .await?;
869 Ok(sui_tx_events.map_or(vec![], |ste| ste.data))
870 }
871
872 async fn query_events_by_tx_digest(
873 &self,
874 tx_digest: TransactionDigest,
875 cursor: Option<EventID>,
876 cursor_tx_seq: i64,
877 limit: usize,
878 descending_order: bool,
879 ) -> IndexerResult<Vec<SuiEvent>> {
880 use diesel_async::RunQueryDsl;
881
882 let mut connection = self.pool.get().await?;
883
884 let mut query = events::table.into_boxed();
885
886 if let Some(cursor) = cursor {
887 if cursor.tx_digest != tx_digest {
888 return Err(IndexerError::InvalidArgumentError(
889 "Cursor tx_digest does not match the tx_digest in the query.".into(),
890 ));
891 }
892 if descending_order {
893 query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
894 } else {
895 query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
896 }
897 } else if descending_order {
898 query = query.filter(events::event_sequence_number.le(i64::MAX));
899 } else {
900 query = query.filter(events::event_sequence_number.ge(0));
901 };
902
903 if descending_order {
904 query = query.order(events::event_sequence_number.desc());
905 } else {
906 query = query.order(events::event_sequence_number.asc());
907 }
908
909 if cursor.is_some() {
913 query = query.filter(events::tx_sequence_number.eq(cursor_tx_seq));
914 } else {
915 query = query.filter(
916 events::tx_sequence_number.nullable().eq(tx_digests::table
917 .select(tx_digests::tx_sequence_number)
918 .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
919 .single_value()),
920 );
921 }
922
923 let stored_events = query
924 .limit(limit as i64)
925 .load::<StoredEvent>(&mut connection)
926 .await?;
927
928 let mut sui_event_futures = vec![];
929 for stored_event in stored_events {
930 sui_event_futures.push(tokio::task::spawn(
931 stored_event.try_into_sui_event(self.package_resolver.clone()),
932 ));
933 }
934
935 let sui_events = futures::future::join_all(sui_event_futures)
936 .await
937 .into_iter()
938 .collect::<Result<Vec<_>, _>>()
939 .tap_err(|e| error!("Failed to join sui event futures: {}", e))?
940 .into_iter()
941 .collect::<Result<Vec<_>, _>>()
942 .tap_err(|e| error!("Failed to collect sui event futures: {}", e))?;
943 Ok(sui_events)
944 }
945
946 pub async fn query_events(
947 &self,
948 filter: EventFilter,
949 cursor: Option<EventID>,
950 limit: usize,
951 descending_order: bool,
952 ) -> IndexerResult<Vec<SuiEvent>> {
953 use diesel_async::RunQueryDsl;
954
955 let mut connection = self.pool.get().await?;
956
957 let (tx_seq, event_seq) = if let Some(cursor) = cursor {
958 let EventID {
959 tx_digest,
960 event_seq,
961 } = cursor;
962 let tx_seq = transactions::table
963 .select(transactions::tx_sequence_number)
964 .filter(
965 transactions::tx_sequence_number
966 .nullable()
967 .eq(tx_digests::table
968 .select(tx_digests::tx_sequence_number)
969 .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
970 .single_value()),
971 )
972 .first::<i64>(&mut connection)
973 .await?;
974 (tx_seq, event_seq as i64)
975 } else if descending_order {
976 (i64::MAX, i64::MAX)
977 } else {
978 (-1, 0)
979 };
980
981 let query = if let EventFilter::Sender(sender) = &filter {
982 let cursor_clause = if descending_order {
984 format!(
985 "(e.{TX_SEQUENCE_NUMBER_STR} < {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} < {}))",
986 tx_seq, tx_seq, event_seq
987 )
988 } else {
989 format!(
990 "(e.{TX_SEQUENCE_NUMBER_STR} > {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} > {}))",
991 tx_seq, tx_seq, event_seq
992 )
993 };
994 let order_clause = if descending_order {
995 format!("e.{TX_SEQUENCE_NUMBER_STR} DESC, e.{EVENT_SEQUENCE_NUMBER_STR} DESC")
996 } else {
997 format!("e.{TX_SEQUENCE_NUMBER_STR} ASC, e.{EVENT_SEQUENCE_NUMBER_STR} ASC")
998 };
999 format!(
1000 "( \
1001 SELECT *
1002 FROM event_senders s
1003 JOIN events e
1004 USING (tx_sequence_number, event_sequence_number)
1005 WHERE s.sender = '\\x{}'::bytea AND {} \
1006 ORDER BY {} \
1007 LIMIT {}
1008 )",
1009 Hex::encode(sender.to_vec()),
1010 cursor_clause,
1011 order_clause,
1012 limit,
1013 )
1014 } else if let EventFilter::Transaction(tx_digest) = filter {
1015 return self
1016 .query_events_by_tx_digest(tx_digest, cursor, tx_seq, limit, descending_order)
1017 .await;
1018 } else {
1019 let main_where_clause = match filter {
1020 EventFilter::All([]) => {
1021 "1 = 1".to_string()
1023 }
1024 EventFilter::MoveModule { package, module } => {
1025 format!(
1026 "package = '\\x{}'::bytea AND module = '{}'",
1027 package.to_hex(),
1028 module,
1029 )
1030 }
1031 EventFilter::MoveEventType(struct_tag) => {
1032 format!(
1033 "event_type = '{}'",
1034 struct_tag.to_canonical_display(true),
1035 )
1036 }
1037 EventFilter::MoveEventModule { package, module } => {
1038 let package_module_prefix = format!("{}::{}", package.to_hex_literal(), module);
1039 format!("event_type LIKE '{package_module_prefix}::%'")
1040 }
1041 EventFilter::Sender(_) => {
1042 unreachable!()
1044 }
1045 EventFilter::Transaction(_) => {
1046 unreachable!()
1048 }
1049 EventFilter::TimeRange { .. } | EventFilter::Any(_) => {
1050 return Err(IndexerError::NotSupportedError(
1051 "This type of EventFilter is not supported.".to_owned(),
1052 ));
1053 }
1054 };
1055
1056 let cursor_clause = if descending_order {
1057 format!(
1058 "AND ({TX_SEQUENCE_NUMBER_STR} < {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} < {}))",
1059 tx_seq, tx_seq, event_seq
1060 )
1061 } else {
1062 format!(
1063 "AND ({TX_SEQUENCE_NUMBER_STR} > {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} > {}))",
1064 tx_seq, tx_seq, event_seq
1065 )
1066 };
1067 let order_clause = if descending_order {
1068 format!("{TX_SEQUENCE_NUMBER_STR} DESC, {EVENT_SEQUENCE_NUMBER_STR} DESC")
1069 } else {
1070 format!("{TX_SEQUENCE_NUMBER_STR} ASC, {EVENT_SEQUENCE_NUMBER_STR} ASC")
1071 };
1072
1073 format!(
1074 "
1075 SELECT * FROM events \
1076 WHERE {} {} \
1077 ORDER BY {} \
1078 LIMIT {}
1079 ",
1080 main_where_clause, cursor_clause, order_clause, limit,
1081 )
1082 };
1083 debug!("query events: {}", query);
1084 let stored_events = diesel::sql_query(query)
1085 .load::<StoredEvent>(&mut connection)
1086 .await?;
1087
1088 let mut sui_event_futures = vec![];
1089 for stored_event in stored_events {
1090 sui_event_futures.push(tokio::task::spawn(
1091 stored_event.try_into_sui_event(self.package_resolver.clone()),
1092 ));
1093 }
1094
1095 let sui_events = futures::future::join_all(sui_event_futures)
1096 .await
1097 .into_iter()
1098 .collect::<Result<Vec<_>, _>>()
1099 .tap_err(|e| error!("Failed to join sui event futures: {}", e))?
1100 .into_iter()
1101 .collect::<Result<Vec<_>, _>>()
1102 .tap_err(|e| error!("Failed to collect sui event futures: {}", e))?;
1103 Ok(sui_events)
1104 }
1105
1106 pub async fn get_dynamic_fields(
1107 &self,
1108 parent_object_id: ObjectID,
1109 cursor: Option<ObjectID>,
1110 limit: usize,
1111 ) -> Result<Vec<DynamicFieldInfo>, IndexerError> {
1112 let stored_objects = self
1113 .get_dynamic_fields_raw(parent_object_id, cursor, limit)
1114 .await?;
1115 let mut df_futures = vec![];
1116 let indexer_reader_arc = Arc::new(self.clone());
1117 for stored_object in stored_objects {
1118 let indexer_reader_arc_clone = Arc::clone(&indexer_reader_arc);
1119 df_futures.push(tokio::task::spawn(async move {
1120 indexer_reader_arc_clone
1121 .try_create_dynamic_field_info(stored_object)
1122 .await
1123 }));
1124 }
1125 let df_infos = futures::future::join_all(df_futures)
1126 .await
1127 .into_iter()
1128 .collect::<Result<Vec<_>, _>>()
1129 .tap_err(|e| error!("Error joining DF futures: {:?}", e))?
1130 .into_iter()
1131 .collect::<Result<Vec<_>, _>>()
1132 .tap_err(|e| error!("Error calling try_create_dynamic_field_info: {:?}", e))?
1133 .into_iter()
1134 .flatten()
1135 .collect::<Vec<_>>();
1136 Ok(df_infos)
1137 }
1138
1139 pub async fn get_dynamic_fields_raw(
1140 &self,
1141 parent_object_id: ObjectID,
1142 cursor: Option<ObjectID>,
1143 limit: usize,
1144 ) -> Result<Vec<StoredObject>, IndexerError> {
1145 use diesel_async::RunQueryDsl;
1146
1147 let mut connection = self.pool.get().await?;
1148
1149 let mut query = objects::table
1150 .filter(objects::owner_type.eq(OwnerType::Object as i16))
1151 .filter(objects::owner_id.eq(parent_object_id.to_vec()))
1152 .order(objects::object_id.asc())
1153 .limit(limit as i64)
1154 .into_boxed();
1155
1156 if let Some(object_cursor) = cursor {
1157 query = query.filter(objects::object_id.gt(object_cursor.to_vec()));
1158 }
1159
1160 query
1161 .load::<StoredObject>(&mut connection)
1162 .await
1163 .map_err(Into::into)
1164 }
1165
1166 async fn try_create_dynamic_field_info(
1167 &self,
1168 stored_object: StoredObject,
1169 ) -> Result<Option<DynamicFieldInfo>, IndexerError> {
1170 if stored_object.df_kind.is_none() {
1171 return Ok(None);
1172 }
1173
1174 let object: Object = stored_object.try_into()?;
1175 let move_object = match object.data.try_as_move().cloned() {
1176 Some(move_object) => move_object,
1177 None => {
1178 return Err(IndexerError::ResolveMoveStructError(
1179 "Object is not a MoveObject".to_string(),
1180 ));
1181 }
1182 };
1183 let type_tag: TypeTag = move_object.type_().clone().into();
1184 let layout = self
1185 .package_resolver
1186 .type_layout(type_tag.clone())
1187 .await
1188 .map_err(|e| {
1189 IndexerError::ResolveMoveStructError(format!(
1190 "Failed to get type layout for type {}: {e}",
1191 type_tag.to_canonical_display(true),
1192 ))
1193 })?;
1194
1195 let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
1196 .tap_err(|e| warn!("{e}"))
1197 .context("Failed to deserialize dynamic field")?;
1198
1199 let type_ = field.kind;
1200 let name_type: TypeTag = field.name_layout.into();
1201 let bcs_name = field.name_bytes.to_owned();
1202
1203 let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
1204 .tap_err(|e| warn!("{e}"))
1205 .context("Failed to deserialize dynamic field name")?;
1206
1207 let name = DynamicFieldName {
1208 type_: name_type,
1209 value: SuiMoveValue::from(name_value).to_json_value(),
1210 };
1211
1212 let value_metadata = field.value_metadata().map_err(|e| {
1213 warn!("{e}");
1214 IndexerError::UncategorizedError(anyhow!(e))
1215 })?;
1216
1217 Ok(Some(match value_metadata {
1218 DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
1219 name,
1220 bcs_name,
1221 type_,
1222 object_type: object_type.to_canonical_string(true),
1223 object_id: object.id(),
1224 version: object.version(),
1225 digest: object.digest(),
1226 },
1227
1228 DFV::ValueMetadata::DynamicObjectField(object_id) => {
1229 let object = self.get_object(&object_id, None).await?.ok_or_else(|| {
1230 IndexerError::UncategorizedError(anyhow!(
1231 "Failed to find object_id {} when trying to create dynamic field info",
1232 object_id.to_canonical_display(true),
1233 ))
1234 })?;
1235
1236 let object_type = object.data.type_().unwrap().clone();
1237 DynamicFieldInfo {
1238 name,
1239 bcs_name,
1240 type_,
1241 object_type: object_type.to_canonical_string(true),
1242 object_id,
1243 version: object.version(),
1244 digest: object.digest(),
1245 }
1246 }
1247 }))
1248 }
1249
1250 pub async fn bcs_name_from_dynamic_field_name(
1251 &self,
1252 name: &DynamicFieldName,
1253 ) -> Result<Vec<u8>, IndexerError> {
1254 let move_type_layout = self
1255 .package_resolver()
1256 .type_layout(name.type_.clone())
1257 .await
1258 .map_err(|e| {
1259 IndexerError::ResolveMoveStructError(format!(
1260 "Failed to get type layout for type {}: {}",
1261 name.type_, e
1262 ))
1263 })?;
1264 let sui_json_value = sui_json::SuiJsonValue::new(name.value.clone())?;
1265 let name_bcs_value = sui_json_value.to_bcs_bytes(&move_type_layout)?;
1266 Ok(name_bcs_value)
1267 }
1268
1269 pub async fn get_owned_coins(
1270 &self,
1271 owner: SuiAddress,
1272 coin_type: Option<String>,
1274 cursor: ObjectID,
1275 limit: usize,
1276 ) -> Result<Vec<SuiCoin>, IndexerError> {
1277 use diesel_async::RunQueryDsl;
1278
1279 let mut connection = self.pool.get().await?;
1280 let mut query = objects::dsl::objects
1281 .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
1282 .filter(objects::dsl::owner_id.eq(owner.to_vec()))
1283 .filter(objects::dsl::object_id.gt(cursor.to_vec()))
1284 .into_boxed();
1285 if let Some(coin_type) = coin_type {
1286 query = query.filter(objects::dsl::coin_type.eq(Some(coin_type)));
1287 } else {
1288 query = query.filter(objects::dsl::coin_type.is_not_null());
1289 }
1290
1291 query
1292 .order((objects::dsl::coin_type.asc(), objects::dsl::object_id.asc()))
1293 .limit(limit as i64)
1294 .load::<StoredObject>(&mut connection)
1295 .await?
1296 .into_iter()
1297 .map(|o| o.try_into())
1298 .collect::<IndexerResult<Vec<_>>>()
1299 }
1300
1301 pub async fn get_coin_balances(
1302 &self,
1303 owner: SuiAddress,
1304 coin_type: Option<String>,
1306 ) -> Result<Vec<Balance>, IndexerError> {
1307 use diesel_async::RunQueryDsl;
1308
1309 let mut connection = self.pool.get().await?;
1310
1311 let coin_type_filter = if let Some(coin_type) = coin_type {
1312 format!("= '{}'", coin_type)
1313 } else {
1314 "IS NOT NULL".to_string()
1315 };
1316 let query = format!(
1318 "
1319 SELECT coin_type, \
1320 CAST(COUNT(*) AS BIGINT) AS coin_num, \
1321 CAST(SUM(coin_balance) AS BIGINT) AS coin_balance \
1322 FROM objects \
1323 WHERE owner_type = {} \
1324 AND owner_id = '\\x{}'::BYTEA \
1325 AND coin_type {} \
1326 GROUP BY coin_type \
1327 ORDER BY coin_type ASC
1328 ",
1329 OwnerType::Address as i16,
1330 Hex::encode(owner.to_vec()),
1331 coin_type_filter,
1332 );
1333
1334 debug!("get coin balances query: {query}");
1335 diesel::sql_query(query)
1336 .load::<CoinBalance>(&mut connection)
1337 .await?
1338 .into_iter()
1339 .map(|cb| cb.try_into())
1340 .collect::<IndexerResult<Vec<_>>>()
1341 }
1342
1343 pub async fn get_singleton_object(&self, type_: &StructTag) -> Result<Option<Object>> {
1344 use diesel_async::RunQueryDsl;
1345
1346 let mut connection = self.pool.get().await?;
1347
1348 let object = match objects::table
1349 .filter(objects::object_type_package.eq(type_.address.to_vec()))
1350 .filter(objects::object_type_module.eq(type_.module.to_string()))
1351 .filter(objects::object_type_name.eq(type_.name.to_string()))
1352 .filter(objects::object_type.eq(type_.to_canonical_string(true)))
1353 .first::<StoredObject>(&mut connection)
1354 .await
1355 .optional()?
1356 {
1357 Some(object) => object,
1358 None => return Ok(None),
1359 }
1360 .try_into()?;
1361
1362 Ok(Some(object))
1363 }
1364
1365 pub async fn get_coin_metadata(
1366 &self,
1367 coin_struct: StructTag,
1368 ) -> Result<Option<SuiCoinMetadata>, IndexerError> {
1369 let coin_metadata_type = CoinMetadata::type_(coin_struct);
1370
1371 self.get_singleton_object(&coin_metadata_type)
1372 .await?
1373 .and_then(|o| SuiCoinMetadata::try_from(o).ok())
1374 .pipe(Ok)
1375 }
1376
1377 pub async fn get_total_supply(&self, coin_struct: StructTag) -> Result<Supply, IndexerError> {
1378 let treasury_cap_type = TreasuryCap::type_(coin_struct);
1379
1380 self.get_singleton_object(&treasury_cap_type)
1381 .await?
1382 .and_then(|o| TreasuryCap::try_from(o).ok())
1383 .ok_or(IndexerError::GenericError(format!(
1384 "Cannot find treasury cap object with type {}",
1385 treasury_cap_type
1386 )))?
1387 .total_supply
1388 .pipe(Ok)
1389 }
1390
1391 pub fn package_resolver(&self) -> PackageResolver {
1392 self.package_resolver.clone()
1393 }
1394}
1395
1396struct ConnectionAsObjectStore {
1402 inner: std::sync::Mutex<
1403 diesel_async::async_connection_wrapper::AsyncConnectionWrapper<
1404 crate::database::Connection<'static>,
1405 >,
1406 >,
1407}
1408
1409impl ConnectionAsObjectStore {
1410 async fn from_pool(
1411 pool: &ConnectionPool,
1412 ) -> Result<Self, diesel_async::pooled_connection::PoolError> {
1413 let connection = std::sync::Mutex::new(pool.dedicated_connection().await?.into());
1414
1415 Ok(Self { inner: connection })
1416 }
1417
1418 fn get_object_from_objects(
1419 &self,
1420 object_id: &ObjectID,
1421 version: Option<VersionNumber>,
1422 ) -> Result<Option<StoredObject>, IndexerError> {
1423 use diesel::RunQueryDsl;
1424
1425 let mut guard = self.inner.lock().unwrap();
1426 let connection: &mut diesel_async::async_connection_wrapper::AsyncConnectionWrapper<_> =
1427 &mut guard;
1428
1429 let mut query = objects::table
1430 .filter(objects::object_id.eq(object_id.to_vec()))
1431 .into_boxed();
1432 if let Some(version) = version {
1433 query = query.filter(objects::object_version.eq(version.value() as i64))
1434 }
1435
1436 query
1437 .first::<StoredObject>(connection)
1438 .optional()
1439 .map_err(Into::into)
1440 }
1441
1442 fn get_object_from_history(
1443 &self,
1444 object_id: &ObjectID,
1445 version: Option<VersionNumber>,
1446 ) -> Result<Option<StoredObject>, IndexerError> {
1447 use diesel::RunQueryDsl;
1448
1449 let mut guard = self.inner.lock().unwrap();
1450 let connection: &mut diesel_async::async_connection_wrapper::AsyncConnectionWrapper<_> =
1451 &mut guard;
1452
1453 let mut history_query = objects_history::table
1454 .filter(objects_history::dsl::object_id.eq(object_id.to_vec()))
1455 .into_boxed();
1456
1457 if let Some(version) = version {
1458 history_query = history_query
1459 .filter(objects_history::dsl::object_version.eq(version.value() as i64));
1460 }
1461
1462 let history_latest = history_query
1463 .order_by(objects_history::dsl::object_version.desc())
1464 .first::<StoredHistoryObject>(connection)
1465 .optional()?;
1466
1467 if let Some(history_record) = history_latest {
1468 return Ok(Some(history_record.try_into()?));
1469 }
1470
1471 let mut snapshot_query = objects_snapshot::table
1472 .filter(objects_snapshot::dsl::object_id.eq(object_id.to_vec()))
1473 .into_boxed();
1474
1475 if let Some(version) = version {
1476 snapshot_query = snapshot_query
1477 .filter(objects_snapshot::dsl::object_version.eq(version.value() as i64));
1478 }
1479
1480 snapshot_query
1481 .first::<StoredObjectSnapshot>(connection)
1482 .optional()?
1483 .map(|o| o.try_into())
1484 .transpose()
1485 }
1486
1487 fn get_object(
1488 &self,
1489 object_id: &ObjectID,
1490 version: Option<VersionNumber>,
1491 ) -> Result<Option<Object>, IndexerError> {
1492 let mut result = self.get_object_from_objects(object_id, version)?;
1493
1494 if result.is_none() {
1496 result = self.get_object_from_history(object_id, version)?;
1497 }
1498
1499 result.map(|o| o.try_into()).transpose()
1500 }
1501}
1502
1503impl sui_types::storage::ObjectStore for ConnectionAsObjectStore {
1504 fn get_object(&self, object_id: &ObjectID) -> Option<sui_types::object::Object> {
1505 self.get_object(object_id, None)
1506 .expect("Failed to get object")
1507 }
1508
1509 fn get_object_by_key(
1510 &self,
1511 object_id: &ObjectID,
1512 version: sui_types::base_types::VersionNumber,
1513 ) -> Option<sui_types::object::Object> {
1514 self.get_object(object_id, Some(version))
1515 .expect("Failed to get object")
1516 }
1517}