sui_indexer/
indexer_reader.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use anyhow::Context as _;
5use anyhow::Result;
6use anyhow::anyhow;
7use diesel::{
8    ExpressionMethods, JoinOnDsl, NullableExpressionMethods, OptionalExtension, QueryDsl,
9    SelectableHelper, TextExpressionMethods, dsl::sql, sql_types::Bool,
10};
11use itertools::Itertools;
12use std::sync::Arc;
13use sui_types::dynamic_field::visitor as DFV;
14use sui_types::object::bounded_visitor::BoundedVisitor;
15use tap::{Pipe, TapFallible};
16use tracing::{debug, error, warn};
17
18use fastcrypto::encoding::Encoding;
19use fastcrypto::encoding::Hex;
20use move_core_types::language_storage::{StructTag, TypeTag};
21use sui_json_rpc_types::{Balance, Coin as SuiCoin, SuiCoinMetadata, SuiMoveValue};
22use sui_json_rpc_types::{
23    CheckpointId, EpochInfo, EventFilter, SuiEvent, SuiObjectDataFilter,
24    SuiTransactionBlockResponse, TransactionFilter,
25};
26use sui_package_resolver::Package;
27use sui_package_resolver::PackageStore;
28use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
29use sui_types::effects::TransactionEvents;
30use sui_types::{balance::Supply, coin::TreasuryCap, dynamic_field::DynamicFieldName};
31use sui_types::{
32    base_types::{ObjectID, SuiAddress, VersionNumber},
33    committee::EpochId,
34    digests::TransactionDigest,
35    dynamic_field::DynamicFieldInfo,
36    object::{Object, ObjectRead},
37    sui_system_state::{SuiSystemStateTrait, sui_system_state_summary::SuiSystemStateSummary},
38};
39use sui_types::{coin::CoinMetadata, event::EventID};
40
41use crate::database::ConnectionPool;
42use crate::db::ConnectionPoolConfig;
43use crate::models::objects::StoredHistoryObject;
44use crate::models::objects::StoredObjectSnapshot;
45use crate::models::transactions::{StoredTransactionEvents, stored_events_to_events};
46use crate::schema::objects_history;
47use crate::schema::objects_snapshot;
48use crate::schema::pruner_cp_watermark;
49use crate::schema::tx_digests;
50use crate::{
51    errors::IndexerError,
52    models::{
53        checkpoints::StoredCheckpoint,
54        epoch::StoredEpochInfo,
55        events::StoredEvent,
56        objects::{CoinBalance, StoredObject},
57        transactions::{StoredTransaction, tx_events_to_sui_tx_events},
58        tx_indices::TxSequenceNumber,
59    },
60    schema::{checkpoints, epochs, events, objects, transactions},
61    store::package_resolver::IndexerStorePackageResolver,
62    types::{IndexerResult, OwnerType},
63};
64
/// The column name `tx_sequence_number`; interpolated into the raw SQL text that
/// `IndexerReader::query_transaction_blocks` builds.
pub const TX_SEQUENCE_NUMBER_STR: &str = "tx_sequence_number";
/// The literal `transaction_digest`.
/// NOTE(review): presumably a column name for raw SQL built elsewhere — confirm against callers.
pub const TRANSACTION_DIGEST_STR: &str = "transaction_digest";
/// The literal `event_sequence_number`.
/// NOTE(review): presumably a column name for raw SQL built elsewhere — confirm against callers.
pub const EVENT_SEQUENCE_NUMBER_STR: &str = "event_sequence_number";
68
/// Read-side handle for the indexer database: a connection pool plus a package
/// resolver that is constructed from that same pool.
#[derive(Clone)]
pub struct IndexerReader {
    /// Pool that the query methods check a connection out of.
    pool: ConnectionPool,
    /// Reference-counted package resolver handed to conversions that need it.
    package_resolver: PackageResolver,
}
74
/// Shared (`Arc`) package resolver whose package store is an `IndexerStorePackageResolver`
/// wrapped in `PackageStoreWithLruCache`.
pub type PackageResolver = Arc<Resolver<PackageStoreWithLruCache<IndexerStorePackageResolver>>>;
76
77// Impl for common initialization and utilities
78impl IndexerReader {
79    pub fn new(pool: ConnectionPool) -> Self {
80        let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
81        let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
82        let package_resolver = Arc::new(Resolver::new(package_cache));
83        Self {
84            pool,
85            package_resolver,
86        }
87    }
88
89    pub async fn new_with_config<T: Into<String>>(
90        db_url: T,
91        config: ConnectionPoolConfig,
92    ) -> Result<Self> {
93        let db_url = db_url.into();
94
95        let pool = ConnectionPool::new(db_url.parse()?, config).await?;
96
97        let indexer_store_pkg_resolver = IndexerStorePackageResolver::new(pool.clone());
98        let package_cache = PackageStoreWithLruCache::new(indexer_store_pkg_resolver);
99        let package_resolver = Arc::new(Resolver::new(package_cache));
100        Ok(Self {
101            pool,
102            package_resolver,
103        })
104    }
105
106    pub fn pool(&self) -> &ConnectionPool {
107        &self.pool
108    }
109}
110
111// Impl for reading data from the DB
112impl IndexerReader {
113    async fn get_object_from_db(
114        &self,
115        object_id: &ObjectID,
116        version: Option<VersionNumber>,
117    ) -> Result<Option<StoredObject>, IndexerError> {
118        use diesel_async::RunQueryDsl;
119
120        let mut connection = self.pool.get().await?;
121
122        let mut query = objects::table
123            .filter(objects::object_id.eq(object_id.to_vec()))
124            .into_boxed();
125        if let Some(version) = version {
126            query = query.filter(objects::object_version.eq(version.value() as i64))
127        }
128
129        query
130            .first::<StoredObject>(&mut connection)
131            .await
132            .optional()
133            .map_err(Into::into)
134    }
135
136    pub async fn get_object(
137        &self,
138        object_id: &ObjectID,
139        version: Option<VersionNumber>,
140    ) -> Result<Option<Object>, IndexerError> {
141        let Some(stored_package) = self.get_object_from_db(object_id, version).await? else {
142            return Ok(None);
143        };
144
145        let object = stored_package.try_into()?;
146        Ok(Some(object))
147    }
148
149    pub async fn get_object_read(&self, object_id: ObjectID) -> Result<ObjectRead, IndexerError> {
150        use diesel_async::RunQueryDsl;
151
152        let mut connection = self.pool.get().await?;
153
154        let stored_object = objects::table
155            .filter(objects::object_id.eq(object_id.to_vec()))
156            .first::<StoredObject>(&mut connection)
157            .await
158            .optional()?;
159
160        if let Some(object) = stored_object {
161            object
162                .try_into_object_read(self.package_resolver.clone())
163                .await
164        } else {
165            Ok(ObjectRead::NotExists(object_id))
166        }
167    }
168
169    pub async fn get_package(&self, package_id: ObjectID) -> Result<Package, IndexerError> {
170        let store = self.package_resolver.package_store();
171        let pkg = store
172            .fetch(package_id.into())
173            .await
174            .map_err(|e| {
175                IndexerError::PostgresReadError(format!(
176                    "Fail to fetch package from package store with error {:?}",
177                    e
178                ))
179            })?
180            .as_ref()
181            .clone();
182        Ok(pkg)
183    }
184
185    async fn get_epoch_info_from_db(
186        &self,
187        epoch: Option<EpochId>,
188    ) -> Result<Option<StoredEpochInfo>, IndexerError> {
189        use diesel_async::RunQueryDsl;
190
191        let mut connection = self.pool.get().await?;
192
193        let stored_epoch = epochs::table
194            .into_boxed()
195            .pipe(|query| {
196                if let Some(epoch) = epoch {
197                    query.filter(epochs::epoch.eq(epoch as i64))
198                } else {
199                    query.order_by(epochs::epoch.desc())
200                }
201            })
202            .first::<StoredEpochInfo>(&mut connection)
203            .await
204            .optional()?;
205
206        Ok(stored_epoch)
207    }
208
209    pub async fn get_latest_epoch_info_from_db(&self) -> Result<StoredEpochInfo, IndexerError> {
210        use diesel_async::RunQueryDsl;
211
212        let mut connection = self.pool.get().await?;
213
214        let stored_epoch = epochs::table
215            .order_by(epochs::epoch.desc())
216            .first::<StoredEpochInfo>(&mut connection)
217            .await?;
218
219        Ok(stored_epoch)
220    }
221
222    pub async fn get_epoch_info(
223        &self,
224        epoch: Option<EpochId>,
225    ) -> Result<Option<EpochInfo>, IndexerError> {
226        let stored_epoch = self.get_epoch_info_from_db(epoch).await?;
227
228        let stored_epoch = match stored_epoch {
229            Some(stored_epoch) => stored_epoch,
230            None => return Ok(None),
231        };
232
233        let epoch_info = EpochInfo::try_from(stored_epoch)?;
234        Ok(Some(epoch_info))
235    }
236
237    async fn get_epochs_from_db(
238        &self,
239        cursor: Option<u64>,
240        limit: usize,
241        descending_order: bool,
242    ) -> Result<Vec<StoredEpochInfo>, IndexerError> {
243        use diesel_async::RunQueryDsl;
244
245        let mut connection = self.pool.get().await?;
246
247        let mut query = epochs::table.into_boxed();
248
249        if let Some(cursor) = cursor {
250            if descending_order {
251                query = query.filter(epochs::epoch.lt(cursor as i64));
252            } else {
253                query = query.filter(epochs::epoch.gt(cursor as i64));
254            }
255        }
256
257        if descending_order {
258            query = query.order_by(epochs::epoch.desc());
259        } else {
260            query = query.order_by(epochs::epoch.asc());
261        }
262
263        query
264            .limit(limit as i64)
265            .load(&mut connection)
266            .await
267            .map_err(Into::into)
268    }
269
270    pub async fn get_epochs(
271        &self,
272        cursor: Option<u64>,
273        limit: usize,
274        descending_order: bool,
275    ) -> Result<Vec<EpochInfo>, IndexerError> {
276        self.get_epochs_from_db(cursor, limit, descending_order)
277            .await?
278            .into_iter()
279            .map(EpochInfo::try_from)
280            .collect::<Result<Vec<_>, _>>()
281    }
282
283    pub async fn get_latest_sui_system_state(&self) -> Result<SuiSystemStateSummary, IndexerError> {
284        let object_store = ConnectionAsObjectStore::from_pool(&self.pool)
285            .await
286            .map_err(|e| IndexerError::PgPoolConnectionError(e.to_string()))?;
287
288        let system_state = tokio::task::spawn_blocking(move || {
289            sui_types::sui_system_state::get_sui_system_state(&object_store)
290        })
291        .await
292        .unwrap()?
293        .into_sui_system_state_summary();
294
295        Ok(system_state)
296    }
297
298    pub async fn get_validator_from_table(
299        &self,
300        table_id: ObjectID,
301        pool_id: sui_types::id::ID,
302    ) -> Result<
303        sui_types::sui_system_state::sui_system_state_summary::SuiValidatorSummary,
304        IndexerError,
305    > {
306        let object_store = ConnectionAsObjectStore::from_pool(&self.pool)
307            .await
308            .map_err(|e| IndexerError::PgPoolConnectionError(e.to_string()))?;
309
310        let validator = tokio::task::spawn_blocking(move || {
311            sui_types::sui_system_state::get_validator_from_table(&object_store, table_id, &pool_id)
312        })
313        .await
314        .unwrap()?;
315        Ok(validator)
316    }
317
318    /// Retrieve the system state data for the given epoch. If no epoch is given,
319    /// it will retrieve the latest epoch's data and return the system state.
320    /// System state of the epoch is written at the end of the epoch, so system state
321    /// of the current epoch is empty until the epoch ends. You can call
322    /// `get_latest_sui_system_state` for current epoch instead.
323    pub async fn get_epoch_sui_system_state(
324        &self,
325        epoch: Option<EpochId>,
326    ) -> Result<SuiSystemStateSummary, IndexerError> {
327        let stored_epoch = self.get_epoch_info_from_db(epoch).await?;
328        let stored_epoch = match stored_epoch {
329            Some(stored_epoch) => stored_epoch,
330            None => return Err(IndexerError::InvalidArgumentError("Invalid epoch".into())),
331        };
332        stored_epoch.get_json_system_state_summary()
333    }
334
335    async fn get_checkpoint_from_db(
336        &self,
337        checkpoint_id: CheckpointId,
338    ) -> Result<Option<StoredCheckpoint>, IndexerError> {
339        use diesel_async::RunQueryDsl;
340
341        let mut connection = self.pool.get().await?;
342        let stored_checkpoint = checkpoints::table
343            .into_boxed()
344            .pipe(|query| match checkpoint_id {
345                CheckpointId::SequenceNumber(seq) => {
346                    query.filter(checkpoints::sequence_number.eq(seq as i64))
347                }
348                CheckpointId::Digest(digest) => {
349                    query.filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
350                }
351            })
352            .first::<StoredCheckpoint>(&mut connection)
353            .await
354            .optional()?;
355
356        Ok(stored_checkpoint)
357    }
358
359    async fn get_latest_checkpoint_from_db(&self) -> Result<StoredCheckpoint, IndexerError> {
360        use diesel_async::RunQueryDsl;
361
362        let mut connection = self.pool.get().await?;
363
364        checkpoints::table
365            .order_by(checkpoints::sequence_number.desc())
366            .first::<StoredCheckpoint>(&mut connection)
367            .await
368            .map_err(Into::into)
369    }
370
371    pub async fn get_checkpoint(
372        &self,
373        checkpoint_id: CheckpointId,
374    ) -> Result<Option<sui_json_rpc_types::Checkpoint>, IndexerError> {
375        let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id).await? {
376            Some(stored_checkpoint) => stored_checkpoint,
377            None => return Ok(None),
378        };
379
380        let checkpoint = sui_json_rpc_types::Checkpoint::try_from(stored_checkpoint)?;
381        Ok(Some(checkpoint))
382    }
383
384    pub async fn get_latest_checkpoint(
385        &self,
386    ) -> Result<sui_json_rpc_types::Checkpoint, IndexerError> {
387        let stored_checkpoint = self.get_latest_checkpoint_from_db().await?;
388
389        sui_json_rpc_types::Checkpoint::try_from(stored_checkpoint)
390    }
391
392    async fn get_checkpoints_from_db(
393        &self,
394        cursor: Option<u64>,
395        limit: usize,
396        descending_order: bool,
397    ) -> Result<Vec<StoredCheckpoint>, IndexerError> {
398        use diesel_async::RunQueryDsl;
399
400        let mut connection = self.pool.get().await?;
401
402        let mut query = checkpoints::table.into_boxed();
403        if let Some(cursor) = cursor {
404            if descending_order {
405                query = query.filter(checkpoints::sequence_number.lt(cursor as i64));
406            } else {
407                query = query.filter(checkpoints::sequence_number.gt(cursor as i64));
408            }
409        }
410        if descending_order {
411            query = query.order_by(checkpoints::sequence_number.desc());
412        } else {
413            query = query.order_by(checkpoints::sequence_number.asc());
414        }
415
416        query
417            .limit(limit as i64)
418            .load::<StoredCheckpoint>(&mut connection)
419            .await
420            .map_err(Into::into)
421    }
422
423    pub async fn get_checkpoints(
424        &self,
425        cursor: Option<u64>,
426        limit: usize,
427        descending_order: bool,
428    ) -> Result<Vec<sui_json_rpc_types::Checkpoint>, IndexerError> {
429        self.get_checkpoints_from_db(cursor, limit, descending_order)
430            .await?
431            .into_iter()
432            .map(sui_json_rpc_types::Checkpoint::try_from)
433            .collect()
434    }
435
436    async fn multi_get_transactions(
437        &self,
438        digests: &[TransactionDigest],
439    ) -> Result<Vec<StoredTransaction>, IndexerError> {
440        use diesel_async::RunQueryDsl;
441
442        let mut connection = self.pool.get().await?;
443
444        let digests = digests
445            .iter()
446            .map(|digest| digest.inner().to_vec())
447            .collect::<Vec<_>>();
448
449        transactions::table
450            .inner_join(
451                tx_digests::table
452                    .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
453            )
454            .filter(tx_digests::tx_digest.eq_any(digests))
455            .select(StoredTransaction::as_select())
456            .load::<StoredTransaction>(&mut connection)
457            .await
458            .map_err(Into::into)
459    }
460
461    async fn stored_transaction_to_transaction_block(
462        &self,
463        stored_txes: Vec<StoredTransaction>,
464        options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
465    ) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
466        let mut tx_block_responses_futures = vec![];
467        for stored_tx in stored_txes {
468            let package_resolver_clone = self.package_resolver();
469            let options_clone = options.clone();
470            tx_block_responses_futures.push(tokio::task::spawn(
471                stored_tx
472                    .try_into_sui_transaction_block_response(options_clone, package_resolver_clone),
473            ));
474        }
475
476        let tx_blocks = futures::future::join_all(tx_block_responses_futures)
477            .await
478            .into_iter()
479            .collect::<Result<Vec<_>, _>>()
480            .tap_err(|e| error!("Failed to join all tx block futures: {}", e))?
481            .into_iter()
482            .collect::<Result<Vec<_>, _>>()
483            .tap_err(|e| error!("Failed to collect tx block futures: {}", e))?;
484        Ok(tx_blocks)
485    }
486
487    async fn multi_get_transactions_with_sequence_numbers(
488        &self,
489        tx_sequence_numbers: Vec<i64>,
490        // Some(true) for desc, Some(false) for asc, None for undefined order
491        is_descending: Option<bool>,
492    ) -> Result<Vec<StoredTransaction>, IndexerError> {
493        use diesel_async::RunQueryDsl;
494
495        let mut connection = self.pool.get().await?;
496
497        let mut query = transactions::table
498            .filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers))
499            .into_boxed();
500        match is_descending {
501            Some(true) => {
502                query = query.order(transactions::dsl::tx_sequence_number.desc());
503            }
504            Some(false) => {
505                query = query.order(transactions::dsl::tx_sequence_number.asc());
506            }
507            None => (),
508        }
509
510        query
511            .load::<StoredTransaction>(&mut connection)
512            .await
513            .map_err(Into::into)
514    }
515
516    pub async fn get_owned_objects(
517        &self,
518        address: SuiAddress,
519        filter: Option<SuiObjectDataFilter>,
520        cursor: Option<ObjectID>,
521        limit: usize,
522    ) -> Result<Vec<StoredObject>, IndexerError> {
523        use diesel_async::RunQueryDsl;
524
525        let mut connection = self.pool.get().await?;
526
527        let mut query = objects::table
528            .filter(objects::owner_type.eq(OwnerType::Address as i16))
529            .filter(objects::owner_id.eq(address.to_vec()))
530            .order(objects::object_id.asc())
531            .limit(limit as i64)
532            .into_boxed();
533        if let Some(filter) = filter {
534            match filter {
535                SuiObjectDataFilter::StructType(struct_tag) => {
536                    let object_type = struct_tag.to_canonical_string(/* with_prefix */ true);
537                    query = query.filter(objects::object_type.like(format!("{}%", object_type)));
538                }
539                SuiObjectDataFilter::MatchAny(filters) => {
540                    let mut condition = "(".to_string();
541                    for (i, filter) in filters.iter().enumerate() {
542                        if let SuiObjectDataFilter::StructType(struct_tag) = filter {
543                            let object_type =
544                                struct_tag.to_canonical_string(/* with_prefix */ true);
545                            if i == 0 {
546                                condition +=
547                                    format!("objects.object_type LIKE '{}%'", object_type).as_str();
548                            } else {
549                                condition +=
550                                    format!(" OR objects.object_type LIKE '{}%'", object_type)
551                                        .as_str();
552                            }
553                        } else {
554                            return Err(IndexerError::InvalidArgumentError(
555                                    "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
556                                ));
557                        }
558                    }
559                    condition += ")";
560                    query = query.filter(sql::<Bool>(&condition));
561                }
562                SuiObjectDataFilter::MatchNone(filters) => {
563                    for filter in filters {
564                        if let SuiObjectDataFilter::StructType(struct_tag) = filter {
565                            let object_type =
566                                struct_tag.to_canonical_string(/* with_prefix */ true);
567                            query = query
568                                .filter(objects::object_type.not_like(format!("{}%", object_type)));
569                        } else {
570                            return Err(IndexerError::InvalidArgumentError(
571                                    "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
572                                ));
573                        }
574                    }
575                }
576                _ => {
577                    return Err(IndexerError::InvalidArgumentError(
578                            "Invalid filter type. Only struct, MatchAny and MatchNone of struct filters are supported.".into(),
579                        ));
580                }
581            }
582        }
583
584        if let Some(object_cursor) = cursor {
585            query = query.filter(objects::object_id.gt(object_cursor.to_vec()));
586        }
587
588        query
589            .load::<StoredObject>(&mut connection)
590            .await
591            .map_err(|e| IndexerError::PostgresReadError(e.to_string()))
592    }
593
594    pub async fn multi_get_objects(
595        &self,
596        object_ids: Vec<ObjectID>,
597    ) -> Result<Vec<StoredObject>, IndexerError> {
598        use diesel_async::RunQueryDsl;
599
600        let mut connection = self.pool.get().await?;
601        let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
602
603        objects::table
604            .filter(objects::object_id.eq_any(object_ids))
605            .load::<StoredObject>(&mut connection)
606            .await
607            .map_err(Into::into)
608    }
609
610    async fn query_transaction_blocks_by_checkpoint(
611        &self,
612        checkpoint_seq: u64,
613        options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
614        cursor_tx_seq: Option<i64>,
615        limit: usize,
616        is_descending: bool,
617    ) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
618        use diesel_async::RunQueryDsl;
619
620        let mut connection = self.pool.get().await?;
621
622        let tx_range: (i64, i64) = pruner_cp_watermark::dsl::pruner_cp_watermark
623            .select((
624                pruner_cp_watermark::min_tx_sequence_number,
625                pruner_cp_watermark::max_tx_sequence_number,
626            ))
627            .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint_seq as i64))
628            .first::<(i64, i64)>(&mut connection)
629            .await?;
630
631        let mut query = transactions::table
632            .filter(transactions::tx_sequence_number.between(tx_range.0, tx_range.1))
633            .into_boxed();
634
635        if let Some(cursor_tx_seq) = cursor_tx_seq {
636            if is_descending {
637                query = query.filter(transactions::tx_sequence_number.lt(cursor_tx_seq));
638            } else {
639                query = query.filter(transactions::tx_sequence_number.gt(cursor_tx_seq));
640            }
641        }
642        if is_descending {
643            query = query.order(transactions::tx_sequence_number.desc());
644        } else {
645            query = query.order(transactions::tx_sequence_number.asc());
646        }
647        let stored_txes = query
648            .limit(limit as i64)
649            .load::<StoredTransaction>(&mut connection)
650            .await?;
651        self.stored_transaction_to_transaction_block(stored_txes, options)
652            .await
653    }
654
655    pub async fn query_transaction_blocks(
656        &self,
657        filter: Option<TransactionFilter>,
658        options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
659        cursor: Option<TransactionDigest>,
660        limit: usize,
661        is_descending: bool,
662    ) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
663        use diesel_async::RunQueryDsl;
664
665        let mut connection = self.pool.get().await?;
666
667        let cursor_tx_seq = if let Some(cursor) = cursor {
668            let tx_seq = tx_digests::table
669                .select(tx_digests::tx_sequence_number)
670                .filter(tx_digests::tx_digest.eq(cursor.into_inner().to_vec()))
671                .first::<i64>(&mut connection)
672                .await?;
673            Some(tx_seq)
674        } else {
675            None
676        };
677        let cursor_clause = if let Some(cursor_tx_seq) = cursor_tx_seq {
678            if is_descending {
679                format!("AND {TX_SEQUENCE_NUMBER_STR} < {}", cursor_tx_seq)
680            } else {
681                format!("AND {TX_SEQUENCE_NUMBER_STR} > {}", cursor_tx_seq)
682            }
683        } else {
684            "".to_string()
685        };
686        let order_str = if is_descending { "DESC" } else { "ASC" };
687        let (table_name, main_where_clause) = match filter {
688            // Processed above
689            Some(TransactionFilter::Checkpoint(seq)) => {
690                return self
691                    .query_transaction_blocks_by_checkpoint(
692                        seq,
693                        options,
694                        cursor_tx_seq,
695                        limit,
696                        is_descending,
697                    )
698                    .await
699            }
700            // FIXME: sanitize module & function
701            Some(TransactionFilter::MoveFunction {
702                package,
703                module,
704                function,
705            }) => {
706                let package = Hex::encode(package.to_vec());
707                match (module, function) {
708                    (Some(module), Some(function)) => (
709                        "tx_calls_fun".to_owned(),
710                        format!(
711                            "package = '\\x{package}'::bytea AND module = '{module}' AND func = '{function}'",
712                        ),
713                    ),
714                    (Some(module), None) => (
715                        "tx_calls_mod".to_owned(),
716                        format!(
717                            "package = '\\x{package}'::bytea AND module = '{module}'",
718                        ),
719                    ),
720                    (None, Some(_)) => {
721                        return Err(IndexerError::InvalidArgumentError(
722                            "Function cannot be present without Module.".into(),
723                        ));
724                    }
725                    (None, None) => (
726                        "tx_calls_pkg".to_owned(),
727                        format!("package = '\\x{package}'::bytea"),
728                    ),
729                }
730            }
731            Some(TransactionFilter::AffectedObject(object_id)) => {
732                let object_id = Hex::encode(object_id.to_vec());
733                (
734                    "tx_affected_objects".to_owned(),
735                    format!("affected = '\\x{object_id}'::bytea"),
736                )
737            }
738            Some(TransactionFilter::FromAddress(from_address)) => {
739                let from_address = Hex::encode(from_address.to_vec());
740                (
741                    "tx_affected_addresses".to_owned(),
742                    format!("sender = '\\x{from_address}'::bytea AND affected = '\\x{from_address}'::bytea"),
743                )
744            }
745            Some(TransactionFilter::FromAndToAddress { from, to }) => {
746                let from_address = Hex::encode(from.to_vec());
747                let to_address = Hex::encode(to.to_vec());
748                (
749                    "tx_affected_addresses".to_owned(),
750                    format!("sender = '\\x{from_address}'::bytea AND affected = '\\x{to_address}'::bytea"),
751                )
752            }
753            Some(TransactionFilter::FromOrToAddress { addr }) => {
754                let address = Hex::encode(addr.to_vec());
755                (
756                    "tx_affected_addresses".to_owned(),
757                    format!("affected = '\\x{address}'::bytea"),
758                )
759            }
760            Some(
761                TransactionFilter::TransactionKind(_) | TransactionFilter::TransactionKindIn(_),
762            ) => {
763                return Err(IndexerError::NotSupportedError(
764                    "TransactionKind filter is not supported.".into(),
765                ));
766            }
767            Some(TransactionFilter::InputObject(_) | TransactionFilter::ChangedObject(_)) => {
768                return Err(IndexerError::NotSupportedError(
769                    "InputObject and OutputObject filters are not supported, please use AffectedObject instead.".into()
770                ))
771            }
772            Some(TransactionFilter::ToAddress(_)) => {
773                return Err(IndexerError::NotSupportedError(
774                    "ToAddress filter is not supported, please use FromOrToAddress instead.".into()
775                ))
776            }
777            None => {
778                // apply no filter
779                ("transactions".to_owned(), "1 = 1".into())
780            }
781        };
782
783        let query = format!(
784            "SELECT {TX_SEQUENCE_NUMBER_STR} FROM {} WHERE {} {} ORDER BY {TX_SEQUENCE_NUMBER_STR} {} LIMIT {}",
785            table_name, main_where_clause, cursor_clause, order_str, limit,
786        );
787
788        debug!("query transaction blocks: {}", query);
789        let tx_sequence_numbers = diesel::sql_query(query.clone())
790            .load::<TxSequenceNumber>(&mut connection)
791            .await?
792            .into_iter()
793            .map(|tsn| tsn.tx_sequence_number)
794            .collect::<Vec<i64>>();
795        self.multi_get_transaction_block_response_by_sequence_numbers(
796            tx_sequence_numbers,
797            options,
798            Some(is_descending),
799        )
800        .await
801    }
802
803    async fn multi_get_transaction_block_response_in_blocking_task_impl(
804        &self,
805        digests: &[TransactionDigest],
806        options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
807    ) -> Result<Vec<sui_json_rpc_types::SuiTransactionBlockResponse>, IndexerError> {
808        let stored_txes = self.multi_get_transactions(digests).await?;
809        self.stored_transaction_to_transaction_block(stored_txes, options)
810            .await
811    }
812
813    async fn multi_get_transaction_block_response_by_sequence_numbers(
814        &self,
815        tx_sequence_numbers: Vec<i64>,
816        options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
817        // Some(true) for desc, Some(false) for asc, None for undefined order
818        is_descending: Option<bool>,
819    ) -> Result<Vec<sui_json_rpc_types::SuiTransactionBlockResponse>, IndexerError> {
820        let stored_txes: Vec<StoredTransaction> = self
821            .multi_get_transactions_with_sequence_numbers(tx_sequence_numbers, is_descending)
822            .await?;
823        self.stored_transaction_to_transaction_block(stored_txes, options)
824            .await
825    }
826
827    pub async fn multi_get_transaction_block_response_in_blocking_task(
828        &self,
829        digests: Vec<TransactionDigest>,
830        options: sui_json_rpc_types::SuiTransactionBlockResponseOptions,
831    ) -> Result<Vec<sui_json_rpc_types::SuiTransactionBlockResponse>, IndexerError> {
832        self.multi_get_transaction_block_response_in_blocking_task_impl(&digests, options)
833            .await
834    }
835
836    pub async fn get_transaction_events(
837        &self,
838        digest: TransactionDigest,
839    ) -> Result<Vec<sui_json_rpc_types::SuiEvent>, IndexerError> {
840        use diesel_async::RunQueryDsl;
841
842        let mut connection = self.pool.get().await?;
843
844        // Use the tx_digests lookup table for the corresponding tx_sequence_number, and then fetch
845        // event-relevant data from the entry on the transactions table.
846        let (timestamp_ms, serialized_events) = transactions::table
847            .filter(
848                transactions::tx_sequence_number
849                    .nullable()
850                    .eq(tx_digests::table
851                        .select(tx_digests::tx_sequence_number)
852                        .filter(tx_digests::tx_digest.eq(digest.into_inner().to_vec()))
853                        .single_value()),
854            )
855            .select((transactions::timestamp_ms, transactions::events))
856            .first::<(i64, StoredTransactionEvents)>(&mut connection)
857            .await?;
858
859        let events = stored_events_to_events(serialized_events)?;
860        let tx_events = TransactionEvents { data: events };
861
862        let sui_tx_events = tx_events_to_sui_tx_events(
863            tx_events,
864            self.package_resolver(),
865            digest,
866            timestamp_ms as u64,
867        )
868        .await?;
869        Ok(sui_tx_events.map_or(vec![], |ste| ste.data))
870    }
871
872    async fn query_events_by_tx_digest(
873        &self,
874        tx_digest: TransactionDigest,
875        cursor: Option<EventID>,
876        cursor_tx_seq: i64,
877        limit: usize,
878        descending_order: bool,
879    ) -> IndexerResult<Vec<SuiEvent>> {
880        use diesel_async::RunQueryDsl;
881
882        let mut connection = self.pool.get().await?;
883
884        let mut query = events::table.into_boxed();
885
886        if let Some(cursor) = cursor {
887            if cursor.tx_digest != tx_digest {
888                return Err(IndexerError::InvalidArgumentError(
889                    "Cursor tx_digest does not match the tx_digest in the query.".into(),
890                ));
891            }
892            if descending_order {
893                query = query.filter(events::event_sequence_number.lt(cursor.event_seq as i64));
894            } else {
895                query = query.filter(events::event_sequence_number.gt(cursor.event_seq as i64));
896            }
897        } else if descending_order {
898            query = query.filter(events::event_sequence_number.le(i64::MAX));
899        } else {
900            query = query.filter(events::event_sequence_number.ge(0));
901        };
902
903        if descending_order {
904            query = query.order(events::event_sequence_number.desc());
905        } else {
906            query = query.order(events::event_sequence_number.asc());
907        }
908
909        // If the cursor is provided and matches tx_digest, we've already fetched the
910        // tx_sequence_number and can query events table directly. Otherwise, we can just consult
911        // the tx_digests table for the tx_sequence_number to key into events table.
912        if cursor.is_some() {
913            query = query.filter(events::tx_sequence_number.eq(cursor_tx_seq));
914        } else {
915            query = query.filter(
916                events::tx_sequence_number.nullable().eq(tx_digests::table
917                    .select(tx_digests::tx_sequence_number)
918                    .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
919                    .single_value()),
920            );
921        }
922
923        let stored_events = query
924            .limit(limit as i64)
925            .load::<StoredEvent>(&mut connection)
926            .await?;
927
928        let mut sui_event_futures = vec![];
929        for stored_event in stored_events {
930            sui_event_futures.push(tokio::task::spawn(
931                stored_event.try_into_sui_event(self.package_resolver.clone()),
932            ));
933        }
934
935        let sui_events = futures::future::join_all(sui_event_futures)
936            .await
937            .into_iter()
938            .collect::<Result<Vec<_>, _>>()
939            .tap_err(|e| error!("Failed to join sui event futures: {}", e))?
940            .into_iter()
941            .collect::<Result<Vec<_>, _>>()
942            .tap_err(|e| error!("Failed to collect sui event futures: {}", e))?;
943        Ok(sui_events)
944    }
945
946    pub async fn query_events(
947        &self,
948        filter: EventFilter,
949        cursor: Option<EventID>,
950        limit: usize,
951        descending_order: bool,
952    ) -> IndexerResult<Vec<SuiEvent>> {
953        use diesel_async::RunQueryDsl;
954
955        let mut connection = self.pool.get().await?;
956
957        let (tx_seq, event_seq) = if let Some(cursor) = cursor {
958            let EventID {
959                tx_digest,
960                event_seq,
961            } = cursor;
962            let tx_seq = transactions::table
963                .select(transactions::tx_sequence_number)
964                .filter(
965                    transactions::tx_sequence_number
966                        .nullable()
967                        .eq(tx_digests::table
968                            .select(tx_digests::tx_sequence_number)
969                            .filter(tx_digests::tx_digest.eq(tx_digest.into_inner().to_vec()))
970                            .single_value()),
971                )
972                .first::<i64>(&mut connection)
973                .await?;
974            (tx_seq, event_seq as i64)
975        } else if descending_order {
976            (i64::MAX, i64::MAX)
977        } else {
978            (-1, 0)
979        };
980
981        let query = if let EventFilter::Sender(sender) = &filter {
982            // Need to remove ambiguities for tx_sequence_number column
983            let cursor_clause = if descending_order {
984                format!(
985                    "(e.{TX_SEQUENCE_NUMBER_STR} < {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} < {}))",
986                    tx_seq, tx_seq, event_seq
987                )
988            } else {
989                format!(
990                    "(e.{TX_SEQUENCE_NUMBER_STR} > {} OR (e.{TX_SEQUENCE_NUMBER_STR} = {} AND e.{EVENT_SEQUENCE_NUMBER_STR} > {}))",
991                    tx_seq, tx_seq, event_seq
992                )
993            };
994            let order_clause = if descending_order {
995                format!("e.{TX_SEQUENCE_NUMBER_STR} DESC, e.{EVENT_SEQUENCE_NUMBER_STR} DESC")
996            } else {
997                format!("e.{TX_SEQUENCE_NUMBER_STR} ASC, e.{EVENT_SEQUENCE_NUMBER_STR} ASC")
998            };
999            format!(
1000                "( \
1001                    SELECT *
1002                    FROM event_senders s
1003                    JOIN events e
1004                    USING (tx_sequence_number, event_sequence_number)
1005                    WHERE s.sender = '\\x{}'::bytea AND {} \
1006                    ORDER BY {} \
1007                    LIMIT {}
1008                )",
1009                Hex::encode(sender.to_vec()),
1010                cursor_clause,
1011                order_clause,
1012                limit,
1013            )
1014        } else if let EventFilter::Transaction(tx_digest) = filter {
1015            return self
1016                .query_events_by_tx_digest(tx_digest, cursor, tx_seq, limit, descending_order)
1017                .await;
1018        } else {
1019            let main_where_clause = match filter {
1020                EventFilter::All([]) => {
1021                    // No filter
1022                    "1 = 1".to_string()
1023                }
1024                EventFilter::MoveModule { package, module } => {
1025                    format!(
1026                        "package = '\\x{}'::bytea AND module = '{}'",
1027                        package.to_hex(),
1028                        module,
1029                    )
1030                }
1031                EventFilter::MoveEventType(struct_tag) => {
1032                    format!(
1033                        "event_type = '{}'",
1034                        struct_tag.to_canonical_display(/* with_prefix */ true),
1035                    )
1036                }
1037                EventFilter::MoveEventModule { package, module } => {
1038                    let package_module_prefix = format!("{}::{}", package.to_hex_literal(), module);
1039                    format!("event_type LIKE '{package_module_prefix}::%'")
1040                }
1041                EventFilter::Sender(_) => {
1042                    // Processed above
1043                    unreachable!()
1044                }
1045                EventFilter::Transaction(_) => {
1046                    // Processed above
1047                    unreachable!()
1048                }
1049                EventFilter::TimeRange { .. } | EventFilter::Any(_) => {
1050                    return Err(IndexerError::NotSupportedError(
1051                        "This type of EventFilter is not supported.".to_owned(),
1052                    ));
1053                }
1054            };
1055
1056            let cursor_clause = if descending_order {
1057                format!(
1058                    "AND ({TX_SEQUENCE_NUMBER_STR} < {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} < {}))",
1059                    tx_seq, tx_seq, event_seq
1060                )
1061            } else {
1062                format!(
1063                    "AND ({TX_SEQUENCE_NUMBER_STR} > {} OR ({TX_SEQUENCE_NUMBER_STR} = {} AND {EVENT_SEQUENCE_NUMBER_STR} > {}))",
1064                    tx_seq, tx_seq, event_seq
1065                )
1066            };
1067            let order_clause = if descending_order {
1068                format!("{TX_SEQUENCE_NUMBER_STR} DESC, {EVENT_SEQUENCE_NUMBER_STR} DESC")
1069            } else {
1070                format!("{TX_SEQUENCE_NUMBER_STR} ASC, {EVENT_SEQUENCE_NUMBER_STR} ASC")
1071            };
1072
1073            format!(
1074                "
1075                    SELECT * FROM events \
1076                    WHERE {} {} \
1077                    ORDER BY {} \
1078                    LIMIT {}
1079                ",
1080                main_where_clause, cursor_clause, order_clause, limit,
1081            )
1082        };
1083        debug!("query events: {}", query);
1084        let stored_events = diesel::sql_query(query)
1085            .load::<StoredEvent>(&mut connection)
1086            .await?;
1087
1088        let mut sui_event_futures = vec![];
1089        for stored_event in stored_events {
1090            sui_event_futures.push(tokio::task::spawn(
1091                stored_event.try_into_sui_event(self.package_resolver.clone()),
1092            ));
1093        }
1094
1095        let sui_events = futures::future::join_all(sui_event_futures)
1096            .await
1097            .into_iter()
1098            .collect::<Result<Vec<_>, _>>()
1099            .tap_err(|e| error!("Failed to join sui event futures: {}", e))?
1100            .into_iter()
1101            .collect::<Result<Vec<_>, _>>()
1102            .tap_err(|e| error!("Failed to collect sui event futures: {}", e))?;
1103        Ok(sui_events)
1104    }
1105
1106    pub async fn get_dynamic_fields(
1107        &self,
1108        parent_object_id: ObjectID,
1109        cursor: Option<ObjectID>,
1110        limit: usize,
1111    ) -> Result<Vec<DynamicFieldInfo>, IndexerError> {
1112        let stored_objects = self
1113            .get_dynamic_fields_raw(parent_object_id, cursor, limit)
1114            .await?;
1115        let mut df_futures = vec![];
1116        let indexer_reader_arc = Arc::new(self.clone());
1117        for stored_object in stored_objects {
1118            let indexer_reader_arc_clone = Arc::clone(&indexer_reader_arc);
1119            df_futures.push(tokio::task::spawn(async move {
1120                indexer_reader_arc_clone
1121                    .try_create_dynamic_field_info(stored_object)
1122                    .await
1123            }));
1124        }
1125        let df_infos = futures::future::join_all(df_futures)
1126            .await
1127            .into_iter()
1128            .collect::<Result<Vec<_>, _>>()
1129            .tap_err(|e| error!("Error joining DF futures: {:?}", e))?
1130            .into_iter()
1131            .collect::<Result<Vec<_>, _>>()
1132            .tap_err(|e| error!("Error calling try_create_dynamic_field_info: {:?}", e))?
1133            .into_iter()
1134            .flatten()
1135            .collect::<Vec<_>>();
1136        Ok(df_infos)
1137    }
1138
1139    pub async fn get_dynamic_fields_raw(
1140        &self,
1141        parent_object_id: ObjectID,
1142        cursor: Option<ObjectID>,
1143        limit: usize,
1144    ) -> Result<Vec<StoredObject>, IndexerError> {
1145        use diesel_async::RunQueryDsl;
1146
1147        let mut connection = self.pool.get().await?;
1148
1149        let mut query = objects::table
1150            .filter(objects::owner_type.eq(OwnerType::Object as i16))
1151            .filter(objects::owner_id.eq(parent_object_id.to_vec()))
1152            .order(objects::object_id.asc())
1153            .limit(limit as i64)
1154            .into_boxed();
1155
1156        if let Some(object_cursor) = cursor {
1157            query = query.filter(objects::object_id.gt(object_cursor.to_vec()));
1158        }
1159
1160        query
1161            .load::<StoredObject>(&mut connection)
1162            .await
1163            .map_err(Into::into)
1164    }
1165
1166    async fn try_create_dynamic_field_info(
1167        &self,
1168        stored_object: StoredObject,
1169    ) -> Result<Option<DynamicFieldInfo>, IndexerError> {
1170        if stored_object.df_kind.is_none() {
1171            return Ok(None);
1172        }
1173
1174        let object: Object = stored_object.try_into()?;
1175        let move_object = match object.data.try_as_move().cloned() {
1176            Some(move_object) => move_object,
1177            None => {
1178                return Err(IndexerError::ResolveMoveStructError(
1179                    "Object is not a MoveObject".to_string(),
1180                ));
1181            }
1182        };
1183        let type_tag: TypeTag = move_object.type_().clone().into();
1184        let layout = self
1185            .package_resolver
1186            .type_layout(type_tag.clone())
1187            .await
1188            .map_err(|e| {
1189                IndexerError::ResolveMoveStructError(format!(
1190                    "Failed to get type layout for type {}: {e}",
1191                    type_tag.to_canonical_display(/* with_prefix */ true),
1192                ))
1193            })?;
1194
1195        let field = DFV::FieldVisitor::deserialize(move_object.contents(), &layout)
1196            .tap_err(|e| warn!("{e}"))
1197            .context("Failed to deserialize dynamic field")?;
1198
1199        let type_ = field.kind;
1200        let name_type: TypeTag = field.name_layout.into();
1201        let bcs_name = field.name_bytes.to_owned();
1202
1203        let name_value = BoundedVisitor::deserialize_value(field.name_bytes, field.name_layout)
1204            .tap_err(|e| warn!("{e}"))
1205            .context("Failed to deserialize dynamic field name")?;
1206
1207        let name = DynamicFieldName {
1208            type_: name_type,
1209            value: SuiMoveValue::from(name_value).to_json_value(),
1210        };
1211
1212        let value_metadata = field.value_metadata().map_err(|e| {
1213            warn!("{e}");
1214            IndexerError::UncategorizedError(anyhow!(e))
1215        })?;
1216
1217        Ok(Some(match value_metadata {
1218            DFV::ValueMetadata::DynamicField(object_type) => DynamicFieldInfo {
1219                name,
1220                bcs_name,
1221                type_,
1222                object_type: object_type.to_canonical_string(/* with_prefix */ true),
1223                object_id: object.id(),
1224                version: object.version(),
1225                digest: object.digest(),
1226            },
1227
1228            DFV::ValueMetadata::DynamicObjectField(object_id) => {
1229                let object = self.get_object(&object_id, None).await?.ok_or_else(|| {
1230                    IndexerError::UncategorizedError(anyhow!(
1231                        "Failed to find object_id {} when trying to create dynamic field info",
1232                        object_id.to_canonical_display(/* with_prefix */ true),
1233                    ))
1234                })?;
1235
1236                let object_type = object.data.type_().unwrap().clone();
1237                DynamicFieldInfo {
1238                    name,
1239                    bcs_name,
1240                    type_,
1241                    object_type: object_type.to_canonical_string(/* with_prefix */ true),
1242                    object_id,
1243                    version: object.version(),
1244                    digest: object.digest(),
1245                }
1246            }
1247        }))
1248    }
1249
1250    pub async fn bcs_name_from_dynamic_field_name(
1251        &self,
1252        name: &DynamicFieldName,
1253    ) -> Result<Vec<u8>, IndexerError> {
1254        let move_type_layout = self
1255            .package_resolver()
1256            .type_layout(name.type_.clone())
1257            .await
1258            .map_err(|e| {
1259                IndexerError::ResolveMoveStructError(format!(
1260                    "Failed to get type layout for type {}: {}",
1261                    name.type_, e
1262                ))
1263            })?;
1264        let sui_json_value = sui_json::SuiJsonValue::new(name.value.clone())?;
1265        let name_bcs_value = sui_json_value.to_bcs_bytes(&move_type_layout)?;
1266        Ok(name_bcs_value)
1267    }
1268
1269    pub async fn get_owned_coins(
1270        &self,
1271        owner: SuiAddress,
1272        // If coin_type is None, look for all coins.
1273        coin_type: Option<String>,
1274        cursor: ObjectID,
1275        limit: usize,
1276    ) -> Result<Vec<SuiCoin>, IndexerError> {
1277        use diesel_async::RunQueryDsl;
1278
1279        let mut connection = self.pool.get().await?;
1280        let mut query = objects::dsl::objects
1281            .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
1282            .filter(objects::dsl::owner_id.eq(owner.to_vec()))
1283            .filter(objects::dsl::object_id.gt(cursor.to_vec()))
1284            .into_boxed();
1285        if let Some(coin_type) = coin_type {
1286            query = query.filter(objects::dsl::coin_type.eq(Some(coin_type)));
1287        } else {
1288            query = query.filter(objects::dsl::coin_type.is_not_null());
1289        }
1290
1291        query
1292            .order((objects::dsl::coin_type.asc(), objects::dsl::object_id.asc()))
1293            .limit(limit as i64)
1294            .load::<StoredObject>(&mut connection)
1295            .await?
1296            .into_iter()
1297            .map(|o| o.try_into())
1298            .collect::<IndexerResult<Vec<_>>>()
1299    }
1300
1301    pub async fn get_coin_balances(
1302        &self,
1303        owner: SuiAddress,
1304        // If coin_type is None, look for all coins.
1305        coin_type: Option<String>,
1306    ) -> Result<Vec<Balance>, IndexerError> {
1307        use diesel_async::RunQueryDsl;
1308
1309        let mut connection = self.pool.get().await?;
1310
1311        let coin_type_filter = if let Some(coin_type) = coin_type {
1312            format!("= '{}'", coin_type)
1313        } else {
1314            "IS NOT NULL".to_string()
1315        };
1316        // Note: important to cast to BIGINT to avoid deserialize confusion
1317        let query = format!(
1318            "
1319            SELECT coin_type, \
1320            CAST(COUNT(*) AS BIGINT) AS coin_num, \
1321            CAST(SUM(coin_balance) AS BIGINT) AS coin_balance \
1322            FROM objects \
1323            WHERE owner_type = {} \
1324            AND owner_id = '\\x{}'::BYTEA \
1325            AND coin_type {} \
1326            GROUP BY coin_type \
1327            ORDER BY coin_type ASC
1328        ",
1329            OwnerType::Address as i16,
1330            Hex::encode(owner.to_vec()),
1331            coin_type_filter,
1332        );
1333
1334        debug!("get coin balances query: {query}");
1335        diesel::sql_query(query)
1336            .load::<CoinBalance>(&mut connection)
1337            .await?
1338            .into_iter()
1339            .map(|cb| cb.try_into())
1340            .collect::<IndexerResult<Vec<_>>>()
1341    }
1342
1343    pub async fn get_singleton_object(&self, type_: &StructTag) -> Result<Option<Object>> {
1344        use diesel_async::RunQueryDsl;
1345
1346        let mut connection = self.pool.get().await?;
1347
1348        let object = match objects::table
1349            .filter(objects::object_type_package.eq(type_.address.to_vec()))
1350            .filter(objects::object_type_module.eq(type_.module.to_string()))
1351            .filter(objects::object_type_name.eq(type_.name.to_string()))
1352            .filter(objects::object_type.eq(type_.to_canonical_string(/* with_prefix */ true)))
1353            .first::<StoredObject>(&mut connection)
1354            .await
1355            .optional()?
1356        {
1357            Some(object) => object,
1358            None => return Ok(None),
1359        }
1360        .try_into()?;
1361
1362        Ok(Some(object))
1363    }
1364
1365    pub async fn get_coin_metadata(
1366        &self,
1367        coin_struct: StructTag,
1368    ) -> Result<Option<SuiCoinMetadata>, IndexerError> {
1369        let coin_metadata_type = CoinMetadata::type_(coin_struct);
1370
1371        self.get_singleton_object(&coin_metadata_type)
1372            .await?
1373            .and_then(|o| SuiCoinMetadata::try_from(o).ok())
1374            .pipe(Ok)
1375    }
1376
1377    pub async fn get_total_supply(&self, coin_struct: StructTag) -> Result<Supply, IndexerError> {
1378        let treasury_cap_type = TreasuryCap::type_(coin_struct);
1379
1380        self.get_singleton_object(&treasury_cap_type)
1381            .await?
1382            .and_then(|o| TreasuryCap::try_from(o).ok())
1383            .ok_or(IndexerError::GenericError(format!(
1384                "Cannot find treasury cap object with type {}",
1385                treasury_cap_type
1386            )))?
1387            .total_supply
1388            .pipe(Ok)
1389    }
1390
1391    pub fn package_resolver(&self) -> PackageResolver {
1392        self.package_resolver.clone()
1393    }
1394}
1395
1396// NOTE: Do not make this public and easily accessible as we need to be careful that it is only
1397// used in non-async contexts via the use of tokio::task::spawn_blocking in order to avoid blocking
1398// the async runtime.
1399//
1400// Maybe we should look into introducing an async object store trait...
1401struct ConnectionAsObjectStore {
1402    inner: std::sync::Mutex<
1403        diesel_async::async_connection_wrapper::AsyncConnectionWrapper<
1404            crate::database::Connection<'static>,
1405        >,
1406    >,
1407}
1408
1409impl ConnectionAsObjectStore {
1410    async fn from_pool(
1411        pool: &ConnectionPool,
1412    ) -> Result<Self, diesel_async::pooled_connection::PoolError> {
1413        let connection = std::sync::Mutex::new(pool.dedicated_connection().await?.into());
1414
1415        Ok(Self { inner: connection })
1416    }
1417
1418    fn get_object_from_objects(
1419        &self,
1420        object_id: &ObjectID,
1421        version: Option<VersionNumber>,
1422    ) -> Result<Option<StoredObject>, IndexerError> {
1423        use diesel::RunQueryDsl;
1424
1425        let mut guard = self.inner.lock().unwrap();
1426        let connection: &mut diesel_async::async_connection_wrapper::AsyncConnectionWrapper<_> =
1427            &mut guard;
1428
1429        let mut query = objects::table
1430            .filter(objects::object_id.eq(object_id.to_vec()))
1431            .into_boxed();
1432        if let Some(version) = version {
1433            query = query.filter(objects::object_version.eq(version.value() as i64))
1434        }
1435
1436        query
1437            .first::<StoredObject>(connection)
1438            .optional()
1439            .map_err(Into::into)
1440    }
1441
1442    fn get_object_from_history(
1443        &self,
1444        object_id: &ObjectID,
1445        version: Option<VersionNumber>,
1446    ) -> Result<Option<StoredObject>, IndexerError> {
1447        use diesel::RunQueryDsl;
1448
1449        let mut guard = self.inner.lock().unwrap();
1450        let connection: &mut diesel_async::async_connection_wrapper::AsyncConnectionWrapper<_> =
1451            &mut guard;
1452
1453        let mut history_query = objects_history::table
1454            .filter(objects_history::dsl::object_id.eq(object_id.to_vec()))
1455            .into_boxed();
1456
1457        if let Some(version) = version {
1458            history_query = history_query
1459                .filter(objects_history::dsl::object_version.eq(version.value() as i64));
1460        }
1461
1462        let history_latest = history_query
1463            .order_by(objects_history::dsl::object_version.desc())
1464            .first::<StoredHistoryObject>(connection)
1465            .optional()?;
1466
1467        if let Some(history_record) = history_latest {
1468            return Ok(Some(history_record.try_into()?));
1469        }
1470
1471        let mut snapshot_query = objects_snapshot::table
1472            .filter(objects_snapshot::dsl::object_id.eq(object_id.to_vec()))
1473            .into_boxed();
1474
1475        if let Some(version) = version {
1476            snapshot_query = snapshot_query
1477                .filter(objects_snapshot::dsl::object_version.eq(version.value() as i64));
1478        }
1479
1480        snapshot_query
1481            .first::<StoredObjectSnapshot>(connection)
1482            .optional()?
1483            .map(|o| o.try_into())
1484            .transpose()
1485    }
1486
1487    fn get_object(
1488        &self,
1489        object_id: &ObjectID,
1490        version: Option<VersionNumber>,
1491    ) -> Result<Option<Object>, IndexerError> {
1492        let mut result = self.get_object_from_objects(object_id, version)?;
1493
1494        // This is for mvr-mode, which doesn't maintain an `objects` table.
1495        if result.is_none() {
1496            result = self.get_object_from_history(object_id, version)?;
1497        }
1498
1499        result.map(|o| o.try_into()).transpose()
1500    }
1501}
1502
1503impl sui_types::storage::ObjectStore for ConnectionAsObjectStore {
1504    fn get_object(&self, object_id: &ObjectID) -> Option<sui_types::object::Object> {
1505        self.get_object(object_id, None)
1506            .expect("Failed to get object")
1507    }
1508
1509    fn get_object_by_key(
1510        &self,
1511        object_id: &ObjectID,
1512        version: sui_types::base_types::VersionNumber,
1513    ) -> Option<sui_types::object::Object> {
1514        self.get_object(object_id, Some(version))
1515            .expect("Failed to get object")
1516    }
1517}