sui_indexer/backfill/backfill_instances/ingestion_backfills/
tx_affected_objects.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::backfill::backfill_instances::ingestion_backfills::IngestionBackfillTrait;
use crate::database::ConnectionPool;
use crate::models::tx_indices::StoredTxAffectedObjects;
use crate::schema::tx_affected_objects;
use diesel_async::RunQueryDsl;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::full_checkpoint_content::CheckpointData;

pub struct TxAffectedObjectsBackfill;

#[async_trait::async_trait]
impl IngestionBackfillTrait for TxAffectedObjectsBackfill {
    type ProcessedType = StoredTxAffectedObjects;

    fn process_checkpoint(checkpoint: &CheckpointData) -> Vec<Self::ProcessedType> {
        let first_tx = checkpoint.checkpoint_summary.network_total_transactions as usize
            - checkpoint.transactions.len();

        checkpoint
            .transactions
            .iter()
            .enumerate()
            .flat_map(|(i, tx)| {
                tx.effects
                    .object_changes()
                    .into_iter()
                    .map(move |change| StoredTxAffectedObjects {
                        tx_sequence_number: (first_tx + i) as i64,
                        affected: change.id.to_vec(),
                        sender: tx.transaction.sender_address().to_vec(),
                    })
            })
            .collect()
    }

    async fn commit_chunk(pool: ConnectionPool, processed_data: Vec<Self::ProcessedType>) {
        let mut conn = pool.get().await.unwrap();
        diesel::insert_into(tx_affected_objects::table)
            .values(processed_data)
            .on_conflict_do_nothing()
            .execute(&mut conn)
            .await
            .unwrap();
    }
}