sui_indexer/store/pg_indexer_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::collections::{BTreeMap, HashMap};
5use std::io::Cursor;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use core::result::Result::Ok;
10use csv::{ReaderBuilder, Writer};
11use diesel::ExpressionMethods;
12use diesel::OptionalExtension;
13use diesel::QueryDsl;
14use diesel::dsl::{max, min};
15use diesel_async::scoped_futures::ScopedFutureExt;
16use futures::future::Either;
17use itertools::Itertools;
18use object_store::path::Path;
19use strum::IntoEnumIterator;
20use sui_types::base_types::ObjectID;
21use tap::TapFallible;
22use tracing::{info, warn};
23
24use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
25use sui_protocol_config::ProtocolConfig;
26use sui_storage::object_store::util::put;
27
28use crate::config::UploadOptions;
29use crate::database::ConnectionPool;
30use crate::errors::{Context, IndexerError};
31use crate::handlers::TransactionObjectChangesToCommit;
32use crate::handlers::pruner::PrunableTable;
33use crate::handlers::{CommitterWatermark, EpochToCommit};
34use crate::metrics::IndexerMetrics;
35use crate::models::checkpoints::StoredChainIdentifier;
36use crate::models::checkpoints::StoredCheckpoint;
37use crate::models::checkpoints::StoredCpTx;
38use crate::models::display::StoredDisplay;
39use crate::models::epoch::StoredEpochInfo;
40use crate::models::epoch::{StoredFeatureFlag, StoredProtocolConfig};
41use crate::models::events::StoredEvent;
42use crate::models::obj_indices::StoredObjectVersion;
43use crate::models::objects::{
44    StoredDeletedObject, StoredFullHistoryObject, StoredHistoryObject, StoredObject,
45    StoredObjectSnapshot,
46};
47use crate::models::packages::StoredPackage;
48use crate::models::transactions::StoredTransaction;
49use crate::models::watermarks::StoredWatermark;
50use crate::schema::{
51    chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
52    event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
53    event_struct_package, events, feature_flags, full_objects_history, objects, objects_history,
54    objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
55    raw_checkpoints, transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun,
56    tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
57    watermarks,
58};
59use crate::store::{read_with_retry, transaction_with_retry};
60use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject};
61use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex};
62
63use super::IndexerStore;
64use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
65
66use crate::models::raw_checkpoints::StoredRawCheckpoint;
67use diesel::upsert::excluded;
68use sui_types::digests::{ChainIdentifier, CheckpointDigest};
69
/// Splits anything `IntoIterator` into a `Vec<Vec<_>>` of consecutive groups holding
/// at most `$size` elements each (the final group may be shorter); order is preserved.
///
/// The `.chunks` call on an iterator resolves to `itertools::Itertools::chunks`, so that
/// trait must be in scope where the macro is expanded. Itertools panics if `$size` is 0.
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        let grouped = $data.into_iter().chunks($size);
        let batches: Vec<Vec<_>> = grouped.into_iter().map(|batch| batch.collect()).collect();
        batches
    }};
}
81
/// Maximum number of rows written by a single statement. One DB transaction may
/// therefore be split into several statements of at most this many rows each.
///
/// TODO: with the `per_db_tx` parameters this limit may have become less relevant;
/// run experiments and remove it if that turns out to be true.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1_000;

/// Default number of rows to update in one DB transaction. `PgIndexerStore::new`
/// lets the `PG_COMMIT_PARALLEL_CHUNK_SIZE` environment variable override it.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;

/// Default number of rows to update in one DB transaction for objects specifically
/// (overridable via `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE`). Setting it too high
/// may cause many DB deadlocks because of optimistic locking.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;

