sui_indexer/store/
pg_partition_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use diesel::QueryableByName;
5use diesel::sql_types::{BigInt, VarChar};
6use diesel_async::scoped_futures::ScopedFutureExt;
7use std::collections::{BTreeMap, HashMap};
8use std::time::Duration;
9use tracing::{error, info};
10
11use crate::database::ConnectionPool;
12use crate::errors::IndexerError;
13use crate::handlers::EpochToCommit;
14use crate::models::epoch::StoredEpochInfo;
15use crate::store::transaction_with_retry;
16
/// Catalog query listing every partitioned parent table (`relkind = 'p'`) together with the
/// smallest and largest numeric suffix found among the names of its child partitions.
///
/// The suffix is the trailing run of digits of each child's `relname` (`'\d+$'`), cast to
/// `BIGINT`. The result columns are `table_name`, `first_partition` and `last_partition`, with
/// one row per parent table name.
// NOTE(review): the two `pg_namespace` joins are not referenced by the SELECT list or the WHERE
// clause, so rows are grouped by relation name only — confirm that no two schemas contain a
// partitioned table with the same name.
const GET_PARTITION_SQL: &str = r"
SELECT parent.relname                                            AS table_name,
       MIN(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS first_partition,
       MAX(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS last_partition
FROM pg_inherits
         JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
         JOIN pg_class child ON pg_inherits.inhrelid = child.oid
         JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
         JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p'
GROUP BY table_name;
";
29
/// Inspects and maintains table partitions through a database connection pool.
#[derive(Clone)]
pub struct PgPartitionManager {
    /// Connection pool used for every query and procedure call issued by the manager.
    pool: ConnectionPool,

    /// Per-table partitioning strategy, keyed by table name. Tables that are absent from this
    /// map fall back to the default chosen by `get_strategy`.
    partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
}
36
/// The key a partitioned table is split on.
#[derive(Clone, Copy)]
pub enum PgPartitionStrategy {
    /// Partition ranges are expressed in checkpoint sequence numbers.
    CheckpointSequenceNumber,
    /// Partition ranges are expressed in transaction sequence numbers.
    TxSequenceNumber,
    /// Partitioned by object id; no per-epoch range is derived for such tables.
    ObjectId,
}

impl PgPartitionStrategy {
    /// Returns `true` when the strategy partitions by epoch, i.e. for
    /// `CheckpointSequenceNumber` and `TxSequenceNumber`, and `false` for `ObjectId`.
    pub fn is_epoch_partitioned(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::CheckpointSequenceNumber | Self::TxSequenceNumber => true,
            Self::ObjectId => false,
        }
    }
}
52
53#[derive(Clone, Debug)]
54pub struct EpochPartitionData {
55    last_epoch: u64,
56    next_epoch: u64,
57    last_epoch_start_cp: u64,
58    next_epoch_start_cp: u64,
59    last_epoch_start_tx: u64,
60    next_epoch_start_tx: u64,
61}
62
63impl EpochPartitionData {
64    pub fn compose_data(epoch: EpochToCommit, last_db_epoch: StoredEpochInfo) -> Self {
65        let last_epoch = last_db_epoch.epoch as u64;
66        let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
67        let next_epoch = epoch.new_epoch_id();
68        let next_epoch_start_cp = epoch.new_epoch_first_checkpoint_id();
69        let next_epoch_start_tx = epoch.new_epoch_first_tx_sequence_number();
70        let last_epoch_start_tx =
71            next_epoch_start_tx - epoch.last_epoch_total_transactions().unwrap();
72
73        Self {
74            last_epoch,
75            next_epoch,
76            last_epoch_start_cp,
77            next_epoch_start_cp,
78            last_epoch_start_tx,
79            next_epoch_start_tx,
80        }
81    }
82}
83
84impl PgPartitionManager {
85    pub fn new(pool: ConnectionPool) -> Result<Self, IndexerError> {
86        let mut partition_strategies = HashMap::new();
87        partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
88        partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
89        partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
90        let manager = Self {
91            pool,
92            partition_strategies,
93        };
94        Ok(manager)
95    }
96
97    pub async fn get_table_partitions(&self) -> Result<BTreeMap<String, (u64, u64)>, IndexerError> {
98        #[derive(QueryableByName, Debug, Clone)]
99        struct PartitionedTable {
100            #[diesel(sql_type = VarChar)]
101            table_name: String,
102            #[diesel(sql_type = BigInt)]
103            first_partition: i64,
104            #[diesel(sql_type = BigInt)]
105            last_partition: i64,
106        }
107
108        let mut connection = self.pool.get().await?;
109
110        Ok(
111            diesel_async::RunQueryDsl::load(diesel::sql_query(GET_PARTITION_SQL), &mut connection)
112                .await?
113                .into_iter()
114                .map(|table: PartitionedTable| {
115                    (
116                        table.table_name,
117                        (table.first_partition as u64, table.last_partition as u64),
118                    )
119                })
120                .collect(),
121        )
122    }
123
124    /// Tries to fetch the partitioning strategy for the given partitioned table. Defaults to
125    /// `CheckpointSequenceNumber` as the majority of our tables are partitioned on an epoch's
126    /// checkpoints today.
127    pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
128        self.partition_strategies
129            .get(table_name)
130            .copied()
131            .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
132    }
133
134    pub fn determine_epoch_partition_range(
135        &self,
136        table_name: &str,
137        data: &EpochPartitionData,
138    ) -> Option<(u64, u64)> {
139        match self.get_strategy(table_name) {
140            PgPartitionStrategy::CheckpointSequenceNumber => {
141                Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
142            }
143            PgPartitionStrategy::TxSequenceNumber => {
144                Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
145            }
146            PgPartitionStrategy::ObjectId => None,
147        }
148    }
149
150    pub async fn advance_epoch(
151        &self,
152        table: String,
153        last_partition: u64,
154        data: &EpochPartitionData,
155    ) -> Result<(), IndexerError> {
156        let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
157            return Ok(());
158        };
159        if data.next_epoch == 0 {
160            tracing::info!("Epoch 0 partition has been created in the initial setup.");
161            return Ok(());
162        }
163        if last_partition == data.last_epoch {
164            transaction_with_retry(&self.pool, Duration::from_secs(10), |conn| {
165                async {
166                    diesel_async::RunQueryDsl::execute(
167                        diesel::sql_query("CALL robust_advance_partition($1, $2, $3, $4)")
168                            .bind::<diesel::sql_types::Text, _>(table.clone())
169                            .bind::<diesel::sql_types::BigInt, _>(data.last_epoch as i64)
170                            .bind::<diesel::sql_types::BigInt, _>(data.next_epoch as i64)
171                            .bind::<diesel::sql_types::BigInt, _>(partition_range.1 as i64),
172                        conn,
173                    )
174                    .await?;
175                    Ok(())
176                }
177                .scope_boxed()
178            })
179            .await?;
180
181            info!(
182                "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}",
183                table, last_partition, data.next_epoch, partition_range.0
184            );
185        } else if last_partition != data.next_epoch {
186            // skip when the partition is already advanced once, which is possible when indexer
187            // crashes and restarts; error otherwise.
188            error!(
189                "Epoch partition for table {} is not in sync with the last epoch {}.",
190                table, data.last_epoch
191            );
192        } else {
193            info!(
194                "Epoch has been advanced to {} already, skipping.",
195                data.next_epoch
196            );
197        }
198        Ok(())
199    }
200
201    pub async fn drop_table_partition(
202        &self,
203        table: String,
204        partition: u64,
205    ) -> Result<(), IndexerError> {
206        transaction_with_retry(&self.pool, Duration::from_secs(10), |conn| {
207            async {
208                diesel_async::RunQueryDsl::execute(
209                    diesel::sql_query("CALL drop_partition($1, $2)")
210                        .bind::<diesel::sql_types::Text, _>(table.clone())
211                        .bind::<diesel::sql_types::BigInt, _>(partition as i64),
212                    conn,
213                )
214                .await?;
215                Ok(())
216            }
217            .scope_boxed()
218        })
219        .await?;
220        Ok(())
221    }
222}