sui_indexer_alt/handlers/
kv_objects.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{Context, Result};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::{
    db::{Connection, Db},
    pipeline::{concurrent::Handler, Processor},
    types::full_checkpoint_content::CheckpointData,
};
use sui_indexer_alt_schema::{objects::StoredObject, schema::kv_objects};

/// Zero-sized marker type for the `kv_objects` pipeline.
///
/// It carries no state; the `Processor` and `Handler` implementations in this file turn
/// checkpoint contents into `StoredObject` rows and write them to the `kv_objects` table.
pub(crate) struct KvObjects;

impl Processor for KvObjects {
    const NAME: &'static str = "kv_objects";
    type Value = StoredObject;

    /// Converts a checkpoint into the `StoredObject` rows to be written for it.
    ///
    /// The returned rows are, in order:
    /// 1. one row per object ref returned by
    ///    `eventually_removed_object_refs_post_version`, with `serialized_object` set to `None`;
    /// 2. one row per output object of every transaction in the checkpoint, carrying the
    ///    object's BCS-serialized bytes.
    ///
    /// # Errors
    ///
    /// Fails if BCS serialization of an output object fails; the error is annotated with the
    /// object's ID and version.
    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
        let mut rows = Vec::new();

        // Removed objects come first and are recorded without contents.
        for (object_id, object_version, _) in
            checkpoint.eventually_removed_object_refs_post_version()
        {
            rows.push(StoredObject {
                object_id: object_id.to_vec(),
                // NOTE(review): `as i64` would wrap for values above `i64::MAX`; presumably
                // object versions stay within that range -- confirm.
                object_version: object_version.value() as i64,
                serialized_object: None,
            });
        }

        // Then every object written by any transaction in the checkpoint, with its contents.
        let outputs = checkpoint
            .transactions
            .iter()
            .flat_map(|tx| tx.output_objects.iter());

        for object in outputs {
            let id = object.id();
            let version = object.version().value();
            let bytes = bcs::to_bytes(object)
                .with_context(|| format!("Serializing object {id} version {version}"))?;

            rows.push(StoredObject {
                object_id: id.to_vec(),
                object_version: version as i64,
                serialized_object: Some(bytes),
            });
        }

        Ok(rows)
    }
}

#[async_trait::async_trait]
impl Handler for KvObjects {
    type Store = Db;

    // Row-count constants declared by `Handler`; see the trait for their exact meaning.
    const MIN_EAGER_ROWS: usize = 100;
    const MAX_PENDING_ROWS: usize = 10000;

    /// Writes a batch of rows to the `kv_objects` table.
    ///
    /// Rows that conflict with ones already present are skipped rather than treated as an
    /// error. Returns the row count reported by the database for the insert statement.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while executing the statement on `conn`.
    async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
        let statement = diesel::insert_into(kv_objects::table)
            .values(values)
            .on_conflict_do_nothing();

        let affected = statement.execute(conn).await?;
        Ok(affected)
    }
}