/// Duration (one hour) passed to every `transaction_with_retry` call in this file.
/// NOTE(review): despite the name, this looks like the overall retry time budget
/// rather than a sleep interval — confirm against `transaction_with_retry`.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(60 * 60);
94
/// Commit tuning and GCS upload settings used by `PgIndexerStore`.
///
/// `Debug` is derived so the effective configuration can be logged or inspected;
/// every field is a plain value type.
#[derive(Clone, Debug)]
pub struct PgIndexerStoreConfig {
    /// Chunk size read from `PG_COMMIT_PARALLEL_CHUNK_SIZE` (default 100);
    /// presumably rows per DB transaction for general tables.
    pub parallel_chunk_size: usize,
    /// Chunk size read from `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` (default 500);
    /// presumably rows per DB transaction for object tables.
    pub parallel_objects_chunk_size: usize,
    /// Copied from `UploadOptions::gcs_cred_path`.
    pub gcs_cred_path: Option<String>,
    /// Copied from `UploadOptions::gcs_display_bucket`.
    pub gcs_display_bucket: Option<String>,
}
102
/// PostgreSQL-backed indexer store.
///
/// Bundles the connection pool, the metrics handle, the partition manager and the
/// commit configuration assembled in `PgIndexerStore::new`.
#[derive(Clone)]
pub struct PgIndexerStore {
    /// Connection pool used by the reads and transactional writes of this store.
    pool: ConnectionPool,
    /// Metrics updated with commit latencies and checkpoint progress.
    metrics: IndexerMetrics,
    /// Partition manager built from `pool` in `new` (see `pg_partition_manager`).
    partition_manager: PgPartitionManager,
    /// Chunk sizes and GCS upload settings.
    config: PgIndexerStoreConfig,
}
110
111impl PgIndexerStore {
112    pub fn new(
113        pool: ConnectionPool,
114        upload_options: UploadOptions,
115        metrics: IndexerMetrics,
116    ) -> Self {
117        let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
118            .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
119            .parse::<usize>()
120            .unwrap();
121        let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
122            .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
123            .parse::<usize>()
124            .unwrap();
125        let partition_manager =
126            PgPartitionManager::new(pool.clone()).expect("Failed to initialize partition manager");
127        let config = PgIndexerStoreConfig {
128            parallel_chunk_size,
129            parallel_objects_chunk_size,
130            gcs_cred_path: upload_options.gcs_cred_path,
131            gcs_display_bucket: upload_options.gcs_display_bucket,
132        };
133
134        Self {
135            pool,
136            metrics,
137            partition_manager,
138            config,
139        }
140    }
141
142    pub fn pool(&self) -> ConnectionPool {
143        self.pool.clone()
144    }
145
146    /// Get the range of the protocol versions that need to be indexed.
147    pub async fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
148        use diesel_async::RunQueryDsl;
149
150        let mut connection = self.pool.get().await?;
151        // We start indexing from the next protocol version after the latest one stored in the db.
152        let start = protocol_configs::table
153            .select(max(protocol_configs::protocol_version))
154            .first::<Option<i64>>(&mut connection)
155            .await
156            .map_err(Into::into)
157            .context("Failed reading latest protocol version from PostgresDB")?
158            .map_or(1, |v| v + 1);
159
160        // We end indexing at the protocol version of the latest epoch stored in the db.
161        let end = epochs::table
162            .select(max(epochs::protocol_version))
163            .first::<Option<i64>>(&mut connection)
164            .await
165            .map_err(Into::into)
166            .context("Failed reading latest epoch protocol version from PostgresDB")?
167            .unwrap_or(1);
168        Ok((start, end))
169    }
170
171    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
172        use diesel_async::RunQueryDsl;
173
174        let mut connection = self.pool.get().await?;
175
176        chain_identifier::table
177            .select(chain_identifier::checkpoint_digest)
178            .first::<Vec<u8>>(&mut connection)
179            .await
180            .optional()
181            .map_err(Into::into)
182            .context("Failed reading chain id from PostgresDB")
183    }
184
185    // `pub` is needed for wait_for_checkpoint in tests
186    pub async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
187        use diesel_async::RunQueryDsl;
188
189        let mut connection = self.pool.get().await?;
190
191        checkpoints::table
192            .select(max(checkpoints::sequence_number))
193            .first::<Option<i64>>(&mut connection)
194            .await
195            .map_err(Into::into)
196            .map(|v| v.map(|v| v as u64))
197            .context("Failed reading latest checkpoint sequence number from PostgresDB")
198    }
199
200    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
201        use diesel_async::RunQueryDsl;
202
203        let mut connection = self.pool.get().await?;
204
205        checkpoints::table
206            .select((
207                min(checkpoints::sequence_number),
208                max(checkpoints::sequence_number),
209            ))
210            .first::<(Option<i64>, Option<i64>)>(&mut connection)
211            .await
212            .map_err(Into::into)
213            .map(|(min, max)| {
214                (
215                    min.unwrap_or_default() as u64,
216                    max.unwrap_or_default() as u64,
217                )
218            })
219            .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
220    }
221
222    async fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
223        use diesel_async::RunQueryDsl;
224
225        let mut connection = self.pool.get().await?;
226
227        epochs::table
228            .select((min(epochs::epoch), max(epochs::epoch)))
229            .first::<(Option<i64>, Option<i64>)>(&mut connection)
230            .await
231            .map_err(Into::into)
232            .map(|(min, max)| {
233                (
234                    min.unwrap_or_default() as u64,
235                    max.unwrap_or_default() as u64,
236                )
237            })
238            .context("Failed reading min and max epoch numbers from PostgresDB")
239    }
240
241    async fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
242        use diesel_async::RunQueryDsl;
243
244        let mut connection = self.pool.get().await?;
245
246        pruner_cp_watermark::table
247            .select(min(pruner_cp_watermark::checkpoint_sequence_number))
248            .first::<Option<i64>>(&mut connection)
249            .await
250            .map_err(Into::into)
251            .map(|v| v.unwrap_or_default() as u64)
252            .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
253    }
254
255    pub async fn get_checkpoint_range_for_epoch(
256        &self,
257        epoch: u64,
258    ) -> Result<(u64, Option<u64>), IndexerError> {
259        use diesel_async::RunQueryDsl;
260
261        let mut connection = self.pool.get().await?;
262
263        epochs::table
264            .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
265            .filter(epochs::epoch.eq(epoch as i64))
266            .first::<(i64, Option<i64>)>(&mut connection)
267            .await
268            .map_err(Into::into)
269            .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
270            .context("Failed reading checkpoint range from PostgresDB")
271    }
272
273    pub async fn get_transaction_range_for_checkpoint(
274        &self,
275        checkpoint: u64,
276    ) -> Result<(u64, u64), IndexerError> {
277        use diesel_async::RunQueryDsl;
278
279        let mut connection = self.pool.get().await?;
280
281        pruner_cp_watermark::table
282            .select((
283                pruner_cp_watermark::min_tx_sequence_number,
284                pruner_cp_watermark::max_tx_sequence_number,
285            ))
286            .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
287            .first::<(i64, i64)>(&mut connection)
288            .await
289            .map_err(Into::into)
290            .map(|(min, max)| (min as u64, max as u64))
291            .context("Failed reading transaction range from PostgresDB")
292    }
293
294    pub async fn get_latest_object_snapshot_checkpoint_sequence_number(
295        &self,
296    ) -> Result<Option<u64>, IndexerError> {
297        use diesel_async::RunQueryDsl;
298
299        let mut connection = self.pool.get().await?;
300
301        objects_snapshot::table
302            .select(max(objects_snapshot::checkpoint_sequence_number))
303            .first::<Option<i64>>(&mut connection)
304            .await
305            .map_err(Into::into)
306            .map(|v| v.map(|v| v as u64))
307            .context(
308                "Failed reading latest object snapshot checkpoint sequence number from PostgresDB",
309            )
310    }
311
312    async fn persist_display_updates(
313        &self,
314        display_updates: Vec<StoredDisplay>,
315    ) -> Result<(), IndexerError> {
316        use diesel_async::RunQueryDsl;
317
318        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
319            async {
320                diesel::insert_into(display::table)
321                    .values(display_updates)
322                    .on_conflict(display::object_type)
323                    .do_update()
324                    .set((
325                        display::id.eq(excluded(display::id)),
326                        display::version.eq(excluded(display::version)),
327                        display::bcs.eq(excluded(display::bcs)),
328                    ))
329                    .execute(conn)
330                    .await?;
331
332                Ok::<(), IndexerError>(())
333            }
334            .scope_boxed()
335        })
336        .await?;
337
338        Ok(())
339    }
340
341    async fn persist_object_mutation_chunk(
342        &self,
343        mutated_object_mutation_chunk: Vec<StoredObject>,
344    ) -> Result<(), IndexerError> {
345        use diesel_async::RunQueryDsl;
346
347        let guard = self
348            .metrics
349            .checkpoint_db_commit_latency_objects_chunks
350            .start_timer();
351        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
352            async {
353                diesel::insert_into(objects::table)
354                    .values(mutated_object_mutation_chunk.clone())
355                    .on_conflict(objects::object_id)
356                    .do_update()
357                    .set((
358                        objects::object_id.eq(excluded(objects::object_id)),
359                        objects::object_version.eq(excluded(objects::object_version)),
360                        objects::object_digest.eq(excluded(objects::object_digest)),
361                        objects::owner_type.eq(excluded(objects::owner_type)),
362                        objects::owner_id.eq(excluded(objects::owner_id)),
363                        objects::object_type.eq(excluded(objects::object_type)),
364                        objects::serialized_object.eq(excluded(objects::serialized_object)),
365                        objects::coin_type.eq(excluded(objects::coin_type)),
366                        objects::coin_balance.eq(excluded(objects::coin_balance)),
367                        objects::df_kind.eq(excluded(objects::df_kind)),
368                    ))
369                    .execute(conn)
370                    .await?;
371                Ok::<(), IndexerError>(())
372            }
373            .scope_boxed()
374        })
375        .await
376        .tap_ok(|_| {
377            guard.stop_and_record();
378        })
379        .tap_err(|e| {
380            tracing::error!("Failed to persist object mutations with error: {}", e);
381        })
382    }
383
384    async fn persist_object_deletion_chunk(
385        &self,
386        deleted_objects_chunk: Vec<StoredDeletedObject>,
387    ) -> Result<(), IndexerError> {
388        use diesel_async::RunQueryDsl;
389        let guard = self
390            .metrics
391            .checkpoint_db_commit_latency_objects_chunks
392            .start_timer();
393        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
394            async {
395                diesel::delete(
396                    objects::table.filter(
397                        objects::object_id.eq_any(
398                            deleted_objects_chunk
399                                .iter()
400                                .map(|o| o.object_id.clone())
401                                .collect::<Vec<_>>(),
402                        ),
403                    ),
404                )
405                .execute(conn)
406                .await
407                .map_err(IndexerError::from)
408                .context("Failed to write object deletion to PostgresDB")?;
409
410                Ok::<(), IndexerError>(())
411            }
412            .scope_boxed()
413        })
414        .await
415        .tap_ok(|_| {
416            guard.stop_and_record();
417        })
418        .tap_err(|e| {
419            tracing::error!("Failed to persist object deletions with error: {}", e);
420        })
421    }
422
423    async fn persist_object_snapshot_mutation_chunk(
424        &self,
425        objects_snapshot_mutations: Vec<StoredObjectSnapshot>,
426    ) -> Result<(), IndexerError> {
427        use diesel_async::RunQueryDsl;
428        let guard = self
429            .metrics
430            .checkpoint_db_commit_latency_objects_snapshot_chunks
431            .start_timer();
432        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
433            async {
434                for mutation_chunk in
435                    objects_snapshot_mutations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
436                {
437                    diesel::insert_into(objects_snapshot::table)
438                        .values(mutation_chunk)
439                        .on_conflict(objects_snapshot::object_id)
440                        .do_update()
441                        .set((
442                            objects_snapshot::object_version
443                                .eq(excluded(objects_snapshot::object_version)),
444                            objects_snapshot::object_status
445                                .eq(excluded(objects_snapshot::object_status)),
446                            objects_snapshot::object_digest
447                                .eq(excluded(objects_snapshot::object_digest)),
448                            objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
449                            objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
450                            objects_snapshot::object_type_package
451                                .eq(excluded(objects_snapshot::object_type_package)),
452                            objects_snapshot::object_type_module
453                                .eq(excluded(objects_snapshot::object_type_module)),
454                            objects_snapshot::object_type_name
455                                .eq(excluded(objects_snapshot::object_type_name)),
456                            objects_snapshot::object_type
457                                .eq(excluded(objects_snapshot::object_type)),
458                            objects_snapshot::serialized_object
459                                .eq(excluded(objects_snapshot::serialized_object)),
460                            objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
461                            objects_snapshot::coin_balance
462                                .eq(excluded(objects_snapshot::coin_balance)),
463                            objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
464                            objects_snapshot::checkpoint_sequence_number
465                                .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
466                        ))
467                        .execute(conn)
468                        .await?;
469                }
470                Ok::<(), IndexerError>(())
471            }
472            .scope_boxed()
473        })
474        .await
475        .tap_ok(|_| {
476            guard.stop_and_record();
477        })
478        .tap_err(|e| {
479            tracing::error!("Failed to persist object snapshot with error: {}", e);
480        })
481    }
482
483    async fn persist_object_snapshot_deletion_chunk(
484        &self,
485        objects_snapshot_deletions: Vec<StoredObjectSnapshot>,
486    ) -> Result<(), IndexerError> {
487        use diesel_async::RunQueryDsl;
488        let guard = self
489            .metrics
490            .checkpoint_db_commit_latency_objects_snapshot_chunks
491            .start_timer();
492
493        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
494            async {
495                for deletion_chunk in
496                    objects_snapshot_deletions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
497                {
498                    diesel::delete(
499                        objects_snapshot::table.filter(
500                            objects_snapshot::object_id.eq_any(
501                                deletion_chunk
502                                    .iter()
503                                    .map(|o| o.object_id.clone())
504                                    .collect::<Vec<_>>(),
505                            ),
506                        ),
507                    )
508                    .execute(conn)
509                    .await
510                    .map_err(IndexerError::from)
511                    .context("Failed to write object deletion to PostgresDB")?;
512                }
513                Ok::<(), IndexerError>(())
514            }
515            .scope_boxed()
516        })
517        .await
518        .tap_ok(|_| {
519            let elapsed = guard.stop_and_record();
520            info!(
521                elapsed,
522                "Deleted {} chunked object snapshots",
523                objects_snapshot_deletions.len(),
524            );
525        })
526        .tap_err(|e| {
527            tracing::error!(
528                "Failed to persist object snapshot deletions with error: {}",
529                e
530            );
531        })
532    }
533
534    async fn persist_objects_history_chunk(
535        &self,
536        stored_objects_history: Vec<StoredHistoryObject>,
537    ) -> Result<(), IndexerError> {
538        use diesel_async::RunQueryDsl;
539        let guard = self
540            .metrics
541            .checkpoint_db_commit_latency_objects_history_chunks
542            .start_timer();
543        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
544            async {
545                for stored_objects_history_chunk in
546                    stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
547                {
548                    let error_message = concat!(
549                        "Failed to write to ",
550                        stringify!((objects_history::table)),
551                        " DB"
552                    );
553                    diesel::insert_into(objects_history::table)
554                        .values(stored_objects_history_chunk)
555                        .on_conflict_do_nothing()
556                        .execute(conn)
557                        .await
558                        .map_err(IndexerError::from)
559                        .context(error_message)?;
560                }
561                Ok::<(), IndexerError>(())
562            }
563            .scope_boxed()
564        })
565        .await
566        .tap_ok(|_| {
567            guard.stop_and_record();
568        })
569        .tap_err(|e| {
570            tracing::error!("Failed to persist object history with error: {}", e);
571        })
572    }
573
574    async fn persist_full_objects_history_chunk(
575        &self,
576        objects: Vec<StoredFullHistoryObject>,
577    ) -> Result<(), IndexerError> {
578        use diesel_async::RunQueryDsl;
579
580        let guard = self
581            .metrics
582            .checkpoint_db_commit_latency_full_objects_history_chunks
583            .start_timer();
584
585        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
586            async {
587                for objects_chunk in objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
588                    diesel::insert_into(full_objects_history::table)
589                        .values(objects_chunk)
590                        .on_conflict_do_nothing()
591                        .execute(conn)
592                        .await
593                        .map_err(IndexerError::from)
594                        .context("Failed to write to full_objects_history table")?;
595                }
596
597                Ok(())
598            }
599            .scope_boxed()
600        })
601        .await
602        .tap_ok(|_| {
603            let elapsed = guard.stop_and_record();
604            info!(
605                elapsed,
606                "Persisted {} chunked full objects history",
607                objects.len(),
608            );
609        })
610        .tap_err(|e| {
611            tracing::error!("Failed to persist full object history with error: {}", e);
612        })
613    }
614
615    async fn persist_objects_version_chunk(
616        &self,
617        object_versions: Vec<StoredObjectVersion>,
618    ) -> Result<(), IndexerError> {
619        use diesel_async::RunQueryDsl;
620
621        let guard = self
622            .metrics
623            .checkpoint_db_commit_latency_objects_version_chunks
624            .start_timer();
625
626        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
627            async {
628                for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
629                {
630                    diesel::insert_into(objects_version::table)
631                        .values(object_version_chunk)
632                        .on_conflict_do_nothing()
633                        .execute(conn)
634                        .await
635                        .map_err(IndexerError::from)
636                        .context("Failed to write to objects_version table")?;
637                }
638                Ok::<(), IndexerError>(())
639            }
640            .scope_boxed()
641        })
642        .await
643        .tap_ok(|_| {
644            let elapsed = guard.stop_and_record();
645            info!(
646                elapsed,
647                "Persisted {} chunked object versions",
648                object_versions.len(),
649            );
650        })
651        .tap_err(|e| {
652            tracing::error!("Failed to persist object versions with error: {}", e);
653        })
654    }
655
656    async fn persist_raw_checkpoints_impl(
657        &self,
658        raw_checkpoints: &[StoredRawCheckpoint],
659    ) -> Result<(), IndexerError> {
660        use diesel_async::RunQueryDsl;
661
662        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
663            async {
664                diesel::insert_into(raw_checkpoints::table)
665                    .values(raw_checkpoints)
666                    .on_conflict_do_nothing()
667                    .execute(conn)
668                    .await
669                    .map_err(IndexerError::from)
670                    .context("Failed to write to raw_checkpoints table")?;
671                Ok::<(), IndexerError>(())
672            }
673            .scope_boxed()
674        })
675        .await
676    }
677
678    async fn persist_checkpoints(
679        &self,
680        checkpoints: Vec<IndexedCheckpoint>,
681    ) -> Result<(), IndexerError> {
682        use diesel_async::RunQueryDsl;
683
684        let Some(first_checkpoint) = checkpoints.as_slice().first() else {
685            return Ok(());
686        };
687
688        // If the first checkpoint has sequence number 0, we need to persist the digest as
689        // chain identifier.
690        if first_checkpoint.sequence_number == 0 {
691            let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
692            self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())
693                .await?;
694            self.persist_chain_identifier(checkpoint_digest).await?;
695        }
696        let guard = self
697            .metrics
698            .checkpoint_db_commit_latency_checkpoints
699            .start_timer();
700
701        let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
702        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
703            async {
704                for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
705                    diesel::insert_into(pruner_cp_watermark::table)
706                        .values(stored_cp_tx_chunk)
707                        .on_conflict_do_nothing()
708                        .execute(conn)
709                        .await
710                        .map_err(IndexerError::from)
711                        .context("Failed to write to pruner_cp_watermark table")?;
712                }
713                Ok::<(), IndexerError>(())
714            }
715            .scope_boxed()
716        })
717        .await
718        .tap_ok(|_| {
719            info!(
720                "Persisted {} pruner_cp_watermark rows.",
721                stored_cp_txs.len(),
722            );
723        })
724        .tap_err(|e| {
725            tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
726        })?;
727
728        let stored_checkpoints = checkpoints
729            .iter()
730            .map(StoredCheckpoint::from)
731            .collect::<Vec<_>>();
732        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
733            async {
734                for stored_checkpoint_chunk in
735                    stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
736                {
737                    diesel::insert_into(checkpoints::table)
738                        .values(stored_checkpoint_chunk)
739                        .on_conflict_do_nothing()
740                        .execute(conn)
741                        .await
742                        .map_err(IndexerError::from)
743                        .context("Failed to write to checkpoints table")?;
744                    let time_now_ms = chrono::Utc::now().timestamp_millis();
745                    for stored_checkpoint in stored_checkpoint_chunk {
746                        self.metrics
747                            .db_commit_lag_ms
748                            .set(time_now_ms - stored_checkpoint.timestamp_ms);
749                        self.metrics
750                            .max_committed_checkpoint_sequence_number
751                            .set(stored_checkpoint.sequence_number);
752                        self.metrics
753                            .committed_checkpoint_timestamp_ms
754                            .set(stored_checkpoint.timestamp_ms);
755                    }
756
757                    for stored_checkpoint in stored_checkpoint_chunk {
758                        info!(
759                            "Indexer lag: \
760                            persisted checkpoint {} with time now {} and checkpoint time {}",
761                            stored_checkpoint.sequence_number,
762                            time_now_ms,
763                            stored_checkpoint.timestamp_ms
764                        );
765                    }
766                }
767                Ok::<(), IndexerError>(())
768            }
769            .scope_boxed()
770        })
771        .await
772        .tap_ok(|_| {
773            let elapsed = guard.stop_and_record();
774            info!(
775                elapsed,
776                "Persisted {} checkpoints",
777                stored_checkpoints.len()
778            );
779        })
780        .tap_err(|e| {
781            tracing::error!("Failed to persist checkpoints with error: {}", e);
782        })
783    }
784
785    async fn persist_transactions_chunk(
786        &self,
787        transactions: Vec<IndexedTransaction>,
788    ) -> Result<(), IndexerError> {
789        use diesel_async::RunQueryDsl;
790        let guard = self
791            .metrics
792            .checkpoint_db_commit_latency_transactions_chunks
793            .start_timer();
794        let transformation_guard = self
795            .metrics
796            .checkpoint_db_commit_latency_transactions_chunks_transformation
797            .start_timer();
798        let transactions = transactions
799            .iter()
800            .map(StoredTransaction::from)
801            .collect::<Vec<_>>();
802        drop(transformation_guard);
803
804        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
805            async {
806                for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
807                    let error_message = concat!(
808                        "Failed to write to ",
809                        stringify!((transactions::table)),
810                        " DB"
811                    );
812                    diesel::insert_into(transactions::table)
813                        .values(transaction_chunk)
814                        .on_conflict_do_nothing()
815                        .execute(conn)
816                        .await
817                        .map_err(IndexerError::from)
818                        .context(error_message)?;
819                }
820                Ok::<(), IndexerError>(())
821            }
822            .scope_boxed()
823        })
824        .await
825        .tap_ok(|_| {
826            let elapsed = guard.stop_and_record();
827            info!(
828                elapsed,
829                "Persisted {} chunked transactions",
830                transactions.len()
831            );
832        })
833        .tap_err(|e| {
834            tracing::error!("Failed to persist transactions with error: {}", e);
835        })
836    }
837
838    async fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
839        use diesel_async::RunQueryDsl;
840        let guard = self
841            .metrics
842            .checkpoint_db_commit_latency_events_chunks
843            .start_timer();
844        let len = events.len();
845        let events = events
846            .into_iter()
847            .map(StoredEvent::from)
848            .collect::<Vec<_>>();
849
850        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
851            async {
852                for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
853                    let error_message =
854                        concat!("Failed to write to ", stringify!((events::table)), " DB");
855                    diesel::insert_into(events::table)
856                        .values(event_chunk)
857                        .on_conflict_do_nothing()
858                        .execute(conn)
859                        .await
860                        .map_err(IndexerError::from)
861                        .context(error_message)?;
862                }
863                Ok::<(), IndexerError>(())
864            }
865            .scope_boxed()
866        })
867        .await
868        .tap_ok(|_| {
869            let elapsed = guard.stop_and_record();
870            info!(elapsed, "Persisted {} chunked events", len);
871        })
872        .tap_err(|e| {
873            tracing::error!("Failed to persist events with error: {}", e);
874        })
875    }
876
877    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
878        use diesel_async::RunQueryDsl;
879        if packages.is_empty() {
880            return Ok(());
881        }
882        let guard = self
883            .metrics
884            .checkpoint_db_commit_latency_packages
885            .start_timer();
886        let packages = packages
887            .into_iter()
888            .map(StoredPackage::from)
889            .collect::<Vec<_>>();
890        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
891            async {
892                for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
893                    diesel::insert_into(packages::table)
894                        .values(packages_chunk)
895                        .on_conflict(packages::package_id)
896                        .do_update()
897                        .set((
898                            packages::package_id.eq(excluded(packages::package_id)),
899                            packages::package_version.eq(excluded(packages::package_version)),
900                            packages::move_package.eq(excluded(packages::move_package)),
901                            packages::checkpoint_sequence_number
902                                .eq(excluded(packages::checkpoint_sequence_number)),
903                        ))
904                        .execute(conn)
905                        .await?;
906                }
907                Ok::<(), IndexerError>(())
908            }
909            .scope_boxed()
910        })
911        .await
912        .tap_ok(|_| {
913            let elapsed = guard.stop_and_record();
914            info!(elapsed, "Persisted {} packages", packages.len());
915        })
916        .tap_err(|e| {
917            tracing::error!("Failed to persist packages with error: {}", e);
918        })
919    }
920
921    async fn persist_event_indices_chunk(
922        &self,
923        indices: Vec<EventIndex>,
924    ) -> Result<(), IndexerError> {
925        use diesel_async::RunQueryDsl;
926
927        let guard = self
928            .metrics
929            .checkpoint_db_commit_latency_event_indices_chunks
930            .start_timer();
931        let len = indices.len();
932        let (
933            event_emit_packages,
934            event_emit_modules,
935            event_senders,
936            event_struct_packages,
937            event_struct_modules,
938            event_struct_names,
939            event_struct_instantiations,
940        ) = indices.into_iter().map(|i| i.split()).fold(
941            (
942                Vec::new(),
943                Vec::new(),
944                Vec::new(),
945                Vec::new(),
946                Vec::new(),
947                Vec::new(),
948                Vec::new(),
949            ),
950            |(
951                mut event_emit_packages,
952                mut event_emit_modules,
953                mut event_senders,
954                mut event_struct_packages,
955                mut event_struct_modules,
956                mut event_struct_names,
957                mut event_struct_instantiations,
958            ),
959             index| {
960                event_emit_packages.push(index.0);
961                event_emit_modules.push(index.1);
962                event_senders.push(index.2);
963                event_struct_packages.push(index.3);
964                event_struct_modules.push(index.4);
965                event_struct_names.push(index.5);
966                event_struct_instantiations.push(index.6);
967                (
968                    event_emit_packages,
969                    event_emit_modules,
970                    event_senders,
971                    event_struct_packages,
972                    event_struct_modules,
973                    event_struct_names,
974                    event_struct_instantiations,
975                )
976            },
977        );
978
979        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
980            async {
981                for event_emit_packages_chunk in
982                    event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
983                {
984                    diesel::insert_into(event_emit_package::table)
985                        .values(event_emit_packages_chunk)
986                        .on_conflict_do_nothing()
987                        .execute(conn)
988                        .await?;
989                }
990
991                for event_emit_modules_chunk in
992                    event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
993                {
994                    diesel::insert_into(event_emit_module::table)
995                        .values(event_emit_modules_chunk)
996                        .on_conflict_do_nothing()
997                        .execute(conn)
998                        .await?;
999                }
1000
1001                for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1002                    diesel::insert_into(event_senders::table)
1003                        .values(event_senders_chunk)
1004                        .on_conflict_do_nothing()
1005                        .execute(conn)
1006                        .await?;
1007                }
1008
1009                for event_struct_packages_chunk in
1010                    event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1011                {
1012                    diesel::insert_into(event_struct_package::table)
1013                        .values(event_struct_packages_chunk)
1014                        .on_conflict_do_nothing()
1015                        .execute(conn)
1016                        .await?;
1017                }
1018
1019                for event_struct_modules_chunk in
1020                    event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1021                {
1022                    diesel::insert_into(event_struct_module::table)
1023                        .values(event_struct_modules_chunk)
1024                        .on_conflict_do_nothing()
1025                        .execute(conn)
1026                        .await?;
1027                }
1028
1029                for event_struct_names_chunk in
1030                    event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1031                {
1032                    diesel::insert_into(event_struct_name::table)
1033                        .values(event_struct_names_chunk)
1034                        .on_conflict_do_nothing()
1035                        .execute(conn)
1036                        .await?;
1037                }
1038
1039                for event_struct_instantiations_chunk in
1040                    event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1041                {
1042                    diesel::insert_into(event_struct_instantiation::table)
1043                        .values(event_struct_instantiations_chunk)
1044                        .on_conflict_do_nothing()
1045                        .execute(conn)
1046                        .await?;
1047                }
1048                Ok(())
1049            }
1050            .scope_boxed()
1051        })
1052        .await?;
1053
1054        let elapsed = guard.stop_and_record();
1055        info!(elapsed, "Persisted {} chunked event indices", len);
1056        Ok(())
1057    }
1058
1059    async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1060        use diesel_async::RunQueryDsl;
1061
1062        let guard = self
1063            .metrics
1064            .checkpoint_db_commit_latency_tx_indices_chunks
1065            .start_timer();
1066        let len = indices.len();
1067        let (
1068            affected_addresses,
1069            affected_objects,
1070            input_objects,
1071            changed_objects,
1072            pkgs,
1073            mods,
1074            funs,
1075            digests,
1076            kinds,
1077        ) = indices.into_iter().map(|i| i.split()).fold(
1078            (
1079                Vec::new(),
1080                Vec::new(),
1081                Vec::new(),
1082                Vec::new(),
1083                Vec::new(),
1084                Vec::new(),
1085                Vec::new(),
1086                Vec::new(),
1087                Vec::new(),
1088            ),
1089            |(
1090                mut tx_affected_addresses,
1091                mut tx_affected_objects,
1092                mut tx_input_objects,
1093                mut tx_changed_objects,
1094                mut tx_pkgs,
1095                mut tx_mods,
1096                mut tx_funs,
1097                mut tx_digests,
1098                mut tx_kinds,
1099            ),
1100             index| {
1101                tx_affected_addresses.extend(index.0);
1102                tx_affected_objects.extend(index.1);
1103                tx_input_objects.extend(index.2);
1104                tx_changed_objects.extend(index.3);
1105                tx_pkgs.extend(index.4);
1106                tx_mods.extend(index.5);
1107                tx_funs.extend(index.6);
1108                tx_digests.extend(index.7);
1109                tx_kinds.extend(index.8);
1110                (
1111                    tx_affected_addresses,
1112                    tx_affected_objects,
1113                    tx_input_objects,
1114                    tx_changed_objects,
1115                    tx_pkgs,
1116                    tx_mods,
1117                    tx_funs,
1118                    tx_digests,
1119                    tx_kinds,
1120                )
1121            },
1122        );
1123
1124        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1125            async {
1126                for affected_addresses_chunk in
1127                    affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1128                {
1129                    diesel::insert_into(tx_affected_addresses::table)
1130                        .values(affected_addresses_chunk)
1131                        .on_conflict_do_nothing()
1132                        .execute(conn)
1133                        .await?;
1134                }
1135
1136                for affected_objects_chunk in
1137                    affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1138                {
1139                    diesel::insert_into(tx_affected_objects::table)
1140                        .values(affected_objects_chunk)
1141                        .on_conflict_do_nothing()
1142                        .execute(conn)
1143                        .await?;
1144                }
1145
1146                for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1147                    diesel::insert_into(tx_input_objects::table)
1148                        .values(input_objects_chunk)
1149                        .on_conflict_do_nothing()
1150                        .execute(conn)
1151                        .await?;
1152                }
1153
1154                for changed_objects_chunk in
1155                    changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1156                {
1157                    diesel::insert_into(tx_changed_objects::table)
1158                        .values(changed_objects_chunk)
1159                        .on_conflict_do_nothing()
1160                        .execute(conn)
1161                        .await?;
1162                }
1163
1164                for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1165                    diesel::insert_into(tx_calls_pkg::table)
1166                        .values(pkgs_chunk)
1167                        .on_conflict_do_nothing()
1168                        .execute(conn)
1169                        .await?;
1170                }
1171
1172                for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1173                    diesel::insert_into(tx_calls_mod::table)
1174                        .values(mods_chunk)
1175                        .on_conflict_do_nothing()
1176                        .execute(conn)
1177                        .await?;
1178                }
1179
1180                for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1181                    diesel::insert_into(tx_calls_fun::table)
1182                        .values(funs_chunk)
1183                        .on_conflict_do_nothing()
1184                        .execute(conn)
1185                        .await?;
1186                }
1187
1188                for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1189                    diesel::insert_into(tx_digests::table)
1190                        .values(digests_chunk)
1191                        .on_conflict_do_nothing()
1192                        .execute(conn)
1193                        .await?;
1194                }
1195
1196                for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1197                    diesel::insert_into(tx_kinds::table)
1198                        .values(kinds_chunk)
1199                        .on_conflict_do_nothing()
1200                        .execute(conn)
1201                        .await?;
1202                }
1203
1204                Ok(())
1205            }
1206            .scope_boxed()
1207        })
1208        .await?;
1209
1210        let elapsed = guard.stop_and_record();
1211        info!(elapsed, "Persisted {} chunked tx_indices", len);
1212        Ok(())
1213    }
1214
1215    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1216        use diesel_async::RunQueryDsl;
1217        let guard = self
1218            .metrics
1219            .checkpoint_db_commit_latency_epoch
1220            .start_timer();
1221        let epoch_id = epoch.new_epoch.epoch;
1222
1223        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1224            async {
1225                if let Some(last_epoch) = &epoch.last_epoch {
1226                    let last_epoch_id = last_epoch.epoch;
1227
1228                    info!(last_epoch_id, "Persisting epoch end data.");
1229                    diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch_id)))
1230                        .set(last_epoch)
1231                        .execute(conn)
1232                        .await?;
1233                }
1234
1235                let epoch_id = epoch.new_epoch.epoch;
1236                info!(epoch_id, "Persisting epoch beginning info");
1237                let error_message =
1238                    concat!("Failed to write to ", stringify!((epochs::table)), " DB");
1239                diesel::insert_into(epochs::table)
1240                    .values(epoch.new_epoch)
1241                    .on_conflict_do_nothing()
1242                    .execute(conn)
1243                    .await
1244                    .map_err(IndexerError::from)
1245                    .context(error_message)?;
1246                Ok::<(), IndexerError>(())
1247            }
1248            .scope_boxed()
1249        })
1250        .await
1251        .tap_ok(|_| {
1252            let elapsed = guard.stop_and_record();
1253            info!(elapsed, epoch_id, "Persisted epoch beginning info");
1254        })
1255        .tap_err(|e| {
1256            tracing::error!("Failed to persist epoch with error: {}", e);
1257        })
1258    }
1259
1260    async fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1261        use diesel_async::RunQueryDsl;
1262
1263        let mut connection = self.pool.get().await?;
1264
1265        let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1266        // partition_0 has been created, so no need to advance it.
1267        if let Some(last_epoch_id) = last_epoch_id {
1268            let last_db_epoch: Option<StoredEpochInfo> = epochs::table
1269                .filter(epochs::epoch.eq(last_epoch_id))
1270                .first::<StoredEpochInfo>(&mut connection)
1271                .await
1272                .optional()
1273                .map_err(Into::into)
1274                .context("Failed to read last epoch from PostgresDB")?;
1275            if let Some(last_epoch) = last_db_epoch {
1276                let epoch_partition_data =
1277                    EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1278                let table_partitions = self.partition_manager.get_table_partitions().await?;
1279                for (table, (_, last_partition)) in table_partitions {
1280                    // Only advance epoch partition for epoch partitioned tables.
1281                    if !self
1282                        .partition_manager
1283                        .get_strategy(&table)
1284                        .is_epoch_partitioned()
1285                    {
1286                        continue;
1287                    }
1288                    let guard = self.metrics.advance_epoch_latency.start_timer();
1289                    self.partition_manager
1290                        .advance_epoch(table.clone(), last_partition, &epoch_partition_data)
1291                        .await?;
1292                    let elapsed = guard.stop_and_record();
1293                    info!(
1294                        elapsed,
1295                        "Advanced epoch partition {} for table {}",
1296                        last_partition,
1297                        table.clone()
1298                    );
1299                }
1300            } else {
1301                tracing::error!("Last epoch: {} from PostgresDB is None.", last_epoch_id);
1302            }
1303        }
1304
1305        Ok(())
1306    }
1307
1308    async fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
1309        use diesel_async::RunQueryDsl;
1310        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1311            async {
1312                diesel::delete(
1313                    checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
1314                )
1315                .execute(conn)
1316                .await
1317                .map_err(IndexerError::from)
1318                .context("Failed to prune checkpoints table")?;
1319
1320                Ok::<(), IndexerError>(())
1321            }
1322            .scope_boxed()
1323        })
1324        .await
1325    }
1326
1327    async fn prune_event_indices_table(
1328        &self,
1329        min_tx: u64,
1330        max_tx: u64,
1331    ) -> Result<(), IndexerError> {
1332        use diesel_async::RunQueryDsl;
1333
1334        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1335        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1336            async {
1337                diesel::delete(
1338                    event_emit_module::table
1339                        .filter(event_emit_module::tx_sequence_number.between(min_tx, max_tx)),
1340                )
1341                .execute(conn)
1342                .await?;
1343
1344                diesel::delete(
1345                    event_emit_package::table
1346                        .filter(event_emit_package::tx_sequence_number.between(min_tx, max_tx)),
1347                )
1348                .execute(conn)
1349                .await?;
1350
1351                diesel::delete(
1352                    event_senders::table
1353                        .filter(event_senders::tx_sequence_number.between(min_tx, max_tx)),
1354                )
1355                .execute(conn)
1356                .await?;
1357
1358                diesel::delete(event_struct_instantiation::table.filter(
1359                    event_struct_instantiation::tx_sequence_number.between(min_tx, max_tx),
1360                ))
1361                .execute(conn)
1362                .await?;
1363
1364                diesel::delete(
1365                    event_struct_module::table
1366                        .filter(event_struct_module::tx_sequence_number.between(min_tx, max_tx)),
1367                )
1368                .execute(conn)
1369                .await?;
1370
1371                diesel::delete(
1372                    event_struct_name::table
1373                        .filter(event_struct_name::tx_sequence_number.between(min_tx, max_tx)),
1374                )
1375                .execute(conn)
1376                .await?;
1377
1378                diesel::delete(
1379                    event_struct_package::table
1380                        .filter(event_struct_package::tx_sequence_number.between(min_tx, max_tx)),
1381                )
1382                .execute(conn)
1383                .await?;
1384
1385                Ok(())
1386            }
1387            .scope_boxed()
1388        })
1389        .await
1390    }
1391
1392    async fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1393        use diesel_async::RunQueryDsl;
1394
1395        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1396        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1397            async {
1398                diesel::delete(
1399                    tx_affected_addresses::table
1400                        .filter(tx_affected_addresses::tx_sequence_number.between(min_tx, max_tx)),
1401                )
1402                .execute(conn)
1403                .await?;
1404
1405                diesel::delete(
1406                    tx_affected_objects::table
1407                        .filter(tx_affected_objects::tx_sequence_number.between(min_tx, max_tx)),
1408                )
1409                .execute(conn)
1410                .await?;
1411
1412                diesel::delete(
1413                    tx_input_objects::table
1414                        .filter(tx_input_objects::tx_sequence_number.between(min_tx, max_tx)),
1415                )
1416                .execute(conn)
1417                .await?;
1418
1419                diesel::delete(
1420                    tx_changed_objects::table
1421                        .filter(tx_changed_objects::tx_sequence_number.between(min_tx, max_tx)),
1422                )
1423                .execute(conn)
1424                .await?;
1425
1426                diesel::delete(
1427                    tx_calls_pkg::table
1428                        .filter(tx_calls_pkg::tx_sequence_number.between(min_tx, max_tx)),
1429                )
1430                .execute(conn)
1431                .await?;
1432
1433                diesel::delete(
1434                    tx_calls_mod::table
1435                        .filter(tx_calls_mod::tx_sequence_number.between(min_tx, max_tx)),
1436                )
1437                .execute(conn)
1438                .await?;
1439
1440                diesel::delete(
1441                    tx_calls_fun::table
1442                        .filter(tx_calls_fun::tx_sequence_number.between(min_tx, max_tx)),
1443                )
1444                .execute(conn)
1445                .await?;
1446
1447                diesel::delete(
1448                    tx_digests::table
1449                        .filter(tx_digests::tx_sequence_number.between(min_tx, max_tx)),
1450                )
1451                .execute(conn)
1452                .await?;
1453
1454                Ok(())
1455            }
1456            .scope_boxed()
1457        })
1458        .await
1459    }
1460
1461    async fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1462        use diesel_async::RunQueryDsl;
1463
1464        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1465            async {
1466                diesel::delete(
1467                    pruner_cp_watermark::table
1468                        .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1469                )
1470                .execute(conn)
1471                .await
1472                .map_err(IndexerError::from)
1473                .context("Failed to prune pruner_cp_watermark table")?;
1474                Ok(())
1475            }
1476            .scope_boxed()
1477        })
1478        .await
1479    }
1480
1481    async fn get_network_total_transactions_by_end_of_epoch(
1482        &self,
1483        epoch: u64,
1484    ) -> Result<Option<u64>, IndexerError> {
1485        use diesel_async::RunQueryDsl;
1486
1487        let mut connection = self.pool.get().await?;
1488
1489        // TODO: (wlmyng) update to read from epochs::network_total_transactions
1490
1491        Ok(Some(
1492            checkpoints::table
1493                .filter(checkpoints::epoch.eq(epoch as i64))
1494                .select(checkpoints::network_total_transactions)
1495                .order_by(checkpoints::sequence_number.desc())
1496                .first::<i64>(&mut connection)
1497                .await
1498                .map_err(Into::into)
1499                .context("Failed to get network total transactions in epoch")
1500                .map(|v| v as u64)?,
1501        ))
1502    }
1503
1504    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
1505        &self,
1506        watermark: CommitterWatermark,
1507    ) -> Result<(), IndexerError>
1508    where
1509        E::Iterator: Iterator<Item: AsRef<str>>,
1510    {
1511        use diesel_async::RunQueryDsl;
1512
1513        let guard = self
1514            .metrics
1515            .checkpoint_db_commit_latency_watermarks
1516            .start_timer();
1517
1518        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1519            let upper_bound_updates = E::iter()
1520                .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
1521                .collect::<Vec<_>>();
1522            async {
1523                diesel::insert_into(watermarks::table)
1524                    .values(upper_bound_updates)
1525                    .on_conflict(watermarks::pipeline)
1526                    .do_update()
1527                    .set((
1528                        watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
1529                        watermarks::checkpoint_hi_inclusive
1530                            .eq(excluded(watermarks::checkpoint_hi_inclusive)),
1531                        watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
1532                    ))
1533                    .execute(conn)
1534                    .await
1535                    .map_err(IndexerError::from)
1536                    .context("Failed to update watermarks upper bound")?;
1537
1538                Ok::<(), IndexerError>(())
1539            }
1540            .scope_boxed()
1541        })
1542        .await
1543        .tap_ok(|_| {
1544            let elapsed = guard.stop_and_record();
1545            info!(elapsed, "Persisted watermarks");
1546        })
1547        .tap_err(|e| {
1548            tracing::error!("Failed to persist watermarks with error: {}", e);
1549        })
1550    }
1551
1552    async fn map_epochs_to_cp_tx(
1553        &self,
1554        epochs: &[u64],
1555    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
1556        use diesel_async::RunQueryDsl;
1557
1558        let mut connection = self.pool.get().await?;
1559
1560        let results: Vec<(i64, i64, Option<i64>)> = epochs::table
1561            .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
1562            .select((
1563                epochs::epoch,
1564                epochs::first_checkpoint_id,
1565                epochs::first_tx_sequence_number,
1566            ))
1567            .load::<(i64, i64, Option<i64>)>(&mut connection)
1568            .await
1569            .map_err(Into::into)
1570            .context("Failed to fetch first checkpoint and tx seq num for epochs")?;
1571
1572        Ok(results
1573            .into_iter()
1574            .map(|(epoch, checkpoint, tx)| {
1575                (
1576                    epoch as u64,
1577                    (checkpoint as u64, tx.unwrap_or_default() as u64),
1578                )
1579            })
1580            .collect())
1581    }
1582
1583    async fn update_watermarks_lower_bound(
1584        &self,
1585        watermarks: Vec<(PrunableTable, u64)>,
1586    ) -> Result<(), IndexerError> {
1587        use diesel_async::RunQueryDsl;
1588
1589        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
1590        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs).await?;
1591        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
1592            .into_iter()
1593            .map(|(table, epoch)| {
1594                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
1595                    IndexerError::PersistentStorageDataCorruptionError(format!(
1596                        "Epoch {} not found in epoch mapping",
1597                        epoch
1598                    ))
1599                })?;
1600
1601                Ok(StoredWatermark::from_lower_bound_update(
1602                    table.as_ref(),
1603                    epoch,
1604                    table.select_reader_lo(*checkpoint, *tx),
1605                ))
1606            })
1607            .collect();
1608        let lower_bound_updates = lookups?;
1609
1610        let guard = self
1611            .metrics
1612            .checkpoint_db_commit_latency_watermarks
1613            .start_timer();
1614
1615        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1616            async {
1617                use diesel::dsl::sql;
1618                use diesel::query_dsl::methods::FilterDsl;
1619
1620                diesel::insert_into(watermarks::table)
1621                    .values(lower_bound_updates)
1622                    .on_conflict(watermarks::pipeline)
1623                    .do_update()
1624                    .set((
1625                        watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
1626                        watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
1627                        watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
1628                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1629                        )),
1630                    ))
1631                    .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
1632                    .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
1633                    .filter(
1634                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
1635                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1636                        )
1637                        .gt(watermarks::timestamp_ms),
1638                    )
1639                    .execute(conn)
1640                    .await?;
1641
1642                Ok::<(), IndexerError>(())
1643            }
1644            .scope_boxed()
1645        })
1646        .await
1647        .tap_ok(|_| {
1648            let elapsed = guard.stop_and_record();
1649            info!(elapsed, "Persisted watermarks");
1650        })
1651        .tap_err(|e| {
1652            tracing::error!("Failed to persist watermarks with error: {}", e);
1653        })
1654    }
1655
1656    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
1657        use diesel_async::RunQueryDsl;
1658
1659        // read_only transaction, otherwise this will block and get blocked by write transactions to
1660        // the same table.
1661        read_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1662            async {
1663                let stored = watermarks::table
1664                    .load::<StoredWatermark>(conn)
1665                    .await
1666                    .map_err(Into::into)
1667                    .context("Failed reading watermarks from PostgresDB")?;
1668
1669                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
1670                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1671                ))
1672                .get_result(conn)
1673                .await
1674                .map_err(Into::into)
1675                .context("Failed reading current timestamp from PostgresDB")?;
1676
1677                Ok((stored, timestamp))
1678            }
1679            .scope_boxed()
1680        })
1681        .await
1682    }
1683}
1684
1685#[async_trait]
1686impl IndexerStore for PgIndexerStore {
1687    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
1688        self.get_latest_checkpoint_sequence_number().await
1689    }
1690
1691    async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1692        self.get_prunable_epoch_range().await
1693    }
1694
1695    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
1696        self.get_available_checkpoint_range().await
1697    }
1698
1699    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
1700        self.get_chain_identifier().await
1701    }
1702
1703    async fn get_latest_object_snapshot_checkpoint_sequence_number(
1704        &self,
1705    ) -> Result<Option<u64>, IndexerError> {
1706        self.get_latest_object_snapshot_checkpoint_sequence_number()
1707            .await
1708    }
1709
1710    async fn persist_objects(
1711        &self,
1712        object_changes: Vec<TransactionObjectChangesToCommit>,
1713    ) -> Result<(), IndexerError> {
1714        if object_changes.is_empty() {
1715            return Ok(());
1716        }
1717        let guard = self
1718            .metrics
1719            .checkpoint_db_commit_latency_objects
1720            .start_timer();
1721        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1722        let object_mutations = indexed_mutations
1723            .into_iter()
1724            .map(StoredObject::from)
1725            .collect::<Vec<_>>();
1726        let object_deletions = indexed_deletions
1727            .into_iter()
1728            .map(StoredDeletedObject::from)
1729            .collect::<Vec<_>>();
1730        let mutation_len = object_mutations.len();
1731        let deletion_len = object_deletions.len();
1732
1733        let object_mutation_chunks =
1734            chunk!(object_mutations, self.config.parallel_objects_chunk_size);
1735        let object_deletion_chunks =
1736            chunk!(object_deletions, self.config.parallel_objects_chunk_size);
1737        let mutation_futures = object_mutation_chunks
1738            .into_iter()
1739            .map(|c| self.persist_object_mutation_chunk(c))
1740            .map(Either::Left);
1741        let deletion_futures = object_deletion_chunks
1742            .into_iter()
1743            .map(|c| self.persist_object_deletion_chunk(c))
1744            .map(Either::Right);
1745        let all_futures = mutation_futures.chain(deletion_futures).collect::<Vec<_>>();
1746
1747        futures::future::join_all(all_futures)
1748            .await
1749            .into_iter()
1750            .collect::<Result<Vec<_>, _>>()
1751            .map_err(|e| {
1752                IndexerError::PostgresWriteError(format!(
1753                    "Failed to persist all object mutation or deletion chunks: {:?}",
1754                    e
1755                ))
1756            })?;
1757        let elapsed = guard.stop_and_record();
1758        info!(
1759            elapsed,
1760            "Persisted {} objects mutations and {} deletions", mutation_len, deletion_len
1761        );
1762        Ok(())
1763    }
1764
1765    async fn persist_objects_snapshot(
1766        &self,
1767        object_changes: Vec<TransactionObjectChangesToCommit>,
1768    ) -> Result<(), IndexerError> {
1769        if object_changes.is_empty() {
1770            return Ok(());
1771        }
1772        let guard = self
1773            .metrics
1774            .checkpoint_db_commit_latency_objects_snapshot
1775            .start_timer();
1776        let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1777        let object_snapshot_mutations: Vec<StoredObjectSnapshot> = indexed_mutations
1778            .into_iter()
1779            .map(StoredObjectSnapshot::from)
1780            .collect();
1781        let object_snapshot_deletions: Vec<StoredObjectSnapshot> = indexed_deletions
1782            .into_iter()
1783            .map(StoredObjectSnapshot::from)
1784            .collect();
1785        let mutation_len = object_snapshot_mutations.len();
1786        let deletion_len = object_snapshot_deletions.len();
1787        let object_snapshot_mutation_chunks = chunk!(
1788            object_snapshot_mutations,
1789            self.config.parallel_objects_chunk_size
1790        );
1791        let object_snapshot_deletion_chunks = chunk!(
1792            object_snapshot_deletions,
1793            self.config.parallel_objects_chunk_size
1794        );
1795        let mutation_futures = object_snapshot_mutation_chunks
1796            .into_iter()
1797            .map(|c| self.persist_object_snapshot_mutation_chunk(c))
1798            .map(Either::Left)
1799            .collect::<Vec<_>>();
1800        let deletion_futures = object_snapshot_deletion_chunks
1801            .into_iter()
1802            .map(|c| self.persist_object_snapshot_deletion_chunk(c))
1803            .map(Either::Right)
1804            .collect::<Vec<_>>();
1805        let all_futures = mutation_futures
1806            .into_iter()
1807            .chain(deletion_futures)
1808            .collect::<Vec<_>>();
1809        futures::future::join_all(all_futures)
1810            .await
1811            .into_iter()
1812            .collect::<Result<Vec<_>, _>>()
1813            .map_err(|e| {
1814                IndexerError::PostgresWriteError(format!(
1815                    "Failed to persist object snapshot mutation or deletion chunks: {:?}",
1816                    e
1817                ))
1818            })
1819            .tap_ok(|_| {
1820                let elapsed = guard.stop_and_record();
1821                info!(
1822                    elapsed,
1823                    "Persisted {} objects snapshot mutations and {} deletions",
1824                    mutation_len,
1825                    deletion_len
1826                );
1827            })
1828            .tap_err(|e| {
1829                tracing::error!(
1830                    "Failed to persist object snapshot mutation or deletion chunks: {:?}",
1831                    e
1832                )
1833            })?;
1834        Ok(())
1835    }
1836
1837    async fn persist_object_history(
1838        &self,
1839        object_changes: Vec<TransactionObjectChangesToCommit>,
1840    ) -> Result<(), IndexerError> {
1841        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1842            .map(|val| val.eq_ignore_ascii_case("true"))
1843            .unwrap_or(false);
1844        if skip_history {
1845            info!("skipping object history");
1846            return Ok(());
1847        }
1848
1849        if object_changes.is_empty() {
1850            return Ok(());
1851        }
1852        let objects = make_objects_history_to_commit(object_changes);
1853        let guard = self
1854            .metrics
1855            .checkpoint_db_commit_latency_objects_history
1856            .start_timer();
1857
1858        let len = objects.len();
1859        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1860        let futures = chunks
1861            .into_iter()
1862            .map(|c| self.persist_objects_history_chunk(c))
1863            .collect::<Vec<_>>();
1864
1865        futures::future::join_all(futures)
1866            .await
1867            .into_iter()
1868            .collect::<Result<Vec<_>, _>>()
1869            .map_err(|e| {
1870                IndexerError::PostgresWriteError(format!(
1871                    "Failed to persist all objects history chunks: {:?}",
1872                    e
1873                ))
1874            })?;
1875        let elapsed = guard.stop_and_record();
1876        info!(elapsed, "Persisted {} objects history", len);
1877        Ok(())
1878    }
1879
1880    // TODO: There are quite some shared boiler-plate code in all functions.
1881    // We should clean them up eventually.
1882    async fn persist_full_objects_history(
1883        &self,
1884        object_changes: Vec<TransactionObjectChangesToCommit>,
1885    ) -> Result<(), IndexerError> {
1886        let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1887            .map(|val| val.eq_ignore_ascii_case("true"))
1888            .unwrap_or(false);
1889        if skip_history {
1890            info!("skipping object history");
1891            return Ok(());
1892        }
1893
1894        if object_changes.is_empty() {
1895            return Ok(());
1896        }
1897        let objects: Vec<StoredFullHistoryObject> = object_changes
1898            .into_iter()
1899            .flat_map(|c| {
1900                let TransactionObjectChangesToCommit {
1901                    changed_objects,
1902                    deleted_objects,
1903                } = c;
1904                changed_objects
1905                    .into_iter()
1906                    .map(|o| o.into())
1907                    .chain(deleted_objects.into_iter().map(|o| o.into()))
1908            })
1909            .collect();
1910        let guard = self
1911            .metrics
1912            .checkpoint_db_commit_latency_full_objects_history
1913            .start_timer();
1914
1915        let len = objects.len();
1916        let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1917        let futures = chunks
1918            .into_iter()
1919            .map(|c| self.persist_full_objects_history_chunk(c))
1920            .collect::<Vec<_>>();
1921
1922        futures::future::join_all(futures)
1923            .await
1924            .into_iter()
1925            .collect::<Result<Vec<_>, _>>()
1926            .map_err(|e| {
1927                IndexerError::PostgresWriteError(format!(
1928                    "Failed to persist all full objects history chunks: {:?}",
1929                    e
1930                ))
1931            })?;
1932        let elapsed = guard.stop_and_record();
1933        info!(elapsed, "Persisted {} full objects history", len);
1934        Ok(())
1935    }
1936
1937    async fn persist_objects_version(
1938        &self,
1939        object_versions: Vec<StoredObjectVersion>,
1940    ) -> Result<(), IndexerError> {
1941        if object_versions.is_empty() {
1942            return Ok(());
1943        }
1944
1945        let guard = self
1946            .metrics
1947            .checkpoint_db_commit_latency_objects_version
1948            .start_timer();
1949
1950        let len = object_versions.len();
1951        let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1952        let futures = chunks
1953            .into_iter()
1954            .map(|c| self.persist_objects_version_chunk(c))
1955            .collect::<Vec<_>>();
1956
1957        futures::future::join_all(futures)
1958            .await
1959            .into_iter()
1960            .collect::<Result<Vec<_>, _>>()
1961            .map_err(|e| {
1962                IndexerError::PostgresWriteError(format!(
1963                    "Failed to persist all objects version chunks: {:?}",
1964                    e
1965                ))
1966            })?;
1967
1968        let elapsed = guard.stop_and_record();
1969        info!(elapsed, "Persisted {} object versions", len);
1970        Ok(())
1971    }
1972
1973    async fn persist_checkpoints(
1974        &self,
1975        checkpoints: Vec<IndexedCheckpoint>,
1976    ) -> Result<(), IndexerError> {
1977        self.persist_checkpoints(checkpoints).await
1978    }
1979
1980    async fn persist_transactions(
1981        &self,
1982        transactions: Vec<IndexedTransaction>,
1983    ) -> Result<(), IndexerError> {
1984        let guard = self
1985            .metrics
1986            .checkpoint_db_commit_latency_transactions
1987            .start_timer();
1988        let len = transactions.len();
1989
1990        let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1991        let futures = chunks
1992            .into_iter()
1993            .map(|c| self.persist_transactions_chunk(c))
1994            .collect::<Vec<_>>();
1995
1996        futures::future::join_all(futures)
1997            .await
1998            .into_iter()
1999            .collect::<Result<Vec<_>, _>>()
2000            .map_err(|e| {
2001                IndexerError::PostgresWriteError(format!(
2002                    "Failed to persist all transactions chunks: {:?}",
2003                    e
2004                ))
2005            })?;
2006        let elapsed = guard.stop_and_record();
2007        info!(elapsed, "Persisted {} transactions", len);
2008        Ok(())
2009    }
2010
2011    async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
2012        if events.is_empty() {
2013            return Ok(());
2014        }
2015        let len = events.len();
2016        let guard = self
2017            .metrics
2018            .checkpoint_db_commit_latency_events
2019            .start_timer();
2020        let chunks = chunk!(events, self.config.parallel_chunk_size);
2021        let futures = chunks
2022            .into_iter()
2023            .map(|c| self.persist_events_chunk(c))
2024            .collect::<Vec<_>>();
2025
2026        futures::future::join_all(futures)
2027            .await
2028            .into_iter()
2029            .collect::<Result<Vec<_>, _>>()
2030            .map_err(|e| {
2031                IndexerError::PostgresWriteError(format!(
2032                    "Failed to persist all events chunks: {:?}",
2033                    e
2034                ))
2035            })?;
2036        let elapsed = guard.stop_and_record();
2037        info!(elapsed, "Persisted {} events", len);
2038        Ok(())
2039    }
2040
2041    async fn persist_displays(
2042        &self,
2043        display_updates: BTreeMap<String, StoredDisplay>,
2044    ) -> Result<(), IndexerError> {
2045        if display_updates.is_empty() {
2046            return Ok(());
2047        }
2048        self.persist_display_updates(display_updates.values().cloned().collect::<Vec<_>>())
2049            .await
2050    }
2051
2052    async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
2053        if packages.is_empty() {
2054            return Ok(());
2055        }
2056        self.persist_packages(packages).await
2057    }
2058
2059    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
2060        if indices.is_empty() {
2061            return Ok(());
2062        }
2063        let len = indices.len();
2064        let guard = self
2065            .metrics
2066            .checkpoint_db_commit_latency_event_indices
2067            .start_timer();
2068        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2069
2070        let futures = chunks
2071            .into_iter()
2072            .map(|chunk| self.persist_event_indices_chunk(chunk))
2073            .collect::<Vec<_>>();
2074        futures::future::join_all(futures)
2075            .await
2076            .into_iter()
2077            .collect::<Result<Vec<_>, _>>()
2078            .map_err(|e| {
2079                IndexerError::PostgresWriteError(format!(
2080                    "Failed to persist all event_indices chunks: {:?}",
2081                    e
2082                ))
2083            })
2084            .tap_ok(|_| {
2085                let elapsed = guard.stop_and_record();
2086                info!(elapsed, "Persisted {} event_indices chunks", len);
2087            })
2088            .tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?;
2089        Ok(())
2090    }
2091
2092    async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2093        if indices.is_empty() {
2094            return Ok(());
2095        }
2096        let len = indices.len();
2097        let guard = self
2098            .metrics
2099            .checkpoint_db_commit_latency_tx_indices
2100            .start_timer();
2101        let chunks = chunk!(indices, self.config.parallel_chunk_size);
2102
2103        let futures = chunks
2104            .into_iter()
2105            .map(|chunk| self.persist_tx_indices_chunk(chunk))
2106            .collect::<Vec<_>>();
2107        futures::future::join_all(futures)
2108            .await
2109            .into_iter()
2110            .collect::<Result<Vec<_>, _>>()
2111            .map_err(|e| {
2112                IndexerError::PostgresWriteError(format!(
2113                    "Failed to persist all tx_indices chunks: {:?}",
2114                    e
2115                ))
2116            })
2117            .tap_ok(|_| {
2118                let elapsed = guard.stop_and_record();
2119                info!(elapsed, "Persisted {} tx_indices chunks", len);
2120            })
2121            .tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?;
2122        Ok(())
2123    }
2124
2125    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2126        self.persist_epoch(epoch).await
2127    }
2128
2129    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
2130        self.advance_epoch(epoch).await
2131    }
2132
2133    async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
2134        let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch).await? {
2135            (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
2136            _ => Err(IndexerError::PostgresReadError(format!(
2137                "Failed to get checkpoint range for epoch {}",
2138                epoch
2139            ))),
2140        }?;
2141
2142        // NOTE: for disaster recovery, min_cp is the min cp of the current epoch, which is likely
2143        // partially pruned already. min_prunable_cp is the min cp to be pruned.
2144        // By std::cmp::max, we will resume the pruning process from the next checkpoint, instead of
2145        // the first cp of the current epoch.
2146        let min_prunable_cp = self.get_min_prunable_checkpoint().await?;
2147        min_cp = std::cmp::max(min_cp, min_prunable_cp);
2148        for cp in min_cp..=max_cp {
2149            // NOTE: the order of pruning tables is crucial:
2150            // 1. prune tx_* tables;
2151            // 2. prune event_* tables;
2152            // 3. then prune pruner_cp_watermark table, which is the checkpoint pruning watermark table and also tx seq source
2153            // of a checkpoint to prune tx_* tables;
2154            // 4. lastly prune checkpoints table, because wait_for_graphql_checkpoint_pruned
2155            // uses this table as the pruning watermark table.
2156            info!(
2157                "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
2158                cp, epoch, min_prunable_cp
2159            );
2160
2161            let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp).await?;
2162            self.prune_tx_indices_table(min_tx, max_tx).await?;
2163            info!(
2164                "Pruned transactions for checkpoint {} from tx {} to tx {}",
2165                cp, min_tx, max_tx
2166            );
2167            self.prune_event_indices_table(min_tx, max_tx).await?;
2168            info!(
2169                "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
2170                cp, min_tx, max_tx
2171            );
2172            self.metrics.last_pruned_transaction.set(max_tx as i64);
2173
2174            self.prune_cp_tx_table(cp).await?;
2175            // NOTE: prune checkpoints table last b/c wait_for_graphql_checkpoint_pruned
2176            // uses this table as the watermark table.
2177            self.prune_checkpoints_table(cp).await?;
2178
2179            info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2180            self.metrics.last_pruned_checkpoint.set(cp as i64);
2181        }
2182
2183        Ok(())
2184    }
2185
2186    async fn upload_display(&self, epoch_number: u64) -> Result<(), IndexerError> {
2187        use diesel_async::RunQueryDsl;
2188        let mut connection = self.pool.get().await?;
2189        let mut buffer = Cursor::new(Vec::new());
2190        {
2191            let mut writer = Writer::from_writer(&mut buffer);
2192            let displays = display::table
2193                .load::<StoredDisplay>(&mut connection)
2194                .await
2195                .map_err(Into::into)
2196                .context("Failed to get display from database")?;
2197            info!("Read {} displays", displays.len());
2198            writer
2199                .write_record(["object_type", "id", "version", "bcs"])
2200                .map_err(|_| {
2201                    IndexerError::GcsError("Failed to write display to csv".to_string())
2202                })?;
2203            for display in displays {
2204                writer
2205                    .write_record(&[
2206                        display.object_type,
2207                        hex::encode(display.id),
2208                        display.version.to_string(),
2209                        hex::encode(display.bcs),
2210                    ])
2211                    .map_err(|_| IndexerError::GcsError("Failed to write to csv".to_string()))?;
2212            }
2213            writer
2214                .flush()
2215                .map_err(|_| IndexerError::GcsError("Failed to flush csv".to_string()))?;
2216        }
2217
2218        if let (Some(cred_path), Some(bucket)) = (
2219            self.config.gcs_cred_path.clone(),
2220            self.config.gcs_display_bucket.clone(),
2221        ) {
2222            let remote_store_config = ObjectStoreConfig {
2223                object_store: Some(ObjectStoreType::GCS),
2224                bucket: Some(bucket),
2225                google_service_account: Some(cred_path),
2226                object_store_connection_limit: 200,
2227                no_sign_request: false,
2228                ..Default::default()
2229            };
2230            let remote_store = remote_store_config.make().map_err(|e| {
2231                IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e))
2232            })?;
2233            let path = Path::from(format!("display_{}.csv", epoch_number).as_str());
2234            put(&remote_store, &path, buffer.into_inner().into())
2235                .await
2236                .map_err(|e| IndexerError::GcsError(format!("Failed to put to GCS: {}", e)))?;
2237        } else {
2238            warn!("Either GCS cred path or bucket is not set, skipping display upload.");
2239        }
2240        Ok(())
2241    }
2242
2243    async fn restore_display(&self, bytes: bytes::Bytes) -> Result<(), IndexerError> {
2244        let cursor = Cursor::new(bytes);
2245        let mut csv_reader = ReaderBuilder::new().has_headers(true).from_reader(cursor);
2246        let displays = csv_reader
2247            .deserialize()
2248            .collect::<Result<Vec<StoredDisplay>, csv::Error>>()
2249            .map_err(|e| {
2250                IndexerError::GcsError(format!("Failed to deserialize display records: {}", e))
2251            })?;
2252        self.persist_display_updates(displays).await
2253    }
2254
2255    async fn get_network_total_transactions_by_end_of_epoch(
2256        &self,
2257        epoch: u64,
2258    ) -> Result<Option<u64>, IndexerError> {
2259        self.get_network_total_transactions_by_end_of_epoch(epoch)
2260            .await
2261    }
2262
2263    /// Persist protocol configs and feature flags until the protocol version for the latest epoch
2264    /// we have stored in the db, inclusive.
2265    async fn persist_protocol_configs_and_feature_flags(
2266        &self,
2267        chain_id: Vec<u8>,
2268    ) -> Result<(), IndexerError> {
2269        use diesel_async::RunQueryDsl;
2270
2271        let chain_id = ChainIdentifier::from(
2272            CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
2273        );
2274
2275        let mut all_configs = vec![];
2276        let mut all_flags = vec![];
2277
2278        let (start_version, end_version) = self.get_protocol_version_index_range().await?;
2279        info!(
2280            "Persisting protocol configs with start_version: {}, end_version: {}",
2281            start_version, end_version
2282        );
2283
2284        // Gather all protocol configs and feature flags for all versions between start and end.
2285        for version in start_version..=end_version {
2286            let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2287                (version as u64).into(),
2288                chain_id.chain(),
2289            )
2290            .ok_or(IndexerError::GenericError(format!(
2291                "Unable to fetch protocol version {} and chain {:?}",
2292                version,
2293                chain_id.chain()
2294            )))?;
2295            let configs_vec = protocol_configs
2296                .attr_map()
2297                .into_iter()
2298                .map(|(k, v)| StoredProtocolConfig {
2299                    protocol_version: version,
2300                    config_name: k,
2301                    config_value: v.map(|v| v.to_string()),
2302                })
2303                .collect::<Vec<_>>();
2304            all_configs.extend(configs_vec);
2305
2306            let feature_flags = protocol_configs
2307                .feature_map()
2308                .into_iter()
2309                .map(|(k, v)| StoredFeatureFlag {
2310                    protocol_version: version,
2311                    flag_name: k,
2312                    flag_value: v,
2313                })
2314                .collect::<Vec<_>>();
2315            all_flags.extend(feature_flags);
2316        }
2317
2318        // Now insert all of them into the db.
2319        // TODO: right now the size of these updates is manageable but later we may consider batching.
2320        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
2321            async {
2322                for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2323                    diesel::insert_into(protocol_configs::table)
2324                        .values(config_chunk)
2325                        .on_conflict_do_nothing()
2326                        .execute(conn)
2327                        .await
2328                        .map_err(IndexerError::from)
2329                        .context("Failed to write to protocol_configs table")?;
2330                }
2331
2332                for flag_chunk in all_flags.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2333                    diesel::insert_into(feature_flags::table)
2334                        .values(flag_chunk)
2335                        .on_conflict_do_nothing()
2336                        .execute(conn)
2337                        .await
2338                        .map_err(IndexerError::from)
2339                        .context("Failed to write to feature_flags table")?;
2340                }
2341                Ok::<(), IndexerError>(())
2342            }
2343            .scope_boxed()
2344        })
2345        .await?;
2346        Ok(())
2347    }
2348
    /// Stores `checkpoint_digest` as a `StoredChainIdentifier` row in the
    /// `chain_identifier` table.
    ///
    /// The insert uses `on_conflict_do_nothing`, so a row that conflicts with an existing one
    /// is silently skipped and the existing row is kept (presumably the identifier is written
    /// once per database — confirm against the table's constraints).
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `transaction_with_retry`; a failed insert is tagged
    /// with the context "failed to write to chain_identifier table".
    async fn persist_chain_identifier(
        &self,
        checkpoint_digest: Vec<u8>,
    ) -> Result<(), IndexerError> {
        // Brings diesel-async's `execute` into scope for the insert below.
        use diesel_async::RunQueryDsl;

        // NOTE(review): by its name, `transaction_with_retry` presumably re-runs the closure
        // on failure, sleeping `PG_DB_COMMIT_SLEEP_DURATION` between attempts — confirm.
        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
            async {
                diesel::insert_into(chain_identifier::table)
                    .values(StoredChainIdentifier { checkpoint_digest })
                    .on_conflict_do_nothing()
                    .execute(conn)
                    .await
                    .map_err(IndexerError::from)
                    .context("failed to write to chain_identifier table")?;
                Ok::<(), IndexerError>(())
            }
            .scope_boxed()
        })
        .await?;
        Ok(())
    }
