sui_indexer_alt/handlers/
sum_displays.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use anyhow::Result;
8use anyhow::anyhow;
9use async_trait::async_trait;
10use diesel::ExpressionMethods;
11use diesel::upsert::excluded;
12use diesel_async::RunQueryDsl;
13use futures::future::try_join_all;
14use sui_indexer_alt_framework::FieldCount;
15use sui_indexer_alt_framework::pipeline::Processor;
16use sui_indexer_alt_framework::pipeline::sequential::Handler;
17use sui_indexer_alt_framework::postgres::Connection;
18use sui_indexer_alt_framework::postgres::Db;
19use sui_indexer_alt_framework::types::display::DisplayVersionUpdatedEvent;
20use sui_indexer_alt_framework::types::full_checkpoint_content::Checkpoint;
21use sui_indexer_alt_schema::displays::StoredDisplay;
22use sui_indexer_alt_schema::schema::sum_displays;
23
24const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredDisplay::FIELD_COUNT;
25
26pub(crate) struct SumDisplays;
27
28#[async_trait]
29impl Processor for SumDisplays {
30    const NAME: &'static str = "sum_displays";
31
32    type Value = StoredDisplay;
33
34    async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
35        let Checkpoint { transactions, .. } = checkpoint.as_ref();
36
37        let mut values = vec![];
38        for tx in transactions {
39            let Some(events) = &tx.events else {
40                continue;
41            };
42
43            for event in &events.data {
44                let Some((object_type, update)) = DisplayVersionUpdatedEvent::try_from_event(event)
45                else {
46                    continue;
47                };
48
49                values.push(StoredDisplay {
50                    object_type: bcs::to_bytes(&object_type).map_err(|e| {
51                        anyhow!(
52                            "Error serializing object type {}: {e}",
53                            object_type.to_canonical_display(/* with_prefix */ true)
54                        )
55                    })?,
56
57                    display_id: update.id.bytes.to_vec(),
58                    display_version: update.version as i16,
59                    display: event.contents.clone(),
60                })
61            }
62        }
63
64        Ok(values)
65    }
66}
67
68#[async_trait]
69impl Handler for SumDisplays {
70    type Store = Db;
71    type Batch = BTreeMap<Vec<u8>, Self::Value>;
72
73    fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>) {
74        for value in values {
75            batch.insert(value.object_type.clone(), value);
76        }
77    }
78
79    async fn commit<'a>(&self, batch: &Self::Batch, conn: &mut Connection<'a>) -> Result<usize> {
80        let values: Vec<_> = batch.values().cloned().collect();
81        let updates = values
82            .chunks(MAX_INSERT_CHUNK_ROWS)
83            .map(|chunk: &[StoredDisplay]| {
84                diesel::insert_into(sum_displays::table)
85                    .values(chunk)
86                    .on_conflict(sum_displays::object_type)
87                    .do_update()
88                    .set((
89                        sum_displays::display_id.eq(excluded(sum_displays::display_id)),
90                        sum_displays::display_version.eq(excluded(sum_displays::display_version)),
91                        sum_displays::display.eq(excluded(sum_displays::display)),
92                    ))
93                    .execute(conn)
94            });
95
96        Ok(try_join_all(updates).await?.into_iter().sum())
97    }
98}