sui_indexer_alt/handlers/
kv_objects.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use anyhow::Context;
7use anyhow::Result;
8use async_trait::async_trait;
9use diesel_async::RunQueryDsl;
10use sui_indexer_alt_framework::pipeline::Processor;
11use sui_indexer_alt_framework::postgres::Connection;
12use sui_indexer_alt_framework::postgres::handler::Handler;
13use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
14use sui_indexer_alt_schema::objects::StoredObject;
15use sui_indexer_alt_schema::schema::kv_objects;
16
/// Stateless marker type for the `kv_objects` pipeline.
///
/// It carries no data; the `Processor` and `Handler` implementations in this
/// file attach the checkpoint-to-row conversion and the database write to it.
pub(crate) struct KvObjects;
18
19#[async_trait]
20impl Processor for KvObjects {
21    const NAME: &'static str = "kv_objects";
22    type Value = StoredObject;
23
24    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
25        let deleted_objects = checkpoint
26            .eventually_removed_object_refs_post_version()
27            .into_iter()
28            .map(|(id, version, _)| {
29                Ok(StoredObject {
30                    object_id: id.to_vec(),
31                    object_version: version.value() as i64,
32                    serialized_object: None,
33                })
34            });
35
36        let created_objects = checkpoint.transactions.iter().flat_map(|txn| {
37            txn.output_objects(&checkpoint.object_set).map(|o| {
38                let id = o.id();
39                let version = o.version();
40                Ok(StoredObject {
41                    object_id: id.to_vec(),
42                    object_version: version.value() as i64,
43                    serialized_object: Some(bcs::to_bytes(o).with_context(|| {
44                        format!("Serializing object {id} version {}", version.value())
45                    })?),
46                })
47            })
48        });
49
50        deleted_objects
51            .chain(created_objects)
52            .collect::<Result<Vec<_>, _>>()
53    }
54}
55
56#[async_trait]
57impl Handler for KvObjects {
58    const MIN_EAGER_ROWS: usize = 100;
59    const MAX_PENDING_ROWS: usize = 10000;
60
61    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
62        Ok(diesel::insert_into(kv_objects::table)
63            .values(values)
64            .on_conflict_do_nothing()
65            .execute(conn)
66            .await?)
67    }
68}