1use diesel::QueryableByName;
5use diesel::sql_types::{BigInt, VarChar};
6use diesel_async::scoped_futures::ScopedFutureExt;
7use std::collections::{BTreeMap, HashMap};
8use std::time::Duration;
9use tracing::{error, info};
10
11use crate::database::ConnectionPool;
12use crate::errors::IndexerError;
13use crate::handlers::EpochToCommit;
14use crate::models::epoch::StoredEpochInfo;
15use crate::store::transaction_with_retry;
16
/// SQL query that summarizes the partitions of every partitioned parent table.
///
/// It walks `pg_inherits`, joining each parent/child pair to `pg_class` (and
/// their namespaces), keeps only parents whose `relkind` is `'p'`, and groups
/// by the parent table name. For each parent it returns:
///
/// * `table_name`      - the parent relation name,
/// * `first_partition` - the smallest trailing number (`\d+$`) found in its
///   child relation names, cast to `BIGINT`,
/// * `last_partition`  - the largest such trailing number.
// NOTE(review): the namespace joins are not used for filtering, so tables from
// every schema are included - confirm that this is intended.
const GET_PARTITION_SQL: &str = r"
SELECT parent.relname AS table_name,
 MIN(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS first_partition,
 MAX(CAST(SUBSTRING(child.relname FROM '\d+$') AS BIGINT)) AS last_partition
FROM pg_inherits
 JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
 JOIN pg_class child ON pg_inherits.inhrelid = child.oid
 JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
 JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p'
GROUP BY table_name;
";
29
/// Queries and maintains the partitions of partitioned Postgres tables.
///
/// The manager is `Clone`; each clone carries its own copy of the pool handle
/// and of the strategy table.
#[derive(Clone)]
pub struct PgPartitionManager {
    /// Connection pool used for every query and stored-procedure call.
    pool: ConnectionPool,

    /// Partitioning strategy per table name. Tables that have no entry here
    /// are treated as `PgPartitionStrategy::CheckpointSequenceNumber` by
    /// `get_strategy`.
    partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
}
36
/// The column family a table is partitioned by.
///
/// The strategy decides which pair of values from `EpochPartitionData` is used
/// as the partition range when an epoch advances.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PgPartitionStrategy {
    /// Partition ranges are bounded by the first checkpoint of each epoch.
    CheckpointSequenceNumber,
    /// Partition ranges are bounded by the first transaction sequence number
    /// of each epoch.
    TxSequenceNumber,
    /// Partitioned by object id; no epoch-based range exists for such tables.
    ObjectId,
}