2371
2372    async fn persist_raw_checkpoints(
2373        &self,
2374        checkpoints: Vec<StoredRawCheckpoint>,
2375    ) -> Result<(), IndexerError> {
2376        self.persist_raw_checkpoints_impl(&checkpoints).await
2377    }
2378
    /// Trait entry point that forwards `watermark`, together with the enum type `E`, to the
    /// same-named method on this store and returns its result unchanged.
    ///
    /// `E` must be an `IntoEnumIterator` whose variants are string-like (`AsRef<str>`);
    /// presumably these name the tables whose upper bound is advanced — confirm at call sites.
    async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
        &self,
        watermark: CommitterWatermark,
    ) -> Result<(), IndexerError>
    where
        E::Iterator: Iterator<Item: AsRef<str>>,
    {
        // NOTE(review): this relies on Rust resolving `self.update_watermarks_upper_bound` to
        // an inherent method of the same name (inherent methods win over trait methods). If
        // that inherent method were removed or renamed, this call would recurse into itself.
        self.update_watermarks_upper_bound::<E>(watermark).await
    }
2388
    /// Trait entry point that forwards the `(PrunableTable, u64)` pairs to the same-named
    /// method on this store and returns its result unchanged.
    async fn update_watermarks_lower_bound(
        &self,
        watermarks: Vec<(PrunableTable, u64)>,
    ) -> Result<(), IndexerError> {
        // NOTE(review): this relies on Rust resolving the call to an inherent method of the
        // same name (inherent methods win over trait methods). If that inherent method were
        // removed or renamed, this call would recurse into itself.
        self.update_watermarks_lower_bound(watermarks).await
    }
