sui_indexer_alt/handlers/
obj_versions.rs1use std::sync::Arc;
5
6use anyhow::Result;
7use async_trait::async_trait;
8use diesel_async::RunQueryDsl;
9use sui_indexer_alt_framework::pipeline::Processor;
10use sui_indexer_alt_framework::postgres::Connection;
11use sui_indexer_alt_framework::postgres::handler::Handler;
12use sui_indexer_alt_framework::types::effects::TransactionEffectsAPI;
13use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
14use sui_indexer_alt_schema::objects::StoredObjVersion;
15use sui_indexer_alt_schema::schema::obj_versions;
16
17pub(crate) struct ObjVersions;
18
19#[async_trait]
20impl Processor for ObjVersions {
21 const NAME: &'static str = "obj_versions";
22 type Value = StoredObjVersion;
23
24 async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
25 let Checkpoint {
26 transactions,
27 summary,
28 ..
29 } = checkpoint.as_ref();
30
31 let cp_sequence_number = summary.sequence_number as i64;
32 Ok(transactions
33 .iter()
34 .flat_map(|tx| {
35 let lamport = tx.effects.lamport_version();
36
37 tx.effects
38 .object_changes()
39 .into_iter()
40 .map(move |c| StoredObjVersion {
41 object_id: c.id.to_vec(),
42 object_version: c.output_version.unwrap_or(lamport).value() as i64,
46 object_digest: c.output_digest.map(|d| d.inner().into()),
47 cp_sequence_number,
48 })
49 })
50 .collect())
51 }
52}
53
54#[async_trait]
55impl Handler for ObjVersions {
56 const MIN_EAGER_ROWS: usize = 100;
57 const MAX_PENDING_ROWS: usize = 10000;
58
59 async fn commit<'a>(values: &[Self::Value], conn: &mut Connection<'a>) -> Result<usize> {
60 Ok(diesel::insert_into(obj_versions::table)
61 .values(values)
62 .on_conflict_do_nothing()
63 .execute(conn)
64 .await?)
65 }
66}