sui_indexer_alt_jsonrpc/data/
object_versions.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashMap, sync::Arc};

use async_graphql::dataloader::Loader;
use diesel::sql_types::{Array, Bytea};
use sui_indexer_alt_schema::objects::StoredObjVersionKey;
use sui_types::base_types::ObjectID;

use crate::data::error::Error;

use super::pg_reader::PgReader;

/// Key for fetching the latest version of an object, not accounting for deletions or wraps. If the
/// object has been deleted or wrapped, the version before the delete/wrap is returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct LatestObjectVersionKey(pub ObjectID);

#[async_trait::async_trait]
impl Loader<LatestObjectVersionKey> for PgReader {
    type Value = StoredObjVersionKey;
    type Error = Arc<Error>;

    async fn load(
        &self,
        keys: &[LatestObjectVersionKey],
    ) -> Result<HashMap<LatestObjectVersionKey, StoredObjVersionKey>, Self::Error> {
        if keys.is_empty() {
            return Ok(HashMap::new());
        }

        let mut conn = self.connect().await.map_err(Arc::new)?;

        let ids: Vec<_> = keys.iter().map(|k| k.0.into_bytes()).collect();
        let query = diesel::sql_query(
            r#"
                SELECT
                    k.object_id,
                    v.object_version
                FROM (
                    SELECT UNNEST($1) object_id
                ) k
                CROSS JOIN LATERAL (
                    SELECT
                        object_version
                    FROM
                        obj_versions
                    WHERE
                        obj_versions.object_id = k.object_id
                    ORDER BY
                        object_version DESC
                    LIMIT
                        1
                ) v
            "#,
        )
        .bind::<Array<Bytea>, _>(ids);

        let obj_versions: Vec<StoredObjVersionKey> = conn.results(query).await.map_err(Arc::new)?;
        let id_to_stored: HashMap<_, _> = obj_versions
            .into_iter()
            .map(|stored| (stored.object_id.clone(), stored))
            .collect();

        Ok(keys
            .iter()
            .filter_map(|key| {
                let slice: &[u8] = key.0.as_ref();
                Some((*key, id_to_stored.get(slice).cloned()?))
            })
            .collect())
    }
}