sui_indexer_alt/handlers/
sum_displays.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::BTreeMap, sync::Arc};
5
6use anyhow::{Result, anyhow};
7use async_trait::async_trait;
8use diesel::{ExpressionMethods, upsert::excluded};
9use diesel_async::RunQueryDsl;
10use futures::future::try_join_all;
11use sui_indexer_alt_framework::{
12    FieldCount,
13    pipeline::{Processor, sequential::Handler},
14    postgres::{Connection, Db},
15    types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData},
16};
17use sui_indexer_alt_schema::{displays::StoredDisplay, schema::sum_displays};
18
19const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredDisplay::FIELD_COUNT;
20
21pub(crate) struct SumDisplays;
22
23#[async_trait]
24impl Processor for SumDisplays {
25    const NAME: &'static str = "sum_displays";
26
27    type Value = StoredDisplay;
28
29    async fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
30        let CheckpointData { transactions, .. } = checkpoint.as_ref();
31
32        let mut values = vec![];
33        for tx in transactions {
34            let Some(events) = &tx.events else {
35                continue;
36            };
37
38            for event in &events.data {
39                let Some((object_type, update)) = DisplayVersionUpdatedEvent::try_from_event(event)
40                else {
41                    continue;
42                };
43
44                values.push(StoredDisplay {
45                    object_type: bcs::to_bytes(&object_type).map_err(|e| {
46                        anyhow!(
47                            "Error serializing object type {}: {e}",
48                            object_type.to_canonical_display(/* with_prefix */ true)
49                        )
50                    })?,
51
52                    display_id: update.id.bytes.to_vec(),
53                    display_version: update.version as i16,
54                    display: event.contents.clone(),
55                })
56            }
57        }
58
59        Ok(values)
60    }
61}
62
63#[async_trait]
64impl Handler for SumDisplays {
65    type Store = Db;
66    type Batch = BTreeMap<Vec<u8>, Self::Value>;
67
68    fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
69        for value in values {
70            batch.insert(value.object_type.clone(), value);
71        }
72    }
73
74    async fn commit<'a>(batch: &Self::Batch, conn: &mut Connection<'a>) -> Result<usize> {
75        let values: Vec<_> = batch.values().cloned().collect();
76        let updates = values
77            .chunks(MAX_INSERT_CHUNK_ROWS)
78            .map(|chunk: &[StoredDisplay]| {
79                diesel::insert_into(sum_displays::table)
80                    .values(chunk)
81                    .on_conflict(sum_displays::object_type)
82                    .do_update()
83                    .set((
84                        sum_displays::display_id.eq(excluded(sum_displays::display_id)),
85                        sum_displays::display_version.eq(excluded(sum_displays::display_version)),
86                        sum_displays::display.eq(excluded(sum_displays::display)),
87                    ))
88                    .execute(conn)
89            });
90
91        Ok(try_join_all(updates).await?.into_iter().sum())
92    }
93}