sui_indexer_alt/handlers/
obj_versions.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::{
10 pipeline::Processor,
11 postgres::{Connection, handler::Handler},
12 types::{effects::TransactionEffectsAPI, full_checkpoint_content::Checkpoint},
13};
14use sui_indexer_alt_schema::{objects::StoredObjVersion, schema::obj_versions};
15
16pub(crate) struct ObjVersions;
17
18#[async_trait]
19impl Processor for ObjVersions {
20 const NAME: &'static str = "obj_versions";
21 type Value = StoredObjVersion;
22
23 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
24 let Checkpoint {
25 transactions,
26 summary,
27 ..
28 } = checkpoint.as_ref();
29
30 let cp_sequence_number = summary.sequence_number as i64;
31 Ok(transactions
32 .iter()
33 .flat_map(|tx| {
34 let lamport = tx.effects.lamport_version();
35
36 tx.effects
37 .object_changes()
38 .into_iter()
39 .map(move |c| StoredObjVersion {
40 object_id: c.id.to_vec(),
41 object_version: c.output_version.unwrap_or(lamport).value() as i64,
45 object_digest: c.output_digest.map(|d| d.inner().into()),
46 cp_sequence_number,
47 })
48 })
49 .collect())
50 }
51}
52
53#[async_trait]
54impl Handler for ObjVersions {
55 const MIN_EAGER_ROWS: usize = 100;
56 const MAX_PENDING_ROWS: usize = 10000;
57
58 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
59 Ok(diesel::insert_into(obj_versions::table)
60 .values(values)
61 .on_conflict_do_nothing()
62 .execute(conn)
63 .await?)
64 }
65}