impl PgPartitionStrategy {
    /// Returns `true` when the strategy's partitions follow epoch boundaries,
    /// i.e. for `CheckpointSequenceNumber` and `TxSequenceNumber`.
    pub fn is_epoch_partitioned(&self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here
        // instead of silently defaulting to `false`.
        match self {
            Self::CheckpointSequenceNumber | Self::TxSequenceNumber => true,
            Self::ObjectId => false,
        }
    }
}
52
/// Epoch-boundary values needed to advance epoch-based partitions.
///
/// Instances are built by `EpochPartitionData::compose_data` from the epoch
/// being committed and the last epoch stored in the database.
#[derive(Clone, Debug)]
pub struct EpochPartitionData {
    /// Epoch number of the last epoch stored in the database.
    last_epoch: u64,
    /// Epoch number of the new epoch being committed.
    next_epoch: u64,
    /// First checkpoint id of the last stored epoch.
    last_epoch_start_cp: u64,
    /// First checkpoint id of the new epoch.
    next_epoch_start_cp: u64,
    /// First transaction sequence number of the last epoch, derived as
    /// `next_epoch_start_tx` minus the last epoch's total transaction count.
    last_epoch_start_tx: u64,
    /// First transaction sequence number of the new epoch.
    next_epoch_start_tx: u64,
}
62
63impl EpochPartitionData {
64 pub fn compose_data(epoch: EpochToCommit, last_db_epoch: StoredEpochInfo) -> Self {
65 let last_epoch = last_db_epoch.epoch as u64;
66 let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
67 let next_epoch = epoch.new_epoch_id();
68 let next_epoch_start_cp = epoch.new_epoch_first_checkpoint_id();
69 let next_epoch_start_tx = epoch.new_epoch_first_tx_sequence_number();
70 let last_epoch_start_tx =
71 next_epoch_start_tx - epoch.last_epoch_total_transactions().unwrap();
72
73 Self {
74 last_epoch,
75 next_epoch,
76 last_epoch_start_cp,
77 next_epoch_start_cp,
78 last_epoch_start_tx,
79 next_epoch_start_tx,
80 }
81 }
82}
83
84impl PgPartitionManager {
85 pub fn new(pool: ConnectionPool) -> Result<Self, IndexerError> {
86 let mut partition_strategies = HashMap::new();
87 partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
88 partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
89 partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
90 let manager = Self {
91 pool,
92 partition_strategies,
93 };
94 Ok(manager)
95 }
96
97 pub async fn get_table_partitions(&self) -> Result<BTreeMap<String, (u64, u64)>, IndexerError> {
98 #[derive(QueryableByName, Debug, Clone)]
99 struct PartitionedTable {
100 #[diesel(sql_type = VarChar)]
101 table_name: String,
102 #[diesel(sql_type = BigInt)]
103 first_partition: i64,
104 #[diesel(sql_type = BigInt)]
105 last_partition: i64,
106 }
107
108 let mut connection = self.pool.get().await?;
109
110 Ok(
111 diesel_async::RunQueryDsl::load(diesel::sql_query(GET_PARTITION_SQL), &mut connection)
112 .await?
113 .into_iter()
114 .map(|table: PartitionedTable| {
115 (
116 table.table_name,
117 (table.first_partition as u64, table.last_partition as u64),
118 )
119 })
120 .collect(),
121 )
122 }
123
124 pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
128 self.partition_strategies
129 .get(table_name)
130 .copied()
131 .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
132 }
133
134 pub fn determine_epoch_partition_range(
135 &self,
136 table_name: &str,
137 data: &EpochPartitionData,
138 ) -> Option<(u64, u64)> {
139 match self.get_strategy(table_name) {
140 PgPartitionStrategy::CheckpointSequenceNumber => {
141 Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
142 }
143 PgPartitionStrategy::TxSequenceNumber => {
144 Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
145 }
146 PgPartitionStrategy::ObjectId => None,
147 }
148 }
149
150 pub async fn advance_epoch(
151 &self,
152 table: String,
153 last_partition: u64,
154 data: &EpochPartitionData,
155 ) -> Result<(), IndexerError> {
156 let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
157 return Ok(());
158 };
159 if data.next_epoch == 0 {
160 tracing::info!("Epoch 0 partition has been created in the initial setup.");
161 return Ok(());
162 }
163 if last_partition == data.last_epoch {
164 transaction_with_retry(&self.pool, Duration::from_secs(10), |conn| {
165 async {
166 diesel_async::RunQueryDsl::execute(
167 diesel::sql_query("CALL robust_advance_partition($1, $2, $3, $4)")
168 .bind::<diesel::sql_types::Text, _>(table.clone())
169 .bind::<diesel::sql_types::BigInt, _>(data.last_epoch as i64)
170 .bind::<diesel::sql_types::BigInt, _>(data.next_epoch as i64)
171 .bind::<diesel::sql_types::BigInt, _>(partition_range.1 as i64),
172 conn,
173 )
174 .await?;
175 Ok(())
176 }
177 .scope_boxed()
178 })
179 .await?;
180
181 info!(
182 "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}",
183 table, last_partition, data.next_epoch, partition_range.0
184 );
185 } else if last_partition != data.next_epoch {
186 error!(
189 "Epoch partition for table {} is not in sync with the last epoch {}.",
190 table, data.last_epoch
191 );
192 } else {
193 info!(
194 "Epoch has been advanced to {} already, skipping.",
195 data.next_epoch
196 );
197 }
198 Ok(())
199 }
200
201 pub async fn drop_table_partition(
202 &self,
203 table: String,
204 partition: u64,
205 ) -> Result<(), IndexerError> {
206 transaction_with_retry(&self.pool, Duration::from_secs(10), |conn| {
207 async {
208 diesel_async::RunQueryDsl::execute(
209 diesel::sql_query("CALL drop_partition($1, $2)")
210 .bind::<diesel::sql_types::Text, _>(table.clone())
211 .bind::<diesel::sql_types::BigInt, _>(partition as i64),
212 conn,
213 )
214 .await?;
215 Ok(())
216 }
217 .scope_boxed()
218 })
219 .await?;
220 Ok(())
221 }
222}