sui_indexer_alt/handlers/
obj_versions.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10    pipeline::Processor,
11    postgres::{Connection, handler::Handler},
12    types::{effects::TransactionEffectsAPI, full_checkpoint_content::Checkpoint},
13};
14use sui_indexer_alt_schema::{objects::StoredObjVersion, schema::obj_versions};
15
16pub(crate) struct ObjVersions;
17
18#[async_trait]
19impl Processor for ObjVersions {
20    const NAME: &'static str = "obj_versions";
21    type Value = StoredObjVersion;
22
23    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
24        let Checkpoint {
25            transactions,
26            summary,
27            ..
28        } = checkpoint.as_ref();
29
30        let cp_sequence_number = summary.sequence_number as i64;
31        Ok(transactions
32            .iter()
33            .flat_map(|tx| {
34                let lamport = tx.effects.lamport_version();
35
36                tx.effects
37                    .object_changes()
38                    .into_iter()
39                    .map(move |c| StoredObjVersion {
40                        object_id: c.id.to_vec(),
41                        // If the object was created or modified, it has an output version,
42                        // otherwise it was deleted/wrapped and its version is the transaction's
43                        // lamport version.
44                        object_version: c.output_version.unwrap_or(lamport).value() as i64,
45                        object_digest: c.output_digest.map(|d| d.inner().into()),
46                        cp_sequence_number,
47                    })
48            })
49            .collect())
50    }
51}
52
53#[async_trait]
54impl Handler for ObjVersions {
55    const MIN_EAGER_ROWS: usize = 100;
56    const MAX_PENDING_ROWS: usize = 10000;
57
58    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
59        Ok(diesel::insert_into(obj_versions::table)
60            .values(values)
61            .on_conflict_do_nothing()
62            .execute(conn)
63            .await?)
64    }
65}