1use std::collections::{BTreeMap, HashMap};
5use std::io::Cursor;
6use std::time::Duration;
7
8use async_trait::async_trait;
9use core::result::Result::Ok;
10use csv::{ReaderBuilder, Writer};
11use diesel::ExpressionMethods;
12use diesel::OptionalExtension;
13use diesel::QueryDsl;
14use diesel::dsl::{max, min};
15use diesel_async::scoped_futures::ScopedFutureExt;
16use futures::future::Either;
17use itertools::Itertools;
18use object_store::path::Path;
19use strum::IntoEnumIterator;
20use sui_types::base_types::ObjectID;
21use tap::TapFallible;
22use tracing::{info, warn};
23
24use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
25use sui_protocol_config::ProtocolConfig;
26use sui_storage::object_store::util::put;
27
28use crate::config::UploadOptions;
29use crate::database::ConnectionPool;
30use crate::errors::{Context, IndexerError};
31use crate::handlers::TransactionObjectChangesToCommit;
32use crate::handlers::pruner::PrunableTable;
33use crate::handlers::{CommitterWatermark, EpochToCommit};
34use crate::metrics::IndexerMetrics;
35use crate::models::checkpoints::StoredChainIdentifier;
36use crate::models::checkpoints::StoredCheckpoint;
37use crate::models::checkpoints::StoredCpTx;
38use crate::models::display::StoredDisplay;
39use crate::models::epoch::StoredEpochInfo;
40use crate::models::epoch::{StoredFeatureFlag, StoredProtocolConfig};
41use crate::models::events::StoredEvent;
42use crate::models::obj_indices::StoredObjectVersion;
43use crate::models::objects::{
44 StoredDeletedObject, StoredFullHistoryObject, StoredHistoryObject, StoredObject,
45 StoredObjectSnapshot,
46};
47use crate::models::packages::StoredPackage;
48use crate::models::transactions::StoredTransaction;
49use crate::models::watermarks::StoredWatermark;
50use crate::schema::{
51 chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
52 event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
53 event_struct_package, events, feature_flags, full_objects_history, objects, objects_history,
54 objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
55 raw_checkpoints, transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun,
56 tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
57 watermarks,
58};
59use crate::store::{read_with_retry, transaction_with_retry};
60use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject};
61use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex};
62
63use super::IndexerStore;
64use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
65
66use crate::models::raw_checkpoints::StoredRawCheckpoint;
67use diesel::upsert::excluded;
68use sui_types::digests::{ChainIdentifier, CheckpointDigest};
69
/// Groups the items of `$data` into consecutive `Vec`s of at most `$size` items each
/// (the final group may be shorter), returning them as a `Vec<Vec<_>>`.
///
/// `.chunks` here is expected to be `itertools::Itertools::chunks`, so that trait has to be
/// usable at the expansion site.
#[macro_export]
macro_rules! chunk {
    ($data: expr, $size: expr) => {{
        let mut batches: Vec<Vec<_>> = Vec::new();
        for batch in &$data.into_iter().chunks($size) {
            batches.push(batch.collect());
        }
        batches
    }};
}
81
/// Number of rows bound into each INSERT/DELETE statement issued inside a single DB transaction.
const PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX: usize = 1_000;
/// Default for the `PG_COMMIT_PARALLEL_CHUNK_SIZE` environment override read in `PgIndexerStore::new`.
const PG_COMMIT_PARALLEL_CHUNK_SIZE: usize = 100;
/// Default for the `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` environment override read in
/// `PgIndexerStore::new`.
const PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE: usize = 500;
/// One hour; passed as the duration argument of every `transaction_with_retry` call.
/// NOTE(review): presumably the upper bound on time spent retrying a transaction — confirm in
/// `store::transaction_with_retry`.
const PG_DB_COMMIT_SLEEP_DURATION: Duration = Duration::from_secs(60 * 60);
94
/// Tuning knobs for [`PgIndexerStore`], assembled once in `PgIndexerStore::new`.
#[derive(Clone)]
pub struct PgIndexerStoreConfig {
    /// Read from the `PG_COMMIT_PARALLEL_CHUNK_SIZE` env var in `PgIndexerStore::new`,
    /// defaulting to the constant of the same name (100).
    pub parallel_chunk_size: usize,
    /// Read from the `PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE` env var in `PgIndexerStore::new`,
    /// defaulting to the constant of the same name (500).
    pub parallel_objects_chunk_size: usize,
    /// Copied from `UploadOptions::gcs_cred_path`.
    /// NOTE(review): presumably a GCS credentials path; not read by the visible code — confirm.
    pub gcs_cred_path: Option<String>,
    /// Copied from `UploadOptions::gcs_display_bucket`.
    /// NOTE(review): presumably the GCS bucket for display data — confirm.
    pub gcs_display_bucket: Option<String>,
}
102
/// Postgres-backed indexer store: answers range/watermark queries and persists indexed data
/// through a pooled connection, recording commit latencies in `metrics`.
#[derive(Clone)]
pub struct PgIndexerStore {
    /// Pool used for every read and for every retried write transaction.
    pool: ConnectionPool,
    /// Metrics sink; the persist methods start their latency timers here.
    metrics: IndexerMetrics,
    /// Created in `new` from a clone of `pool`.
    partition_manager: PgPartitionManager,
    /// Chunk sizes and GCS upload settings resolved in `new`.
    config: PgIndexerStoreConfig,
}
110
111impl PgIndexerStore {
112 pub fn new(
113 pool: ConnectionPool,
114 upload_options: UploadOptions,
115 metrics: IndexerMetrics,
116 ) -> Self {
117 let parallel_chunk_size = std::env::var("PG_COMMIT_PARALLEL_CHUNK_SIZE")
118 .unwrap_or_else(|_e| PG_COMMIT_PARALLEL_CHUNK_SIZE.to_string())
119 .parse::<usize>()
120 .unwrap();
121 let parallel_objects_chunk_size = std::env::var("PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE")
122 .unwrap_or_else(|_e| PG_COMMIT_OBJECTS_PARALLEL_CHUNK_SIZE.to_string())
123 .parse::<usize>()
124 .unwrap();
125 let partition_manager =
126 PgPartitionManager::new(pool.clone()).expect("Failed to initialize partition manager");
127 let config = PgIndexerStoreConfig {
128 parallel_chunk_size,
129 parallel_objects_chunk_size,
130 gcs_cred_path: upload_options.gcs_cred_path,
131 gcs_display_bucket: upload_options.gcs_display_bucket,
132 };
133
134 Self {
135 pool,
136 metrics,
137 partition_manager,
138 config,
139 }
140 }
141
142 pub fn pool(&self) -> ConnectionPool {
143 self.pool.clone()
144 }
145
146 pub async fn get_protocol_version_index_range(&self) -> Result<(i64, i64), IndexerError> {
148 use diesel_async::RunQueryDsl;
149
150 let mut connection = self.pool.get().await?;
151 let start = protocol_configs::table
153 .select(max(protocol_configs::protocol_version))
154 .first::<Option<i64>>(&mut connection)
155 .await
156 .map_err(Into::into)
157 .context("Failed reading latest protocol version from PostgresDB")?
158 .map_or(1, |v| v + 1);
159
160 let end = epochs::table
162 .select(max(epochs::protocol_version))
163 .first::<Option<i64>>(&mut connection)
164 .await
165 .map_err(Into::into)
166 .context("Failed reading latest epoch protocol version from PostgresDB")?
167 .unwrap_or(1);
168 Ok((start, end))
169 }
170
171 async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
172 use diesel_async::RunQueryDsl;
173
174 let mut connection = self.pool.get().await?;
175
176 chain_identifier::table
177 .select(chain_identifier::checkpoint_digest)
178 .first::<Vec<u8>>(&mut connection)
179 .await
180 .optional()
181 .map_err(Into::into)
182 .context("Failed reading chain id from PostgresDB")
183 }
184
185 pub async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
187 use diesel_async::RunQueryDsl;
188
189 let mut connection = self.pool.get().await?;
190
191 checkpoints::table
192 .select(max(checkpoints::sequence_number))
193 .first::<Option<i64>>(&mut connection)
194 .await
195 .map_err(Into::into)
196 .map(|v| v.map(|v| v as u64))
197 .context("Failed reading latest checkpoint sequence number from PostgresDB")
198 }
199
200 async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
201 use diesel_async::RunQueryDsl;
202
203 let mut connection = self.pool.get().await?;
204
205 checkpoints::table
206 .select((
207 min(checkpoints::sequence_number),
208 max(checkpoints::sequence_number),
209 ))
210 .first::<(Option<i64>, Option<i64>)>(&mut connection)
211 .await
212 .map_err(Into::into)
213 .map(|(min, max)| {
214 (
215 min.unwrap_or_default() as u64,
216 max.unwrap_or_default() as u64,
217 )
218 })
219 .context("Failed reading min and max checkpoint sequence numbers from PostgresDB")
220 }
221
222 async fn get_prunable_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
223 use diesel_async::RunQueryDsl;
224
225 let mut connection = self.pool.get().await?;
226
227 epochs::table
228 .select((min(epochs::epoch), max(epochs::epoch)))
229 .first::<(Option<i64>, Option<i64>)>(&mut connection)
230 .await
231 .map_err(Into::into)
232 .map(|(min, max)| {
233 (
234 min.unwrap_or_default() as u64,
235 max.unwrap_or_default() as u64,
236 )
237 })
238 .context("Failed reading min and max epoch numbers from PostgresDB")
239 }
240
241 async fn get_min_prunable_checkpoint(&self) -> Result<u64, IndexerError> {
242 use diesel_async::RunQueryDsl;
243
244 let mut connection = self.pool.get().await?;
245
246 pruner_cp_watermark::table
247 .select(min(pruner_cp_watermark::checkpoint_sequence_number))
248 .first::<Option<i64>>(&mut connection)
249 .await
250 .map_err(Into::into)
251 .map(|v| v.unwrap_or_default() as u64)
252 .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
253 }
254
255 pub async fn get_checkpoint_range_for_epoch(
256 &self,
257 epoch: u64,
258 ) -> Result<(u64, Option<u64>), IndexerError> {
259 use diesel_async::RunQueryDsl;
260
261 let mut connection = self.pool.get().await?;
262
263 epochs::table
264 .select((epochs::first_checkpoint_id, epochs::last_checkpoint_id))
265 .filter(epochs::epoch.eq(epoch as i64))
266 .first::<(i64, Option<i64>)>(&mut connection)
267 .await
268 .map_err(Into::into)
269 .map(|(min, max)| (min as u64, max.map(|v| v as u64)))
270 .context("Failed reading checkpoint range from PostgresDB")
271 }
272
273 pub async fn get_transaction_range_for_checkpoint(
274 &self,
275 checkpoint: u64,
276 ) -> Result<(u64, u64), IndexerError> {
277 use diesel_async::RunQueryDsl;
278
279 let mut connection = self.pool.get().await?;
280
281 pruner_cp_watermark::table
282 .select((
283 pruner_cp_watermark::min_tx_sequence_number,
284 pruner_cp_watermark::max_tx_sequence_number,
285 ))
286 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(checkpoint as i64))
287 .first::<(i64, i64)>(&mut connection)
288 .await
289 .map_err(Into::into)
290 .map(|(min, max)| (min as u64, max as u64))
291 .context("Failed reading transaction range from PostgresDB")
292 }
293
294 pub async fn get_latest_object_snapshot_checkpoint_sequence_number(
295 &self,
296 ) -> Result<Option<u64>, IndexerError> {
297 use diesel_async::RunQueryDsl;
298
299 let mut connection = self.pool.get().await?;
300
301 objects_snapshot::table
302 .select(max(objects_snapshot::checkpoint_sequence_number))
303 .first::<Option<i64>>(&mut connection)
304 .await
305 .map_err(Into::into)
306 .map(|v| v.map(|v| v as u64))
307 .context(
308 "Failed reading latest object snapshot checkpoint sequence number from PostgresDB",
309 )
310 }
311
312 async fn persist_display_updates(
313 &self,
314 display_updates: Vec<StoredDisplay>,
315 ) -> Result<(), IndexerError> {
316 use diesel_async::RunQueryDsl;
317
318 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
319 async {
320 diesel::insert_into(display::table)
321 .values(display_updates)
322 .on_conflict(display::object_type)
323 .do_update()
324 .set((
325 display::id.eq(excluded(display::id)),
326 display::version.eq(excluded(display::version)),
327 display::bcs.eq(excluded(display::bcs)),
328 ))
329 .execute(conn)
330 .await?;
331
332 Ok::<(), IndexerError>(())
333 }
334 .scope_boxed()
335 })
336 .await?;
337
338 Ok(())
339 }
340
341 async fn persist_object_mutation_chunk(
342 &self,
343 mutated_object_mutation_chunk: Vec<StoredObject>,
344 ) -> Result<(), IndexerError> {
345 use diesel_async::RunQueryDsl;
346
347 let guard = self
348 .metrics
349 .checkpoint_db_commit_latency_objects_chunks
350 .start_timer();
351 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
352 async {
353 diesel::insert_into(objects::table)
354 .values(mutated_object_mutation_chunk.clone())
355 .on_conflict(objects::object_id)
356 .do_update()
357 .set((
358 objects::object_id.eq(excluded(objects::object_id)),
359 objects::object_version.eq(excluded(objects::object_version)),
360 objects::object_digest.eq(excluded(objects::object_digest)),
361 objects::owner_type.eq(excluded(objects::owner_type)),
362 objects::owner_id.eq(excluded(objects::owner_id)),
363 objects::object_type.eq(excluded(objects::object_type)),
364 objects::serialized_object.eq(excluded(objects::serialized_object)),
365 objects::coin_type.eq(excluded(objects::coin_type)),
366 objects::coin_balance.eq(excluded(objects::coin_balance)),
367 objects::df_kind.eq(excluded(objects::df_kind)),
368 ))
369 .execute(conn)
370 .await?;
371 Ok::<(), IndexerError>(())
372 }
373 .scope_boxed()
374 })
375 .await
376 .tap_ok(|_| {
377 guard.stop_and_record();
378 })
379 .tap_err(|e| {
380 tracing::error!("Failed to persist object mutations with error: {}", e);
381 })
382 }
383
384 async fn persist_object_deletion_chunk(
385 &self,
386 deleted_objects_chunk: Vec<StoredDeletedObject>,
387 ) -> Result<(), IndexerError> {
388 use diesel_async::RunQueryDsl;
389 let guard = self
390 .metrics
391 .checkpoint_db_commit_latency_objects_chunks
392 .start_timer();
393 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
394 async {
395 diesel::delete(
396 objects::table.filter(
397 objects::object_id.eq_any(
398 deleted_objects_chunk
399 .iter()
400 .map(|o| o.object_id.clone())
401 .collect::<Vec<_>>(),
402 ),
403 ),
404 )
405 .execute(conn)
406 .await
407 .map_err(IndexerError::from)
408 .context("Failed to write object deletion to PostgresDB")?;
409
410 Ok::<(), IndexerError>(())
411 }
412 .scope_boxed()
413 })
414 .await
415 .tap_ok(|_| {
416 guard.stop_and_record();
417 })
418 .tap_err(|e| {
419 tracing::error!("Failed to persist object deletions with error: {}", e);
420 })
421 }
422
423 async fn persist_object_snapshot_mutation_chunk(
424 &self,
425 objects_snapshot_mutations: Vec<StoredObjectSnapshot>,
426 ) -> Result<(), IndexerError> {
427 use diesel_async::RunQueryDsl;
428 let guard = self
429 .metrics
430 .checkpoint_db_commit_latency_objects_snapshot_chunks
431 .start_timer();
432 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
433 async {
434 for mutation_chunk in
435 objects_snapshot_mutations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
436 {
437 diesel::insert_into(objects_snapshot::table)
438 .values(mutation_chunk)
439 .on_conflict(objects_snapshot::object_id)
440 .do_update()
441 .set((
442 objects_snapshot::object_version
443 .eq(excluded(objects_snapshot::object_version)),
444 objects_snapshot::object_status
445 .eq(excluded(objects_snapshot::object_status)),
446 objects_snapshot::object_digest
447 .eq(excluded(objects_snapshot::object_digest)),
448 objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
449 objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
450 objects_snapshot::object_type_package
451 .eq(excluded(objects_snapshot::object_type_package)),
452 objects_snapshot::object_type_module
453 .eq(excluded(objects_snapshot::object_type_module)),
454 objects_snapshot::object_type_name
455 .eq(excluded(objects_snapshot::object_type_name)),
456 objects_snapshot::object_type
457 .eq(excluded(objects_snapshot::object_type)),
458 objects_snapshot::serialized_object
459 .eq(excluded(objects_snapshot::serialized_object)),
460 objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
461 objects_snapshot::coin_balance
462 .eq(excluded(objects_snapshot::coin_balance)),
463 objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
464 objects_snapshot::checkpoint_sequence_number
465 .eq(excluded(objects_snapshot::checkpoint_sequence_number)),
466 ))
467 .execute(conn)
468 .await?;
469 }
470 Ok::<(), IndexerError>(())
471 }
472 .scope_boxed()
473 })
474 .await
475 .tap_ok(|_| {
476 guard.stop_and_record();
477 })
478 .tap_err(|e| {
479 tracing::error!("Failed to persist object snapshot with error: {}", e);
480 })
481 }
482
483 async fn persist_object_snapshot_deletion_chunk(
484 &self,
485 objects_snapshot_deletions: Vec<StoredObjectSnapshot>,
486 ) -> Result<(), IndexerError> {
487 use diesel_async::RunQueryDsl;
488 let guard = self
489 .metrics
490 .checkpoint_db_commit_latency_objects_snapshot_chunks
491 .start_timer();
492
493 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
494 async {
495 for deletion_chunk in
496 objects_snapshot_deletions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
497 {
498 diesel::delete(
499 objects_snapshot::table.filter(
500 objects_snapshot::object_id.eq_any(
501 deletion_chunk
502 .iter()
503 .map(|o| o.object_id.clone())
504 .collect::<Vec<_>>(),
505 ),
506 ),
507 )
508 .execute(conn)
509 .await
510 .map_err(IndexerError::from)
511 .context("Failed to write object deletion to PostgresDB")?;
512 }
513 Ok::<(), IndexerError>(())
514 }
515 .scope_boxed()
516 })
517 .await
518 .tap_ok(|_| {
519 let elapsed = guard.stop_and_record();
520 info!(
521 elapsed,
522 "Deleted {} chunked object snapshots",
523 objects_snapshot_deletions.len(),
524 );
525 })
526 .tap_err(|e| {
527 tracing::error!(
528 "Failed to persist object snapshot deletions with error: {}",
529 e
530 );
531 })
532 }
533
534 async fn persist_objects_history_chunk(
535 &self,
536 stored_objects_history: Vec<StoredHistoryObject>,
537 ) -> Result<(), IndexerError> {
538 use diesel_async::RunQueryDsl;
539 let guard = self
540 .metrics
541 .checkpoint_db_commit_latency_objects_history_chunks
542 .start_timer();
543 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
544 async {
545 for stored_objects_history_chunk in
546 stored_objects_history.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
547 {
548 let error_message = concat!(
549 "Failed to write to ",
550 stringify!((objects_history::table)),
551 " DB"
552 );
553 diesel::insert_into(objects_history::table)
554 .values(stored_objects_history_chunk)
555 .on_conflict_do_nothing()
556 .execute(conn)
557 .await
558 .map_err(IndexerError::from)
559 .context(error_message)?;
560 }
561 Ok::<(), IndexerError>(())
562 }
563 .scope_boxed()
564 })
565 .await
566 .tap_ok(|_| {
567 guard.stop_and_record();
568 })
569 .tap_err(|e| {
570 tracing::error!("Failed to persist object history with error: {}", e);
571 })
572 }
573
574 async fn persist_full_objects_history_chunk(
575 &self,
576 objects: Vec<StoredFullHistoryObject>,
577 ) -> Result<(), IndexerError> {
578 use diesel_async::RunQueryDsl;
579
580 let guard = self
581 .metrics
582 .checkpoint_db_commit_latency_full_objects_history_chunks
583 .start_timer();
584
585 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
586 async {
587 for objects_chunk in objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
588 diesel::insert_into(full_objects_history::table)
589 .values(objects_chunk)
590 .on_conflict_do_nothing()
591 .execute(conn)
592 .await
593 .map_err(IndexerError::from)
594 .context("Failed to write to full_objects_history table")?;
595 }
596
597 Ok(())
598 }
599 .scope_boxed()
600 })
601 .await
602 .tap_ok(|_| {
603 let elapsed = guard.stop_and_record();
604 info!(
605 elapsed,
606 "Persisted {} chunked full objects history",
607 objects.len(),
608 );
609 })
610 .tap_err(|e| {
611 tracing::error!("Failed to persist full object history with error: {}", e);
612 })
613 }
614
615 async fn persist_objects_version_chunk(
616 &self,
617 object_versions: Vec<StoredObjectVersion>,
618 ) -> Result<(), IndexerError> {
619 use diesel_async::RunQueryDsl;
620
621 let guard = self
622 .metrics
623 .checkpoint_db_commit_latency_objects_version_chunks
624 .start_timer();
625
626 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
627 async {
628 for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
629 {
630 diesel::insert_into(objects_version::table)
631 .values(object_version_chunk)
632 .on_conflict_do_nothing()
633 .execute(conn)
634 .await
635 .map_err(IndexerError::from)
636 .context("Failed to write to objects_version table")?;
637 }
638 Ok::<(), IndexerError>(())
639 }
640 .scope_boxed()
641 })
642 .await
643 .tap_ok(|_| {
644 let elapsed = guard.stop_and_record();
645 info!(
646 elapsed,
647 "Persisted {} chunked object versions",
648 object_versions.len(),
649 );
650 })
651 .tap_err(|e| {
652 tracing::error!("Failed to persist object versions with error: {}", e);
653 })
654 }
655
656 async fn persist_raw_checkpoints_impl(
657 &self,
658 raw_checkpoints: &[StoredRawCheckpoint],
659 ) -> Result<(), IndexerError> {
660 use diesel_async::RunQueryDsl;
661
662 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
663 async {
664 diesel::insert_into(raw_checkpoints::table)
665 .values(raw_checkpoints)
666 .on_conflict_do_nothing()
667 .execute(conn)
668 .await
669 .map_err(IndexerError::from)
670 .context("Failed to write to raw_checkpoints table")?;
671 Ok::<(), IndexerError>(())
672 }
673 .scope_boxed()
674 })
675 .await
676 }
677
678 async fn persist_checkpoints(
679 &self,
680 checkpoints: Vec<IndexedCheckpoint>,
681 ) -> Result<(), IndexerError> {
682 use diesel_async::RunQueryDsl;
683
684 let Some(first_checkpoint) = checkpoints.as_slice().first() else {
685 return Ok(());
686 };
687
688 if first_checkpoint.sequence_number == 0 {
691 let checkpoint_digest = first_checkpoint.checkpoint_digest.into_inner().to_vec();
692 self.persist_protocol_configs_and_feature_flags(checkpoint_digest.clone())
693 .await?;
694 self.persist_chain_identifier(checkpoint_digest).await?;
695 }
696 let guard = self
697 .metrics
698 .checkpoint_db_commit_latency_checkpoints
699 .start_timer();
700
701 let stored_cp_txs = checkpoints.iter().map(StoredCpTx::from).collect::<Vec<_>>();
702 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
703 async {
704 for stored_cp_tx_chunk in stored_cp_txs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
705 diesel::insert_into(pruner_cp_watermark::table)
706 .values(stored_cp_tx_chunk)
707 .on_conflict_do_nothing()
708 .execute(conn)
709 .await
710 .map_err(IndexerError::from)
711 .context("Failed to write to pruner_cp_watermark table")?;
712 }
713 Ok::<(), IndexerError>(())
714 }
715 .scope_boxed()
716 })
717 .await
718 .tap_ok(|_| {
719 info!(
720 "Persisted {} pruner_cp_watermark rows.",
721 stored_cp_txs.len(),
722 );
723 })
724 .tap_err(|e| {
725 tracing::error!("Failed to persist pruner_cp_watermark with error: {}", e);
726 })?;
727
728 let stored_checkpoints = checkpoints
729 .iter()
730 .map(StoredCheckpoint::from)
731 .collect::<Vec<_>>();
732 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
733 async {
734 for stored_checkpoint_chunk in
735 stored_checkpoints.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
736 {
737 diesel::insert_into(checkpoints::table)
738 .values(stored_checkpoint_chunk)
739 .on_conflict_do_nothing()
740 .execute(conn)
741 .await
742 .map_err(IndexerError::from)
743 .context("Failed to write to checkpoints table")?;
744 let time_now_ms = chrono::Utc::now().timestamp_millis();
745 for stored_checkpoint in stored_checkpoint_chunk {
746 self.metrics
747 .db_commit_lag_ms
748 .set(time_now_ms - stored_checkpoint.timestamp_ms);
749 self.metrics
750 .max_committed_checkpoint_sequence_number
751 .set(stored_checkpoint.sequence_number);
752 self.metrics
753 .committed_checkpoint_timestamp_ms
754 .set(stored_checkpoint.timestamp_ms);
755 }
756
757 for stored_checkpoint in stored_checkpoint_chunk {
758 info!(
759 "Indexer lag: \
760 persisted checkpoint {} with time now {} and checkpoint time {}",
761 stored_checkpoint.sequence_number,
762 time_now_ms,
763 stored_checkpoint.timestamp_ms
764 );
765 }
766 }
767 Ok::<(), IndexerError>(())
768 }
769 .scope_boxed()
770 })
771 .await
772 .tap_ok(|_| {
773 let elapsed = guard.stop_and_record();
774 info!(
775 elapsed,
776 "Persisted {} checkpoints",
777 stored_checkpoints.len()
778 );
779 })
780 .tap_err(|e| {
781 tracing::error!("Failed to persist checkpoints with error: {}", e);
782 })
783 }
784
785 async fn persist_transactions_chunk(
786 &self,
787 transactions: Vec<IndexedTransaction>,
788 ) -> Result<(), IndexerError> {
789 use diesel_async::RunQueryDsl;
790 let guard = self
791 .metrics
792 .checkpoint_db_commit_latency_transactions_chunks
793 .start_timer();
794 let transformation_guard = self
795 .metrics
796 .checkpoint_db_commit_latency_transactions_chunks_transformation
797 .start_timer();
798 let transactions = transactions
799 .iter()
800 .map(StoredTransaction::from)
801 .collect::<Vec<_>>();
802 drop(transformation_guard);
803
804 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
805 async {
806 for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
807 let error_message = concat!(
808 "Failed to write to ",
809 stringify!((transactions::table)),
810 " DB"
811 );
812 diesel::insert_into(transactions::table)
813 .values(transaction_chunk)
814 .on_conflict_do_nothing()
815 .execute(conn)
816 .await
817 .map_err(IndexerError::from)
818 .context(error_message)?;
819 }
820 Ok::<(), IndexerError>(())
821 }
822 .scope_boxed()
823 })
824 .await
825 .tap_ok(|_| {
826 let elapsed = guard.stop_and_record();
827 info!(
828 elapsed,
829 "Persisted {} chunked transactions",
830 transactions.len()
831 );
832 })
833 .tap_err(|e| {
834 tracing::error!("Failed to persist transactions with error: {}", e);
835 })
836 }
837
838 async fn persist_events_chunk(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
839 use diesel_async::RunQueryDsl;
840 let guard = self
841 .metrics
842 .checkpoint_db_commit_latency_events_chunks
843 .start_timer();
844 let len = events.len();
845 let events = events
846 .into_iter()
847 .map(StoredEvent::from)
848 .collect::<Vec<_>>();
849
850 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
851 async {
852 for event_chunk in events.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
853 let error_message =
854 concat!("Failed to write to ", stringify!((events::table)), " DB");
855 diesel::insert_into(events::table)
856 .values(event_chunk)
857 .on_conflict_do_nothing()
858 .execute(conn)
859 .await
860 .map_err(IndexerError::from)
861 .context(error_message)?;
862 }
863 Ok::<(), IndexerError>(())
864 }
865 .scope_boxed()
866 })
867 .await
868 .tap_ok(|_| {
869 let elapsed = guard.stop_and_record();
870 info!(elapsed, "Persisted {} chunked events", len);
871 })
872 .tap_err(|e| {
873 tracing::error!("Failed to persist events with error: {}", e);
874 })
875 }
876
877 async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
878 use diesel_async::RunQueryDsl;
879 if packages.is_empty() {
880 return Ok(());
881 }
882 let guard = self
883 .metrics
884 .checkpoint_db_commit_latency_packages
885 .start_timer();
886 let packages = packages
887 .into_iter()
888 .map(StoredPackage::from)
889 .collect::<Vec<_>>();
890 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
891 async {
892 for packages_chunk in packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
893 diesel::insert_into(packages::table)
894 .values(packages_chunk)
895 .on_conflict(packages::package_id)
896 .do_update()
897 .set((
898 packages::package_id.eq(excluded(packages::package_id)),
899 packages::package_version.eq(excluded(packages::package_version)),
900 packages::move_package.eq(excluded(packages::move_package)),
901 packages::checkpoint_sequence_number
902 .eq(excluded(packages::checkpoint_sequence_number)),
903 ))
904 .execute(conn)
905 .await?;
906 }
907 Ok::<(), IndexerError>(())
908 }
909 .scope_boxed()
910 })
911 .await
912 .tap_ok(|_| {
913 let elapsed = guard.stop_and_record();
914 info!(elapsed, "Persisted {} packages", packages.len());
915 })
916 .tap_err(|e| {
917 tracing::error!("Failed to persist packages with error: {}", e);
918 })
919 }
920
921 async fn persist_event_indices_chunk(
922 &self,
923 indices: Vec<EventIndex>,
924 ) -> Result<(), IndexerError> {
925 use diesel_async::RunQueryDsl;
926
927 let guard = self
928 .metrics
929 .checkpoint_db_commit_latency_event_indices_chunks
930 .start_timer();
931 let len = indices.len();
932 let (
933 event_emit_packages,
934 event_emit_modules,
935 event_senders,
936 event_struct_packages,
937 event_struct_modules,
938 event_struct_names,
939 event_struct_instantiations,
940 ) = indices.into_iter().map(|i| i.split()).fold(
941 (
942 Vec::new(),
943 Vec::new(),
944 Vec::new(),
945 Vec::new(),
946 Vec::new(),
947 Vec::new(),
948 Vec::new(),
949 ),
950 |(
951 mut event_emit_packages,
952 mut event_emit_modules,
953 mut event_senders,
954 mut event_struct_packages,
955 mut event_struct_modules,
956 mut event_struct_names,
957 mut event_struct_instantiations,
958 ),
959 index| {
960 event_emit_packages.push(index.0);
961 event_emit_modules.push(index.1);
962 event_senders.push(index.2);
963 event_struct_packages.push(index.3);
964 event_struct_modules.push(index.4);
965 event_struct_names.push(index.5);
966 event_struct_instantiations.push(index.6);
967 (
968 event_emit_packages,
969 event_emit_modules,
970 event_senders,
971 event_struct_packages,
972 event_struct_modules,
973 event_struct_names,
974 event_struct_instantiations,
975 )
976 },
977 );
978
979 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
980 async {
981 for event_emit_packages_chunk in
982 event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
983 {
984 diesel::insert_into(event_emit_package::table)
985 .values(event_emit_packages_chunk)
986 .on_conflict_do_nothing()
987 .execute(conn)
988 .await?;
989 }
990
991 for event_emit_modules_chunk in
992 event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
993 {
994 diesel::insert_into(event_emit_module::table)
995 .values(event_emit_modules_chunk)
996 .on_conflict_do_nothing()
997 .execute(conn)
998 .await?;
999 }
1000
1001 for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1002 diesel::insert_into(event_senders::table)
1003 .values(event_senders_chunk)
1004 .on_conflict_do_nothing()
1005 .execute(conn)
1006 .await?;
1007 }
1008
1009 for event_struct_packages_chunk in
1010 event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1011 {
1012 diesel::insert_into(event_struct_package::table)
1013 .values(event_struct_packages_chunk)
1014 .on_conflict_do_nothing()
1015 .execute(conn)
1016 .await?;
1017 }
1018
1019 for event_struct_modules_chunk in
1020 event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1021 {
1022 diesel::insert_into(event_struct_module::table)
1023 .values(event_struct_modules_chunk)
1024 .on_conflict_do_nothing()
1025 .execute(conn)
1026 .await?;
1027 }
1028
1029 for event_struct_names_chunk in
1030 event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1031 {
1032 diesel::insert_into(event_struct_name::table)
1033 .values(event_struct_names_chunk)
1034 .on_conflict_do_nothing()
1035 .execute(conn)
1036 .await?;
1037 }
1038
1039 for event_struct_instantiations_chunk in
1040 event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1041 {
1042 diesel::insert_into(event_struct_instantiation::table)
1043 .values(event_struct_instantiations_chunk)
1044 .on_conflict_do_nothing()
1045 .execute(conn)
1046 .await?;
1047 }
1048 Ok(())
1049 }
1050 .scope_boxed()
1051 })
1052 .await?;
1053
1054 let elapsed = guard.stop_and_record();
1055 info!(elapsed, "Persisted {} chunked event indices", len);
1056 Ok(())
1057 }
1058
1059 async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
1060 use diesel_async::RunQueryDsl;
1061
1062 let guard = self
1063 .metrics
1064 .checkpoint_db_commit_latency_tx_indices_chunks
1065 .start_timer();
1066 let len = indices.len();
1067 let (
1068 affected_addresses,
1069 affected_objects,
1070 input_objects,
1071 changed_objects,
1072 pkgs,
1073 mods,
1074 funs,
1075 digests,
1076 kinds,
1077 ) = indices.into_iter().map(|i| i.split()).fold(
1078 (
1079 Vec::new(),
1080 Vec::new(),
1081 Vec::new(),
1082 Vec::new(),
1083 Vec::new(),
1084 Vec::new(),
1085 Vec::new(),
1086 Vec::new(),
1087 Vec::new(),
1088 ),
1089 |(
1090 mut tx_affected_addresses,
1091 mut tx_affected_objects,
1092 mut tx_input_objects,
1093 mut tx_changed_objects,
1094 mut tx_pkgs,
1095 mut tx_mods,
1096 mut tx_funs,
1097 mut tx_digests,
1098 mut tx_kinds,
1099 ),
1100 index| {
1101 tx_affected_addresses.extend(index.0);
1102 tx_affected_objects.extend(index.1);
1103 tx_input_objects.extend(index.2);
1104 tx_changed_objects.extend(index.3);
1105 tx_pkgs.extend(index.4);
1106 tx_mods.extend(index.5);
1107 tx_funs.extend(index.6);
1108 tx_digests.extend(index.7);
1109 tx_kinds.extend(index.8);
1110 (
1111 tx_affected_addresses,
1112 tx_affected_objects,
1113 tx_input_objects,
1114 tx_changed_objects,
1115 tx_pkgs,
1116 tx_mods,
1117 tx_funs,
1118 tx_digests,
1119 tx_kinds,
1120 )
1121 },
1122 );
1123
1124 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1125 async {
1126 for affected_addresses_chunk in
1127 affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1128 {
1129 diesel::insert_into(tx_affected_addresses::table)
1130 .values(affected_addresses_chunk)
1131 .on_conflict_do_nothing()
1132 .execute(conn)
1133 .await?;
1134 }
1135
1136 for affected_objects_chunk in
1137 affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1138 {
1139 diesel::insert_into(tx_affected_objects::table)
1140 .values(affected_objects_chunk)
1141 .on_conflict_do_nothing()
1142 .execute(conn)
1143 .await?;
1144 }
1145
1146 for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1147 diesel::insert_into(tx_input_objects::table)
1148 .values(input_objects_chunk)
1149 .on_conflict_do_nothing()
1150 .execute(conn)
1151 .await?;
1152 }
1153
1154 for changed_objects_chunk in
1155 changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
1156 {
1157 diesel::insert_into(tx_changed_objects::table)
1158 .values(changed_objects_chunk)
1159 .on_conflict_do_nothing()
1160 .execute(conn)
1161 .await?;
1162 }
1163
1164 for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1165 diesel::insert_into(tx_calls_pkg::table)
1166 .values(pkgs_chunk)
1167 .on_conflict_do_nothing()
1168 .execute(conn)
1169 .await?;
1170 }
1171
1172 for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1173 diesel::insert_into(tx_calls_mod::table)
1174 .values(mods_chunk)
1175 .on_conflict_do_nothing()
1176 .execute(conn)
1177 .await?;
1178 }
1179
1180 for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1181 diesel::insert_into(tx_calls_fun::table)
1182 .values(funs_chunk)
1183 .on_conflict_do_nothing()
1184 .execute(conn)
1185 .await?;
1186 }
1187
1188 for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1189 diesel::insert_into(tx_digests::table)
1190 .values(digests_chunk)
1191 .on_conflict_do_nothing()
1192 .execute(conn)
1193 .await?;
1194 }
1195
1196 for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
1197 diesel::insert_into(tx_kinds::table)
1198 .values(kinds_chunk)
1199 .on_conflict_do_nothing()
1200 .execute(conn)
1201 .await?;
1202 }
1203
1204 Ok(())
1205 }
1206 .scope_boxed()
1207 })
1208 .await?;
1209
1210 let elapsed = guard.stop_and_record();
1211 info!(elapsed, "Persisted {} chunked tx_indices", len);
1212 Ok(())
1213 }
1214
1215 async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
1216 use diesel_async::RunQueryDsl;
1217 let guard = self
1218 .metrics
1219 .checkpoint_db_commit_latency_epoch
1220 .start_timer();
1221 let epoch_id = epoch.new_epoch.epoch;
1222
1223 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1224 async {
1225 if let Some(last_epoch) = &epoch.last_epoch {
1226 let last_epoch_id = last_epoch.epoch;
1227
1228 info!(last_epoch_id, "Persisting epoch end data.");
1229 diesel::update(epochs::table.filter(epochs::epoch.eq(last_epoch_id)))
1230 .set(last_epoch)
1231 .execute(conn)
1232 .await?;
1233 }
1234
1235 let epoch_id = epoch.new_epoch.epoch;
1236 info!(epoch_id, "Persisting epoch beginning info");
1237 let error_message =
1238 concat!("Failed to write to ", stringify!((epochs::table)), " DB");
1239 diesel::insert_into(epochs::table)
1240 .values(epoch.new_epoch)
1241 .on_conflict_do_nothing()
1242 .execute(conn)
1243 .await
1244 .map_err(IndexerError::from)
1245 .context(error_message)?;
1246 Ok::<(), IndexerError>(())
1247 }
1248 .scope_boxed()
1249 })
1250 .await
1251 .tap_ok(|_| {
1252 let elapsed = guard.stop_and_record();
1253 info!(elapsed, epoch_id, "Persisted epoch beginning info");
1254 })
1255 .tap_err(|e| {
1256 tracing::error!("Failed to persist epoch with error: {}", e);
1257 })
1258 }
1259
1260 async fn advance_epoch(&self, epoch_to_commit: EpochToCommit) -> Result<(), IndexerError> {
1261 use diesel_async::RunQueryDsl;
1262
1263 let mut connection = self.pool.get().await?;
1264
1265 let last_epoch_id = epoch_to_commit.last_epoch.as_ref().map(|e| e.epoch);
1266 if let Some(last_epoch_id) = last_epoch_id {
1268 let last_db_epoch: Option<StoredEpochInfo> = epochs::table
1269 .filter(epochs::epoch.eq(last_epoch_id))
1270 .first::<StoredEpochInfo>(&mut connection)
1271 .await
1272 .optional()
1273 .map_err(Into::into)
1274 .context("Failed to read last epoch from PostgresDB")?;
1275 if let Some(last_epoch) = last_db_epoch {
1276 let epoch_partition_data =
1277 EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
1278 let table_partitions = self.partition_manager.get_table_partitions().await?;
1279 for (table, (_, last_partition)) in table_partitions {
1280 if !self
1282 .partition_manager
1283 .get_strategy(&table)
1284 .is_epoch_partitioned()
1285 {
1286 continue;
1287 }
1288 let guard = self.metrics.advance_epoch_latency.start_timer();
1289 self.partition_manager
1290 .advance_epoch(table.clone(), last_partition, &epoch_partition_data)
1291 .await?;
1292 let elapsed = guard.stop_and_record();
1293 info!(
1294 elapsed,
1295 "Advanced epoch partition {} for table {}",
1296 last_partition,
1297 table.clone()
1298 );
1299 }
1300 } else {
1301 tracing::error!("Last epoch: {} from PostgresDB is None.", last_epoch_id);
1302 }
1303 }
1304
1305 Ok(())
1306 }
1307
1308 async fn prune_checkpoints_table(&self, cp: u64) -> Result<(), IndexerError> {
1309 use diesel_async::RunQueryDsl;
1310 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1311 async {
1312 diesel::delete(
1313 checkpoints::table.filter(checkpoints::sequence_number.eq(cp as i64)),
1314 )
1315 .execute(conn)
1316 .await
1317 .map_err(IndexerError::from)
1318 .context("Failed to prune checkpoints table")?;
1319
1320 Ok::<(), IndexerError>(())
1321 }
1322 .scope_boxed()
1323 })
1324 .await
1325 }
1326
1327 async fn prune_event_indices_table(
1328 &self,
1329 min_tx: u64,
1330 max_tx: u64,
1331 ) -> Result<(), IndexerError> {
1332 use diesel_async::RunQueryDsl;
1333
1334 let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1335 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1336 async {
1337 diesel::delete(
1338 event_emit_module::table
1339 .filter(event_emit_module::tx_sequence_number.between(min_tx, max_tx)),
1340 )
1341 .execute(conn)
1342 .await?;
1343
1344 diesel::delete(
1345 event_emit_package::table
1346 .filter(event_emit_package::tx_sequence_number.between(min_tx, max_tx)),
1347 )
1348 .execute(conn)
1349 .await?;
1350
1351 diesel::delete(
1352 event_senders::table
1353 .filter(event_senders::tx_sequence_number.between(min_tx, max_tx)),
1354 )
1355 .execute(conn)
1356 .await?;
1357
1358 diesel::delete(event_struct_instantiation::table.filter(
1359 event_struct_instantiation::tx_sequence_number.between(min_tx, max_tx),
1360 ))
1361 .execute(conn)
1362 .await?;
1363
1364 diesel::delete(
1365 event_struct_module::table
1366 .filter(event_struct_module::tx_sequence_number.between(min_tx, max_tx)),
1367 )
1368 .execute(conn)
1369 .await?;
1370
1371 diesel::delete(
1372 event_struct_name::table
1373 .filter(event_struct_name::tx_sequence_number.between(min_tx, max_tx)),
1374 )
1375 .execute(conn)
1376 .await?;
1377
1378 diesel::delete(
1379 event_struct_package::table
1380 .filter(event_struct_package::tx_sequence_number.between(min_tx, max_tx)),
1381 )
1382 .execute(conn)
1383 .await?;
1384
1385 Ok(())
1386 }
1387 .scope_boxed()
1388 })
1389 .await
1390 }
1391
1392 async fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
1393 use diesel_async::RunQueryDsl;
1394
1395 let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
1396 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1397 async {
1398 diesel::delete(
1399 tx_affected_addresses::table
1400 .filter(tx_affected_addresses::tx_sequence_number.between(min_tx, max_tx)),
1401 )
1402 .execute(conn)
1403 .await?;
1404
1405 diesel::delete(
1406 tx_affected_objects::table
1407 .filter(tx_affected_objects::tx_sequence_number.between(min_tx, max_tx)),
1408 )
1409 .execute(conn)
1410 .await?;
1411
1412 diesel::delete(
1413 tx_input_objects::table
1414 .filter(tx_input_objects::tx_sequence_number.between(min_tx, max_tx)),
1415 )
1416 .execute(conn)
1417 .await?;
1418
1419 diesel::delete(
1420 tx_changed_objects::table
1421 .filter(tx_changed_objects::tx_sequence_number.between(min_tx, max_tx)),
1422 )
1423 .execute(conn)
1424 .await?;
1425
1426 diesel::delete(
1427 tx_calls_pkg::table
1428 .filter(tx_calls_pkg::tx_sequence_number.between(min_tx, max_tx)),
1429 )
1430 .execute(conn)
1431 .await?;
1432
1433 diesel::delete(
1434 tx_calls_mod::table
1435 .filter(tx_calls_mod::tx_sequence_number.between(min_tx, max_tx)),
1436 )
1437 .execute(conn)
1438 .await?;
1439
1440 diesel::delete(
1441 tx_calls_fun::table
1442 .filter(tx_calls_fun::tx_sequence_number.between(min_tx, max_tx)),
1443 )
1444 .execute(conn)
1445 .await?;
1446
1447 diesel::delete(
1448 tx_digests::table
1449 .filter(tx_digests::tx_sequence_number.between(min_tx, max_tx)),
1450 )
1451 .execute(conn)
1452 .await?;
1453
1454 Ok(())
1455 }
1456 .scope_boxed()
1457 })
1458 .await
1459 }
1460
1461 async fn prune_cp_tx_table(&self, cp: u64) -> Result<(), IndexerError> {
1462 use diesel_async::RunQueryDsl;
1463
1464 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1465 async {
1466 diesel::delete(
1467 pruner_cp_watermark::table
1468 .filter(pruner_cp_watermark::checkpoint_sequence_number.eq(cp as i64)),
1469 )
1470 .execute(conn)
1471 .await
1472 .map_err(IndexerError::from)
1473 .context("Failed to prune pruner_cp_watermark table")?;
1474 Ok(())
1475 }
1476 .scope_boxed()
1477 })
1478 .await
1479 }
1480
1481 async fn get_network_total_transactions_by_end_of_epoch(
1482 &self,
1483 epoch: u64,
1484 ) -> Result<Option<u64>, IndexerError> {
1485 use diesel_async::RunQueryDsl;
1486
1487 let mut connection = self.pool.get().await?;
1488
1489 Ok(Some(
1492 checkpoints::table
1493 .filter(checkpoints::epoch.eq(epoch as i64))
1494 .select(checkpoints::network_total_transactions)
1495 .order_by(checkpoints::sequence_number.desc())
1496 .first::<i64>(&mut connection)
1497 .await
1498 .map_err(Into::into)
1499 .context("Failed to get network total transactions in epoch")
1500 .map(|v| v as u64)?,
1501 ))
1502 }
1503
1504 async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
1505 &self,
1506 watermark: CommitterWatermark,
1507 ) -> Result<(), IndexerError>
1508 where
1509 E::Iterator: Iterator<Item: AsRef<str>>,
1510 {
1511 use diesel_async::RunQueryDsl;
1512
1513 let guard = self
1514 .metrics
1515 .checkpoint_db_commit_latency_watermarks
1516 .start_timer();
1517
1518 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1519 let upper_bound_updates = E::iter()
1520 .map(|table| StoredWatermark::from_upper_bound_update(table.as_ref(), watermark))
1521 .collect::<Vec<_>>();
1522 async {
1523 diesel::insert_into(watermarks::table)
1524 .values(upper_bound_updates)
1525 .on_conflict(watermarks::pipeline)
1526 .do_update()
1527 .set((
1528 watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
1529 watermarks::checkpoint_hi_inclusive
1530 .eq(excluded(watermarks::checkpoint_hi_inclusive)),
1531 watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
1532 ))
1533 .execute(conn)
1534 .await
1535 .map_err(IndexerError::from)
1536 .context("Failed to update watermarks upper bound")?;
1537
1538 Ok::<(), IndexerError>(())
1539 }
1540 .scope_boxed()
1541 })
1542 .await
1543 .tap_ok(|_| {
1544 let elapsed = guard.stop_and_record();
1545 info!(elapsed, "Persisted watermarks");
1546 })
1547 .tap_err(|e| {
1548 tracing::error!("Failed to persist watermarks with error: {}", e);
1549 })
1550 }
1551
1552 async fn map_epochs_to_cp_tx(
1553 &self,
1554 epochs: &[u64],
1555 ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
1556 use diesel_async::RunQueryDsl;
1557
1558 let mut connection = self.pool.get().await?;
1559
1560 let results: Vec<(i64, i64, Option<i64>)> = epochs::table
1561 .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
1562 .select((
1563 epochs::epoch,
1564 epochs::first_checkpoint_id,
1565 epochs::first_tx_sequence_number,
1566 ))
1567 .load::<(i64, i64, Option<i64>)>(&mut connection)
1568 .await
1569 .map_err(Into::into)
1570 .context("Failed to fetch first checkpoint and tx seq num for epochs")?;
1571
1572 Ok(results
1573 .into_iter()
1574 .map(|(epoch, checkpoint, tx)| {
1575 (
1576 epoch as u64,
1577 (checkpoint as u64, tx.unwrap_or_default() as u64),
1578 )
1579 })
1580 .collect())
1581 }
1582
1583 async fn update_watermarks_lower_bound(
1584 &self,
1585 watermarks: Vec<(PrunableTable, u64)>,
1586 ) -> Result<(), IndexerError> {
1587 use diesel_async::RunQueryDsl;
1588
1589 let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
1590 let epoch_mapping = self.map_epochs_to_cp_tx(&epochs).await?;
1591 let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
1592 .into_iter()
1593 .map(|(table, epoch)| {
1594 let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
1595 IndexerError::PersistentStorageDataCorruptionError(format!(
1596 "Epoch {} not found in epoch mapping",
1597 epoch
1598 ))
1599 })?;
1600
1601 Ok(StoredWatermark::from_lower_bound_update(
1602 table.as_ref(),
1603 epoch,
1604 table.select_reader_lo(*checkpoint, *tx),
1605 ))
1606 })
1607 .collect();
1608 let lower_bound_updates = lookups?;
1609
1610 let guard = self
1611 .metrics
1612 .checkpoint_db_commit_latency_watermarks
1613 .start_timer();
1614
1615 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1616 async {
1617 use diesel::dsl::sql;
1618 use diesel::query_dsl::methods::FilterDsl;
1619
1620 diesel::insert_into(watermarks::table)
1621 .values(lower_bound_updates)
1622 .on_conflict(watermarks::pipeline)
1623 .do_update()
1624 .set((
1625 watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
1626 watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
1627 watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
1628 "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1629 )),
1630 ))
1631 .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
1632 .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
1633 .filter(
1634 diesel::dsl::sql::<diesel::sql_types::BigInt>(
1635 "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1636 )
1637 .gt(watermarks::timestamp_ms),
1638 )
1639 .execute(conn)
1640 .await?;
1641
1642 Ok::<(), IndexerError>(())
1643 }
1644 .scope_boxed()
1645 })
1646 .await
1647 .tap_ok(|_| {
1648 let elapsed = guard.stop_and_record();
1649 info!(elapsed, "Persisted watermarks");
1650 })
1651 .tap_err(|e| {
1652 tracing::error!("Failed to persist watermarks with error: {}", e);
1653 })
1654 }
1655
1656 async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
1657 use diesel_async::RunQueryDsl;
1658
1659 read_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
1662 async {
1663 let stored = watermarks::table
1664 .load::<StoredWatermark>(conn)
1665 .await
1666 .map_err(Into::into)
1667 .context("Failed reading watermarks from PostgresDB")?;
1668
1669 let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
1670 "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
1671 ))
1672 .get_result(conn)
1673 .await
1674 .map_err(Into::into)
1675 .context("Failed reading current timestamp from PostgresDB")?;
1676
1677 Ok((stored, timestamp))
1678 }
1679 .scope_boxed()
1680 })
1681 .await
1682 }
1683}
1684
1685#[async_trait]
1686impl IndexerStore for PgIndexerStore {
    /// Trait entry point that forwards to the inherent `PgIndexerStore` method of the same name.
    ///
    /// NOTE: this relies on method-call syntax resolving inherent methods before trait
    /// methods. Renaming or removing the inherent method would make this call recurse.
    async fn get_latest_checkpoint_sequence_number(&self) -> Result<Option<u64>, IndexerError> {
        self.get_latest_checkpoint_sequence_number().await
    }
1690
1691 async fn get_available_epoch_range(&self) -> Result<(u64, u64), IndexerError> {
1692 self.get_prunable_epoch_range().await
1693 }
1694
    /// Trait entry point that forwards to the inherent `PgIndexerStore` method of the same name.
    ///
    /// NOTE: this relies on method-call syntax resolving inherent methods before trait
    /// methods. Renaming or removing the inherent method would make this call recurse.
    async fn get_available_checkpoint_range(&self) -> Result<(u64, u64), IndexerError> {
        self.get_available_checkpoint_range().await
    }
1698
    /// Trait entry point that forwards to the inherent `PgIndexerStore` method of the same name.
    ///
    /// NOTE: this relies on method-call syntax resolving inherent methods before trait
    /// methods. Renaming or removing the inherent method would make this call recurse.
    async fn get_chain_identifier(&self) -> Result<Option<Vec<u8>>, IndexerError> {
        self.get_chain_identifier().await
    }
1702
    /// Trait entry point that forwards to the inherent `PgIndexerStore` method of the same name.
    ///
    /// NOTE: this relies on method-call syntax resolving inherent methods before trait
    /// methods. Renaming or removing the inherent method would make this call recurse.
    async fn get_latest_object_snapshot_checkpoint_sequence_number(
        &self,
    ) -> Result<Option<u64>, IndexerError> {
        self.get_latest_object_snapshot_checkpoint_sequence_number()
            .await
    }
1709
1710 async fn persist_objects(
1711 &self,
1712 object_changes: Vec<TransactionObjectChangesToCommit>,
1713 ) -> Result<(), IndexerError> {
1714 if object_changes.is_empty() {
1715 return Ok(());
1716 }
1717 let guard = self
1718 .metrics
1719 .checkpoint_db_commit_latency_objects
1720 .start_timer();
1721 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1722 let object_mutations = indexed_mutations
1723 .into_iter()
1724 .map(StoredObject::from)
1725 .collect::<Vec<_>>();
1726 let object_deletions = indexed_deletions
1727 .into_iter()
1728 .map(StoredDeletedObject::from)
1729 .collect::<Vec<_>>();
1730 let mutation_len = object_mutations.len();
1731 let deletion_len = object_deletions.len();
1732
1733 let object_mutation_chunks =
1734 chunk!(object_mutations, self.config.parallel_objects_chunk_size);
1735 let object_deletion_chunks =
1736 chunk!(object_deletions, self.config.parallel_objects_chunk_size);
1737 let mutation_futures = object_mutation_chunks
1738 .into_iter()
1739 .map(|c| self.persist_object_mutation_chunk(c))
1740 .map(Either::Left);
1741 let deletion_futures = object_deletion_chunks
1742 .into_iter()
1743 .map(|c| self.persist_object_deletion_chunk(c))
1744 .map(Either::Right);
1745 let all_futures = mutation_futures.chain(deletion_futures).collect::<Vec<_>>();
1746
1747 futures::future::join_all(all_futures)
1748 .await
1749 .into_iter()
1750 .collect::<Result<Vec<_>, _>>()
1751 .map_err(|e| {
1752 IndexerError::PostgresWriteError(format!(
1753 "Failed to persist all object mutation or deletion chunks: {:?}",
1754 e
1755 ))
1756 })?;
1757 let elapsed = guard.stop_and_record();
1758 info!(
1759 elapsed,
1760 "Persisted {} objects mutations and {} deletions", mutation_len, deletion_len
1761 );
1762 Ok(())
1763 }
1764
1765 async fn persist_objects_snapshot(
1766 &self,
1767 object_changes: Vec<TransactionObjectChangesToCommit>,
1768 ) -> Result<(), IndexerError> {
1769 if object_changes.is_empty() {
1770 return Ok(());
1771 }
1772 let guard = self
1773 .metrics
1774 .checkpoint_db_commit_latency_objects_snapshot
1775 .start_timer();
1776 let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
1777 let object_snapshot_mutations: Vec<StoredObjectSnapshot> = indexed_mutations
1778 .into_iter()
1779 .map(StoredObjectSnapshot::from)
1780 .collect();
1781 let object_snapshot_deletions: Vec<StoredObjectSnapshot> = indexed_deletions
1782 .into_iter()
1783 .map(StoredObjectSnapshot::from)
1784 .collect();
1785 let mutation_len = object_snapshot_mutations.len();
1786 let deletion_len = object_snapshot_deletions.len();
1787 let object_snapshot_mutation_chunks = chunk!(
1788 object_snapshot_mutations,
1789 self.config.parallel_objects_chunk_size
1790 );
1791 let object_snapshot_deletion_chunks = chunk!(
1792 object_snapshot_deletions,
1793 self.config.parallel_objects_chunk_size
1794 );
1795 let mutation_futures = object_snapshot_mutation_chunks
1796 .into_iter()
1797 .map(|c| self.persist_object_snapshot_mutation_chunk(c))
1798 .map(Either::Left)
1799 .collect::<Vec<_>>();
1800 let deletion_futures = object_snapshot_deletion_chunks
1801 .into_iter()
1802 .map(|c| self.persist_object_snapshot_deletion_chunk(c))
1803 .map(Either::Right)
1804 .collect::<Vec<_>>();
1805 let all_futures = mutation_futures
1806 .into_iter()
1807 .chain(deletion_futures)
1808 .collect::<Vec<_>>();
1809 futures::future::join_all(all_futures)
1810 .await
1811 .into_iter()
1812 .collect::<Result<Vec<_>, _>>()
1813 .map_err(|e| {
1814 IndexerError::PostgresWriteError(format!(
1815 "Failed to persist object snapshot mutation or deletion chunks: {:?}",
1816 e
1817 ))
1818 })
1819 .tap_ok(|_| {
1820 let elapsed = guard.stop_and_record();
1821 info!(
1822 elapsed,
1823 "Persisted {} objects snapshot mutations and {} deletions",
1824 mutation_len,
1825 deletion_len
1826 );
1827 })
1828 .tap_err(|e| {
1829 tracing::error!(
1830 "Failed to persist object snapshot mutation or deletion chunks: {:?}",
1831 e
1832 )
1833 })?;
1834 Ok(())
1835 }
1836
1837 async fn persist_object_history(
1838 &self,
1839 object_changes: Vec<TransactionObjectChangesToCommit>,
1840 ) -> Result<(), IndexerError> {
1841 let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1842 .map(|val| val.eq_ignore_ascii_case("true"))
1843 .unwrap_or(false);
1844 if skip_history {
1845 info!("skipping object history");
1846 return Ok(());
1847 }
1848
1849 if object_changes.is_empty() {
1850 return Ok(());
1851 }
1852 let objects = make_objects_history_to_commit(object_changes);
1853 let guard = self
1854 .metrics
1855 .checkpoint_db_commit_latency_objects_history
1856 .start_timer();
1857
1858 let len = objects.len();
1859 let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1860 let futures = chunks
1861 .into_iter()
1862 .map(|c| self.persist_objects_history_chunk(c))
1863 .collect::<Vec<_>>();
1864
1865 futures::future::join_all(futures)
1866 .await
1867 .into_iter()
1868 .collect::<Result<Vec<_>, _>>()
1869 .map_err(|e| {
1870 IndexerError::PostgresWriteError(format!(
1871 "Failed to persist all objects history chunks: {:?}",
1872 e
1873 ))
1874 })?;
1875 let elapsed = guard.stop_and_record();
1876 info!(elapsed, "Persisted {} objects history", len);
1877 Ok(())
1878 }
1879
1880 async fn persist_full_objects_history(
1883 &self,
1884 object_changes: Vec<TransactionObjectChangesToCommit>,
1885 ) -> Result<(), IndexerError> {
1886 let skip_history = std::env::var("SKIP_OBJECT_HISTORY")
1887 .map(|val| val.eq_ignore_ascii_case("true"))
1888 .unwrap_or(false);
1889 if skip_history {
1890 info!("skipping object history");
1891 return Ok(());
1892 }
1893
1894 if object_changes.is_empty() {
1895 return Ok(());
1896 }
1897 let objects: Vec<StoredFullHistoryObject> = object_changes
1898 .into_iter()
1899 .flat_map(|c| {
1900 let TransactionObjectChangesToCommit {
1901 changed_objects,
1902 deleted_objects,
1903 } = c;
1904 changed_objects
1905 .into_iter()
1906 .map(|o| o.into())
1907 .chain(deleted_objects.into_iter().map(|o| o.into()))
1908 })
1909 .collect();
1910 let guard = self
1911 .metrics
1912 .checkpoint_db_commit_latency_full_objects_history
1913 .start_timer();
1914
1915 let len = objects.len();
1916 let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
1917 let futures = chunks
1918 .into_iter()
1919 .map(|c| self.persist_full_objects_history_chunk(c))
1920 .collect::<Vec<_>>();
1921
1922 futures::future::join_all(futures)
1923 .await
1924 .into_iter()
1925 .collect::<Result<Vec<_>, _>>()
1926 .map_err(|e| {
1927 IndexerError::PostgresWriteError(format!(
1928 "Failed to persist all full objects history chunks: {:?}",
1929 e
1930 ))
1931 })?;
1932 let elapsed = guard.stop_and_record();
1933 info!(elapsed, "Persisted {} full objects history", len);
1934 Ok(())
1935 }
1936
1937 async fn persist_objects_version(
1938 &self,
1939 object_versions: Vec<StoredObjectVersion>,
1940 ) -> Result<(), IndexerError> {
1941 if object_versions.is_empty() {
1942 return Ok(());
1943 }
1944
1945 let guard = self
1946 .metrics
1947 .checkpoint_db_commit_latency_objects_version
1948 .start_timer();
1949
1950 let len = object_versions.len();
1951 let chunks = chunk!(object_versions, self.config.parallel_objects_chunk_size);
1952 let futures = chunks
1953 .into_iter()
1954 .map(|c| self.persist_objects_version_chunk(c))
1955 .collect::<Vec<_>>();
1956
1957 futures::future::join_all(futures)
1958 .await
1959 .into_iter()
1960 .collect::<Result<Vec<_>, _>>()
1961 .map_err(|e| {
1962 IndexerError::PostgresWriteError(format!(
1963 "Failed to persist all objects version chunks: {:?}",
1964 e
1965 ))
1966 })?;
1967
1968 let elapsed = guard.stop_and_record();
1969 info!(elapsed, "Persisted {} object versions", len);
1970 Ok(())
1971 }
1972
    /// Trait entry point that forwards to the inherent `PgIndexerStore` method of the same name.
    ///
    /// NOTE: this relies on method-call syntax resolving inherent methods before trait
    /// methods. Renaming or removing the inherent method would make this call recurse.
    async fn persist_checkpoints(
        &self,
        checkpoints: Vec<IndexedCheckpoint>,
    ) -> Result<(), IndexerError> {
        self.persist_checkpoints(checkpoints).await
    }
1979
1980 async fn persist_transactions(
1981 &self,
1982 transactions: Vec<IndexedTransaction>,
1983 ) -> Result<(), IndexerError> {
1984 let guard = self
1985 .metrics
1986 .checkpoint_db_commit_latency_transactions
1987 .start_timer();
1988 let len = transactions.len();
1989
1990 let chunks = chunk!(transactions, self.config.parallel_chunk_size);
1991 let futures = chunks
1992 .into_iter()
1993 .map(|c| self.persist_transactions_chunk(c))
1994 .collect::<Vec<_>>();
1995
1996 futures::future::join_all(futures)
1997 .await
1998 .into_iter()
1999 .collect::<Result<Vec<_>, _>>()
2000 .map_err(|e| {
2001 IndexerError::PostgresWriteError(format!(
2002 "Failed to persist all transactions chunks: {:?}",
2003 e
2004 ))
2005 })?;
2006 let elapsed = guard.stop_and_record();
2007 info!(elapsed, "Persisted {} transactions", len);
2008 Ok(())
2009 }
2010
2011 async fn persist_events(&self, events: Vec<IndexedEvent>) -> Result<(), IndexerError> {
2012 if events.is_empty() {
2013 return Ok(());
2014 }
2015 let len = events.len();
2016 let guard = self
2017 .metrics
2018 .checkpoint_db_commit_latency_events
2019 .start_timer();
2020 let chunks = chunk!(events, self.config.parallel_chunk_size);
2021 let futures = chunks
2022 .into_iter()
2023 .map(|c| self.persist_events_chunk(c))
2024 .collect::<Vec<_>>();
2025
2026 futures::future::join_all(futures)
2027 .await
2028 .into_iter()
2029 .collect::<Result<Vec<_>, _>>()
2030 .map_err(|e| {
2031 IndexerError::PostgresWriteError(format!(
2032 "Failed to persist all events chunks: {:?}",
2033 e
2034 ))
2035 })?;
2036 let elapsed = guard.stop_and_record();
2037 info!(elapsed, "Persisted {} events", len);
2038 Ok(())
2039 }
2040
2041 async fn persist_displays(
2042 &self,
2043 display_updates: BTreeMap<String, StoredDisplay>,
2044 ) -> Result<(), IndexerError> {
2045 if display_updates.is_empty() {
2046 return Ok(());
2047 }
2048 self.persist_display_updates(display_updates.values().cloned().collect::<Vec<_>>())
2049 .await
2050 }
2051
2052 async fn persist_packages(&self, packages: Vec<IndexedPackage>) -> Result<(), IndexerError> {
2053 if packages.is_empty() {
2054 return Ok(());
2055 }
2056 self.persist_packages(packages).await
2057 }
2058
2059 async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
2060 if indices.is_empty() {
2061 return Ok(());
2062 }
2063 let len = indices.len();
2064 let guard = self
2065 .metrics
2066 .checkpoint_db_commit_latency_event_indices
2067 .start_timer();
2068 let chunks = chunk!(indices, self.config.parallel_chunk_size);
2069
2070 let futures = chunks
2071 .into_iter()
2072 .map(|chunk| self.persist_event_indices_chunk(chunk))
2073 .collect::<Vec<_>>();
2074 futures::future::join_all(futures)
2075 .await
2076 .into_iter()
2077 .collect::<Result<Vec<_>, _>>()
2078 .map_err(|e| {
2079 IndexerError::PostgresWriteError(format!(
2080 "Failed to persist all event_indices chunks: {:?}",
2081 e
2082 ))
2083 })
2084 .tap_ok(|_| {
2085 let elapsed = guard.stop_and_record();
2086 info!(elapsed, "Persisted {} event_indices chunks", len);
2087 })
2088 .tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?;
2089 Ok(())
2090 }
2091
2092 async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
2093 if indices.is_empty() {
2094 return Ok(());
2095 }
2096 let len = indices.len();
2097 let guard = self
2098 .metrics
2099 .checkpoint_db_commit_latency_tx_indices
2100 .start_timer();
2101 let chunks = chunk!(indices, self.config.parallel_chunk_size);
2102
2103 let futures = chunks
2104 .into_iter()
2105 .map(|chunk| self.persist_tx_indices_chunk(chunk))
2106 .collect::<Vec<_>>();
2107 futures::future::join_all(futures)
2108 .await
2109 .into_iter()
2110 .collect::<Result<Vec<_>, _>>()
2111 .map_err(|e| {
2112 IndexerError::PostgresWriteError(format!(
2113 "Failed to persist all tx_indices chunks: {:?}",
2114 e
2115 ))
2116 })
2117 .tap_ok(|_| {
2118 let elapsed = guard.stop_and_record();
2119 info!(elapsed, "Persisted {} tx_indices chunks", len);
2120 })
2121 .tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?;
2122 Ok(())
2123 }
2124
    /// Trait entry point that forwards to the inherent `PgIndexerStore::persist_epoch`.
    ///
    /// The inherent method updates the closing epoch row (if any) and inserts the new epoch
    /// row in one retried transaction.
    /// NOTE: this relies on method-call syntax resolving inherent methods before trait
    /// methods. Renaming the inherent method would make this call recurse.
    async fn persist_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.persist_epoch(epoch).await
    }
