sui_indexer_alt/handlers/
obj_versions.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::pipeline::Processor;
10use sui_indexer_alt_framework::postgres::Connection;
11use sui_indexer_alt_framework::postgres::handler::Handler;
12use sui_indexer_alt_framework::types::effects::TransactionEffectsAPI;
13use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
14use sui_indexer_alt_schema::objects::StoredObjVersion;
15use sui_indexer_alt_schema::schema::obj_versions;
16
17pub(crate) struct ObjVersions;
18
19#[async_trait]
20impl Processor for ObjVersions {
21    const NAME: &'static str = "obj_versions";
22    type Value = StoredObjVersion;
23
24    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
25        let Checkpoint {
26            transactions,
27            summary,
28            ..
29        } = checkpoint.as_ref();
30
31        let cp_sequence_number = summary.sequence_number as i64;
32        Ok(transactions
33            .iter()
34            .flat_map(|tx| {
35                let lamport = tx.effects.lamport_version();
36
37                tx.effects
38                    .object_changes()
39                    .into_iter()
40                    .map(move |c| StoredObjVersion {
41                        object_id: c.id.to_vec(),
42                        // If the object was created or modified, it has an output version,
43                        // otherwise it was deleted/wrapped and its version is the transaction's
44                        // lamport version.
45                        object_version: c.output_version.unwrap_or(lamport).value() as i64,
46                        object_digest: c.output_digest.map(|d| d.inner().into()),
47                        cp_sequence_number,
48                    })
49            })
50            .collect())
51    }
52}
53
54#[async_trait]
55impl Handler for ObjVersions {
56    const MIN_EAGER_ROWS: usize = 100;
57    const MAX_PENDING_ROWS: usize = 10000;
58
59    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
60        Ok(diesel::insert_into(obj_versions::table)
61            .values(values)
62            .on_conflict_do_nothing()
63            .execute(conn)
64            .await?)
65    }
66}