sui_indexer_alt/handlers/
kv_objects.rs1use std::sync::Arc;
5
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10 pipeline::Processor,
11 postgres::{Connection, handler::Handler},
12 types::full_checkpoint_content::Checkpoint,
13};
14use sui_indexer_alt_schema::{objects::StoredObject, schema::kv_objects};
15
/// Pipeline marker type for the `kv_objects` table.
///
/// Carries no state of its own; the `Processor` and `Handler` implementations
/// in this file attach the checkpoint-to-row conversion and the database
/// write to it.
pub(crate) struct KvObjects;
17
18#[async_trait]
19impl Processor for KvObjects {
20 const NAME: &'static str = "kv_objects";
21 type Value = StoredObject;
22
23 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
24 let deleted_objects = checkpoint
25 .eventually_removed_object_refs_post_version()
26 .into_iter()
27 .map(|(id, version, _)| {
28 Ok(StoredObject {
29 object_id: id.to_vec(),
30 object_version: version.value() as i64,
31 serialized_object: None,
32 })
33 });
34
35 let created_objects = checkpoint.transactions.iter().flat_map(|txn| {
36 txn.output_objects(&checkpoint.object_set).map(|o| {
37 let id = o.id();
38 let version = o.version();
39 Ok(StoredObject {
40 object_id: id.to_vec(),
41 object_version: version.value() as i64,
42 serialized_object: Some(bcs::to_bytes(o).with_context(|| {
43 format!("Serializing object {id} version {}", version.value())
44 })?),
45 })
46 })
47 });
48
49 deleted_objects
50 .chain(created_objects)
51 .collect::<Result<Vec<_>, _>>()
52 }
53}
54
55#[async_trait]
56impl Handler for KvObjects {
57 const MIN_EAGER_ROWS: usize = 100;
58 const MAX_PENDING_ROWS: usize = 10000;
59
60 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
61 Ok(diesel::insert_into(kv_objects::table)
62 .values(values)
63 .on_conflict_do_nothing()
64 .execute(conn)
65 .await?)
66 }
67}