2128
    /// Trait entry point that forwards to the inherent `PgIndexerStore::advance_epoch`.
    ///
    /// The inherent method advances the partitions of epoch-partitioned tables.
    /// NOTE: this relies on method-call syntax resolving inherent methods before trait
    /// methods. Renaming the inherent method would make this call recurse.
    async fn advance_epoch(&self, epoch: EpochToCommit) -> Result<(), IndexerError> {
        self.advance_epoch(epoch).await
    }
2132
2133 async fn prune_epoch(&self, epoch: u64) -> Result<(), IndexerError> {
2134 let (mut min_cp, max_cp) = match self.get_checkpoint_range_for_epoch(epoch).await? {
2135 (min_cp, Some(max_cp)) => Ok((min_cp, max_cp)),
2136 _ => Err(IndexerError::PostgresReadError(format!(
2137 "Failed to get checkpoint range for epoch {}",
2138 epoch
2139 ))),
2140 }?;
2141
2142 let min_prunable_cp = self.get_min_prunable_checkpoint().await?;
2147 min_cp = std::cmp::max(min_cp, min_prunable_cp);
2148 for cp in min_cp..=max_cp {
2149 info!(
2157 "Pruning checkpoint {} of epoch {} (min_prunable_cp: {})",
2158 cp, epoch, min_prunable_cp
2159 );
2160
2161 let (min_tx, max_tx) = self.get_transaction_range_for_checkpoint(cp).await?;
2162 self.prune_tx_indices_table(min_tx, max_tx).await?;
2163 info!(
2164 "Pruned transactions for checkpoint {} from tx {} to tx {}",
2165 cp, min_tx, max_tx
2166 );
2167 self.prune_event_indices_table(min_tx, max_tx).await?;
2168 info!(
2169 "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
2170 cp, min_tx, max_tx
2171 );
2172 self.metrics.last_pruned_transaction.set(max_tx as i64);
2173
2174 self.prune_cp_tx_table(cp).await?;
2175 self.prune_checkpoints_table(cp).await?;
2178
2179 info!("Pruned checkpoint {} of epoch {}", cp, epoch);
2180 self.metrics.last_pruned_checkpoint.set(cp as i64);
2181 }
2182
2183 Ok(())
2184 }
2185
2186 async fn upload_display(&self, epoch_number: u64) -> Result<(), IndexerError> {
2187 use diesel_async::RunQueryDsl;
2188 let mut connection = self.pool.get().await?;
2189 let mut buffer = Cursor::new(Vec::new());
2190 {
2191 let mut writer = Writer::from_writer(&mut buffer);
2192 let displays = display::table
2193 .load::<StoredDisplay>(&mut connection)
2194 .await
2195 .map_err(Into::into)
2196 .context("Failed to get display from database")?;
2197 info!("Read {} displays", displays.len());
2198 writer
2199 .write_record(["object_type", "id", "version", "bcs"])
2200 .map_err(|_| {
2201 IndexerError::GcsError("Failed to write display to csv".to_string())
2202 })?;
2203 for display in displays {
2204 writer
2205 .write_record(&[
2206 display.object_type,
2207 hex::encode(display.id),
2208 display.version.to_string(),
2209 hex::encode(display.bcs),
2210 ])
2211 .map_err(|_| IndexerError::GcsError("Failed to write to csv".to_string()))?;
2212 }
2213 writer
2214 .flush()
2215 .map_err(|_| IndexerError::GcsError("Failed to flush csv".to_string()))?;
2216 }
2217
2218 if let (Some(cred_path), Some(bucket)) = (
2219 self.config.gcs_cred_path.clone(),
2220 self.config.gcs_display_bucket.clone(),
2221 ) {
2222 let remote_store_config = ObjectStoreConfig {
2223 object_store: Some(ObjectStoreType::GCS),
2224 bucket: Some(bucket),
2225 google_service_account: Some(cred_path),
2226 object_store_connection_limit: 200,
2227 no_sign_request: false,
2228 ..Default::default()
2229 };
2230 let remote_store = remote_store_config.make().map_err(|e| {
2231 IndexerError::GcsError(format!("Failed to make GCS remote store: {}", e))
2232 })?;
2233 let path = Path::from(format!("display_{}.csv", epoch_number).as_str());
2234 put(&remote_store, &path, buffer.into_inner().into())
2235 .await
2236 .map_err(|e| IndexerError::GcsError(format!("Failed to put to GCS: {}", e)))?;
2237 } else {
2238 warn!("Either GCS cred path or bucket is not set, skipping display upload.");
2239 }
2240 Ok(())
2241 }
2242
2243 async fn restore_display(&self, bytes: bytes::Bytes) -> Result<(), IndexerError> {
2244 let cursor = Cursor::new(bytes);
2245 let mut csv_reader = ReaderBuilder::new().has_headers(true).from_reader(cursor);
2246 let displays = csv_reader
2247 .deserialize()
2248 .collect::<Result<Vec<StoredDisplay>, csv::Error>>()
2249 .map_err(|e| {
2250 IndexerError::GcsError(format!("Failed to deserialize display records: {}", e))
2251 })?;
2252 self.persist_display_updates(displays).await
2253 }
2254
2255 async fn get_network_total_transactions_by_end_of_epoch(
2256 &self,
2257 epoch: u64,
2258 ) -> Result<Option<u64>, IndexerError> {
2259 self.get_network_total_transactions_by_end_of_epoch(epoch)
2260 .await
2261 }
2262
2263 async fn persist_protocol_configs_and_feature_flags(
2266 &self,
2267 chain_id: Vec<u8>,
2268 ) -> Result<(), IndexerError> {
2269 use diesel_async::RunQueryDsl;
2270
2271 let chain_id = ChainIdentifier::from(
2272 CheckpointDigest::try_from(chain_id).expect("Unable to convert chain id"),
2273 );
2274
2275 let mut all_configs = vec![];
2276 let mut all_flags = vec![];
2277
2278 let (start_version, end_version) = self.get_protocol_version_index_range().await?;
2279 info!(
2280 "Persisting protocol configs with start_version: {}, end_version: {}",
2281 start_version, end_version
2282 );
2283
2284 for version in start_version..=end_version {
2286 let protocol_configs = ProtocolConfig::get_for_version_if_supported(
2287 (version as u64).into(),
2288 chain_id.chain(),
2289 )
2290 .ok_or(IndexerError::GenericError(format!(
2291 "Unable to fetch protocol version {} and chain {:?}",
2292 version,
2293 chain_id.chain()
2294 )))?;
2295 let configs_vec = protocol_configs
2296 .attr_map()
2297 .into_iter()
2298 .map(|(k, v)| StoredProtocolConfig {
2299 protocol_version: version,
2300 config_name: k,
2301 config_value: v.map(|v| v.to_string()),
2302 })
2303 .collect::<Vec<_>>();
2304 all_configs.extend(configs_vec);
2305
2306 let feature_flags = protocol_configs
2307 .feature_map()
2308 .into_iter()
2309 .map(|(k, v)| StoredFeatureFlag {
2310 protocol_version: version,
2311 flag_name: k,
2312 flag_value: v,
2313 })
2314 .collect::<Vec<_>>();
2315 all_flags.extend(feature_flags);
2316 }
2317
2318 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
2321 async {
2322 for config_chunk in all_configs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2323 diesel::insert_into(protocol_configs::table)
2324 .values(config_chunk)
2325 .on_conflict_do_nothing()
2326 .execute(conn)
2327 .await
2328 .map_err(IndexerError::from)
2329 .context("Failed to write to protocol_configs table")?;
2330 }
2331
2332 for flag_chunk in all_flags.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
2333 diesel::insert_into(feature_flags::table)
2334 .values(flag_chunk)
2335 .on_conflict_do_nothing()
2336 .execute(conn)
2337 .await
2338 .map_err(IndexerError::from)
2339 .context("Failed to write to feature_flags table")?;
2340 }
2341 Ok::<(), IndexerError>(())
2342 }
2343 .scope_boxed()
2344 })
2345 .await?;
2346 Ok(())
2347 }
2348
2349 async fn persist_chain_identifier(
2350 &self,
2351 checkpoint_digest: Vec<u8>,
2352 ) -> Result<(), IndexerError> {
2353 use diesel_async::RunQueryDsl;
2354
2355 transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
2356 async {
2357 diesel::insert_into(chain_identifier::table)
2358 .values(StoredChainIdentifier { checkpoint_digest })
2359 .on_conflict_do_nothing()
2360 .execute(conn)
2361 .await
2362 .map_err(IndexerError::from)
2363 .context("failed to write to chain_identifier table")?;
2364 Ok::<(), IndexerError>(())
2365 }
2366 .scope_boxed()
2367 })
2368 .await?;
2369 Ok(())
2370 }
2371
2372 async fn persist_raw_checkpoints(
2373 &self,
2374 checkpoints: Vec<StoredRawCheckpoint>,
2375 ) -> Result<(), IndexerError> {
2376 self.persist_raw_checkpoints_impl(&checkpoints).await
2377 }
2378
2379 async fn update_watermarks_upper_bound<E: IntoEnumIterator>(
2380 &self,
2381 watermark: CommitterWatermark,
2382 ) -> Result<(), IndexerError>
2383 where
2384 E::Iterator: Iterator<Item: AsRef<str>>,
2385 {
2386 self.update_watermarks_upper_bound::<E>(watermark).await
2387 }
2388
2389 async fn update_watermarks_lower_bound(
2390 &self,
2391 watermarks: Vec<(PrunableTable, u64)>,
2392 ) -> Result<(), IndexerError> {
2393 self.update_watermarks_lower_bound(watermarks).await
2394 }
2395
2396 async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
2397 self.get_watermarks().await
2398 }
2399}
2400
2401fn make_objects_history_to_commit(
2402 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2403) -> Vec<StoredHistoryObject> {
2404 let deleted_objects: Vec<StoredHistoryObject> = tx_object_changes
2405 .clone()
2406 .into_iter()
2407 .flat_map(|changes| changes.deleted_objects)
2408 .map(|o| o.into())
2409 .collect();
2410 let mutated_objects: Vec<StoredHistoryObject> = tx_object_changes
2411 .into_iter()
2412 .flat_map(|changes| changes.changed_objects)
2413 .map(|o| o.into())
2414 .collect();
2415 deleted_objects.into_iter().chain(mutated_objects).collect()
2416}
2417
2418fn retain_latest_indexed_objects(
2423 tx_object_changes: Vec<TransactionObjectChangesToCommit>,
2424) -> (Vec<IndexedObject>, Vec<IndexedDeletedObject>) {
2425 let (mutations, deletions) = tx_object_changes
2428 .into_iter()
2429 .flat_map(|change| {
2430 change
2431 .changed_objects
2432 .into_iter()
2433 .map(Either::Left)
2434 .chain(
2435 change
2436 .deleted_objects
2437 .into_iter()
2438 .map(Either::Right),
2439 )
2440 })
2441 .fold(
2442 (HashMap::<ObjectID, IndexedObject>::new(), HashMap::<ObjectID, IndexedDeletedObject>::new()),
2443 |(mut mutations, mut deletions), either_change| {
2444 match either_change {
2445 Either::Left(mutation) => {
2449 let id = mutation.object.id();
2450 let mutation_version = mutation.object.version();
2451 if let Some(existing) = deletions.remove(&id) {
2452 assert!(
2453 existing.object_version < mutation_version.value(),
2454 "Mutation version ({:?}) should be greater than existing deletion version ({:?}) for object {:?}",
2455 mutation_version,
2456 existing.object_version,
2457 id
2458 );
2459 }
2460 if let Some(existing) = mutations.insert(id, mutation) {
2461 assert!(
2462 existing.object.version() < mutation_version,
2463 "Mutation version ({:?}) should be greater than existing mutation version ({:?}) for object {:?}",
2464 mutation_version,
2465 existing.object.version(),
2466 id
2467 );
2468 }
2469 }
2470 Either::Right(deletion) => {
2471 let id = deletion.object_id;
2472 let deletion_version = deletion.object_version;
2473 if let Some(existing) = mutations.remove(&id) {
2474 assert!(
2475 existing.object.version().value() < deletion_version,
2476 "Deletion version ({:?}) should be greater than existing mutation version ({:?}) for object {:?}",
2477 deletion_version,
2478 existing.object.version(),
2479 id
2480 );
2481 }
2482 if let Some(existing) = deletions.insert(id, deletion) {
2483 assert!(
2484 existing.object_version < deletion_version,
2485 "Deletion version ({:?}) should be greater than existing deletion version ({:?}) for object {:?}",
2486 deletion_version,
2487 existing.object_version,
2488 id
2489 );
2490 }
2491 }
2492 }
2493 (mutations, deletions)
2494 },
2495 );
2496 (
2497 mutations.into_values().collect(),
2498 deletions.into_values().collect(),
2499 )
2500}