2395
    /// Trait entry point that forwards to the same-named method on this store, returning the
    /// stored watermark rows together with an `i64` whose meaning is defined by that method
    /// (NOTE(review): not visible here — confirm, e.g. whether it is a timestamp).
    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
        // NOTE(review): relies on inherent-method precedence; if the inherent
        // `get_watermarks` disappeared, this would call the trait method recursively.
        self.get_watermarks().await
    }
2399}
2400
2401fn make_objects_history_to_commit(
2402    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2403) -> Vec<StoredHistoryObject> {
2404    let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2405        .clone()
2406        .into_iter()
2407        .flat_map(|changes| changes.deleted_objects)
2408        .map(|o| o.into())
2409        .collect();
2410    let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2411        .into_iter()
2412        .flat_map(|changes| changes.changed_objects)
2413        .map(|o| o.into())
2414        .collect();
2415    deleted_objects.into_iter().chain(mutated_objects).collect()
2416}
2417
2418// Partition object changes into deletions and mutations,
2419// within partition of mutations or deletions, retain the latest with highest version;
2420// For overlappings of mutations and deletions, only keep one with higher version.
2421// This is necessary b/c after this step, DB commit will be done in parallel and not in order.
2422fn retain_latest_indexed_objects(
2423    tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2424) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2425    // Only the last deleted / mutated object will be in the map,
2426    // b/c tx_object_changes are in order and versions always increment,
2427    let (mutations, deletions) = tx_object_changes
2428        .into_iter()
2429        .flat_map(|change| {
2430            change
2431                .changed_objects
2432                .into_iter()
2433                .map(Either::Left)
2434                .chain(
2435                    change
2436                        .deleted_objects
2437                        .into_iter()
2438                        .map(Either::Right),
2439                )
2440        })
2441        .fold(
2442            (HashMap::<ObjectID, IndexedObject>::new(), HashMap::<ObjectID, IndexedDeletedObject>::new()),
2443            |(mut mutations, mut deletions), either_change| {
2444                match either_change {
2445                    // Remove mutation / deletion with a following deletion / mutation,
2446                    // b/c following deletion / mutation always has a higher version.
2447                    // Technically, assertions below are not required, double check just in case.
2448                    Either::Left(mutation) => {
2449                        let id = mutation.object.id();
2450                        let mutation_version = mutation.object.version();
2451                        if let Some(existing) = deletions.remove(&id) {
2452                            assert!(
2453                                existing.object_version < mutation_version.value(),
2454                                "Mutation version ({:?}) should be greater than existing deletion version ({:?}) for object {:?}",
2455                                mutation_version,
2456                                existing.object_version,
2457                                id
2458                            );
2459                        }
2460                        if let Some(existing) = mutations.insert(id, mutation) {
2461                            assert!(
2462                                existing.object.version() < mutation_version,
2463                                "Mutation version ({:?}) should be greater than existing mutation version ({:?}) for object {:?}",
2464                                mutation_version,
2465                                existing.object.version(),
2466                                id
2467                            );
2468                        }
2469                    }
2470                    Either::Right(deletion) => {
2471                        let id = deletion.object_id;
2472                        let deletion_version = deletion.object_version;
2473                        if let Some(existing) = mutations.remove(&id) {
2474                            assert!(
2475                                existing.object.version().value() < deletion_version,
2476                                "Deletion version ({:?}) should be greater than existing mutation version ({:?}) for object {:?}",
2477                                deletion_version,
2478                                existing.object.version(),
2479                                id
2480                            );
2481                        }
2482                        if let Some(existing) = deletions.insert(id, deletion) {
2483                            assert!(
2484                                existing.object_version < deletion_version,
2485                                "Deletion version ({:?}) should be greater than existing deletion version ({:?}) for object {:?}",
2486                                deletion_version,
2487                                existing.object_version,
2488                                id
2489                            );
2490                        }
2491                    }
2492                }
2493                (mutations, deletions)
2494            },
2495        );
2496    (
2497        mutations.into_values().collect(),
2498        deletions.into_values().collect(),
2499    )
2500}