sui_indexer_alt/handlers/
kv_objects.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10    pipeline::Processor,
11    postgres::{Connection, handler::Handler},
12    types::full_checkpoint_content::Checkpoint,
13};
14use sui_indexer_alt_schema::{objects::StoredObject, schema::kv_objects};
15
/// Stateless marker type for the `kv_objects` pipeline.
///
/// It holds no data; it exists so the pipeline's `Processor` and `Handler` trait
/// implementations in this file have a type to attach to.
pub(crate) struct KvObjects;
17
18#[async_trait]
19impl Processor for KvObjects {
20    const NAME: &'static str = "kv_objects";
21    type Value = StoredObject;
22
23    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
24        let deleted_objects = checkpoint
25            .eventually_removed_object_refs_post_version()
26            .into_iter()
27            .map(|(id, version, _)| {
28                Ok(StoredObject {
29                    object_id: id.to_vec(),
30                    object_version: version.value() as i64,
31                    serialized_object: None,
32                })
33            });
34
35        let created_objects = checkpoint.transactions.iter().flat_map(|txn| {
36            txn.output_objects(&checkpoint.object_set).map(|o| {
37                let id = o.id();
38                let version = o.version();
39                Ok(StoredObject {
40                    object_id: id.to_vec(),
41                    object_version: version.value() as i64,
42                    serialized_object: Some(bcs::to_bytes(o).with_context(|| {
43                        format!("Serializing object {id} version {}", version.value())
44                    })?),
45                })
46            })
47        });
48
49        deleted_objects
50            .chain(created_objects)
51            .collect::<Result<Vec<_>, _>>()
52    }
53}
54
55#[async_trait]
56impl Handler for KvObjects {
57    const MIN_EAGER_ROWS: usize = 100;
58    const MAX_PENDING_ROWS: usize = 10000;
59
60    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
61        Ok(diesel::insert_into(kv_objects::table)
62            .values(values)
63            .on_conflict_do_nothing()
64            .execute(conn)
65            .await?)
66    }
67}