sui_indexer_alt/handlers/
kv_objects.rs1use std::sync::Arc;
5
6use anyhow::Context;
7use anyhow::Result;
8use async_trait::async_trait;
9use diesel_async::RunQueryDsl;
10use sui_indexer_alt_framework::pipeline::Processor;
11use sui_indexer_alt_framework::postgres::Connection;
12use sui_indexer_alt_framework::postgres::handler::Handler;
13use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
14use sui_indexer_alt_schema::objects::StoredObject;
15use sui_indexer_alt_schema::schema::kv_objects;
16
17pub(crate) struct KvObjects;
18
19#[async_trait]
20impl Processor for KvObjects {
21 const NAME: &'static str = "kv_objects";
22 type Value = StoredObject;
23
24 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
25 let deleted_objects = checkpoint
26 .eventually_removed_object_refs_post_version()
27 .into_iter()
28 .map(|(id, version, _)| {
29 Ok(StoredObject {
30 object_id: id.to_vec(),
31 object_version: version.value() as i64,
32 serialized_object: None,
33 })
34 });
35
36 let created_objects = checkpoint.transactions.iter().flat_map(|txn| {
37 txn.output_objects(&checkpoint.object_set).map(|o| {
38 let id = o.id();
39 let version = o.version();
40 Ok(StoredObject {
41 object_id: id.to_vec(),
42 object_version: version.value() as i64,
43 serialized_object: Some(bcs::to_bytes(o).with_context(|| {
44 format!("Serializing object {id} version {}", version.value())
45 })?),
46 })
47 })
48 });
49
50 deleted_objects
51 .chain(created_objects)
52 .collect::<Result<Vec<_>, _>>()
53 }
54}
55
56#[async_trait]
57impl Handler for KvObjects {
58 const MIN_EAGER_ROWS: usize = 100;
59 const MAX_PENDING_ROWS: usize = 10000;
60
61 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
62 Ok(diesel::insert_into(kv_objects::table)
63 .values(values)
64 .on_conflict_do_nothing()
65 .execute(conn)
66 .await?)
67 }
68}