1use mysten_metrics::spawn_monitored_task;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::time::Duration;
8use strum_macros;
9use tokio_util::sync::CancellationToken;
10use tracing::{error, info};
11
12use crate::config::RetentionConfig;
13use crate::errors::IndexerError;
14use crate::store::PgIndexerStore;
15use crate::store::pg_partition_manager::PgPartitionManager;
16use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
17
/// Background pruner for indexer data.
///
/// Bundles the store, the partition manager and the retention settings that
/// [`Pruner::start`] uses to drop old table partitions and prune old epochs.
pub struct Pruner {
    /// Store used to read the available epoch range and to prune one epoch at a time.
    pub store: PgIndexerStore,
    /// Used to list table partitions, query their partitioning strategy and drop partitions.
    pub partition_manager: PgPartitionManager,
    /// Number of epochs retained by the epoch-pruning pass; copied from
    /// `RetentionConfig::epochs_to_keep` in [`Pruner::new`].
    pub epochs_to_keep: u64,
    /// Per-table retention (in epochs) as returned by `RetentionConfig::retention_policies`.
    /// Consulted when dropping partitions and when advancing watermark lower bounds.
    pub retention_policies: HashMap<PrunableTable, u64>,
    /// Indexer metrics; the pruner records the last pruned epoch in `last_pruned_epoch`.
    pub metrics: IndexerMetrics,
}
26
/// Tables that can be given a retention policy and pruned.
///
/// Both the `strum` and the `serde` representations use `snake_case`, so a
/// variant such as `ObjectsHistory` is written and parsed as `objects_history`.
/// The pruner relies on the `EnumString` derive to map a table name onto a
/// variant when looking up that table's retention.
#[derive(
    Debug,
    Eq,
    PartialEq,
    strum_macros::Display,
    strum_macros::EnumString,
    strum_macros::EnumIter,
    strum_macros::AsRefStr,
    Hash,
    Serialize,
    Deserialize,
    Clone,
)]
#[strum(serialize_all = "snake_case")]
#[serde(rename_all = "snake_case")]
pub enum PrunableTable {
    ObjectsHistory,
    Transactions,
    Events,

    // Event-related tables (grouping inferred from the variant names).
    EventEmitPackage,
    EventEmitModule,
    EventSenders,
    EventStructInstantiation,
    EventStructModule,
    EventStructName,
    EventStructPackage,

    // Transaction-related tables (grouping inferred from the variant names).
    TxAffectedAddresses,
    TxAffectedObjects,
    TxCallsPkg,
    TxCallsMod,
    TxCallsFun,
    TxChangedObjects,
    TxDigests,
    TxInputObjects,
    TxKinds,

    Checkpoints,
    PrunerCpWatermark,
}
72
73impl PrunableTable {
74 pub fn select_reader_lo(&self, cp: u64, tx: u64) -> u64 {
75 match self {
76 PrunableTable::ObjectsHistory => cp,
77 PrunableTable::Transactions => tx,
78 PrunableTable::Events => tx,
79
80 PrunableTable::EventEmitPackage => tx,
81 PrunableTable::EventEmitModule => tx,
82 PrunableTable::EventSenders => tx,
83 PrunableTable::EventStructInstantiation => tx,
84 PrunableTable::EventStructModule => tx,
85 PrunableTable::EventStructName => tx,
86 PrunableTable::EventStructPackage => tx,
87
88 PrunableTable::TxAffectedAddresses => tx,
89 PrunableTable::TxAffectedObjects => tx,
90 PrunableTable::TxCallsPkg => tx,
91 PrunableTable::TxCallsMod => tx,
92 PrunableTable::TxCallsFun => tx,
93 PrunableTable::TxChangedObjects => tx,
94 PrunableTable::TxDigests => tx,
95 PrunableTable::TxInputObjects => tx,
96 PrunableTable::TxKinds => tx,
97
98 PrunableTable::Checkpoints => cp,
99 PrunableTable::PrunerCpWatermark => cp,
100 }
101 }
102}
103
104impl Pruner {
105 pub fn new(
108 store: PgIndexerStore,
109 retention_config: RetentionConfig,
110 metrics: IndexerMetrics,
111 ) -> Result<Self, IndexerError> {
112 let partition_manager = PgPartitionManager::new(store.pool())?;
113 let epochs_to_keep = retention_config.epochs_to_keep;
114 let retention_policies = retention_config.retention_policies();
115
116 Ok(Self {
117 store,
118 epochs_to_keep,
119 partition_manager,
120 retention_policies,
121 metrics,
122 })
123 }
124
125 fn table_retention(&self, table_name: &str) -> Option<u64> {
128 if let Ok(variant) = table_name.parse::<PrunableTable>() {
129 self.retention_policies.get(&variant).copied()
130 } else {
131 None
132 }
133 }
134
135 pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
136 let store_clone = self.store.clone();
137 let retention_policies = self.retention_policies.clone();
138 let cancel_clone = cancel.clone();
139 spawn_monitored_task!(update_watermarks_lower_bounds_task(
140 store_clone,
141 retention_policies,
142 cancel_clone
143 ));
144
145 let mut last_seen_max_epoch = 0;
146 let mut next_prune_epoch = None;
148 while !cancel.is_cancelled() {
149 let (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
150 if max_epoch == last_seen_max_epoch {
151 tokio::time::sleep(Duration::from_secs(5)).await;
152 continue;
153 }
154 last_seen_max_epoch = max_epoch;
155
156 let table_partitions: HashMap<_, _> = self
158 .partition_manager
159 .get_table_partitions()
160 .await?
161 .into_iter()
162 .filter(|(table_name, _)| {
163 self.partition_manager
164 .get_strategy(table_name)
165 .is_epoch_partitioned()
166 })
167 .collect();
168
169 for (table_name, (min_partition, max_partition)) in &table_partitions {
170 if let Some(epochs_to_keep) = self.table_retention(table_name) {
171 if last_seen_max_epoch != *max_partition {
172 error!(
173 "Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
174 table_name, last_seen_max_epoch, max_partition
175 );
176 }
177
178 for epoch in
179 *min_partition..last_seen_max_epoch.saturating_sub(epochs_to_keep - 1)
180 {
181 if cancel.is_cancelled() {
182 info!("Pruner task cancelled.");
183 return Ok(());
184 }
185 self.partition_manager
186 .drop_table_partition(table_name.clone(), epoch)
187 .await?;
188 info!(
189 "Batch dropped table partition {} epoch {}",
190 table_name, epoch
191 );
192 }
193 }
194 }
195
196 let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.epochs_to_keep - 1);
201 let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
202 for epoch in prune_start_epoch..prune_to_epoch {
203 if cancel.is_cancelled() {
204 info!("Pruner task cancelled.");
205 return Ok(());
206 }
207 info!("Pruning epoch {}", epoch);
208 if let Err(err) = self.store.prune_epoch(epoch).await {
209 error!("Failed to prune epoch {}: {}", epoch, err);
210 break;
211 };
212 self.metrics.last_pruned_epoch.set(epoch as i64);
213 info!("Pruned epoch {}", epoch);
214 next_prune_epoch = Some(epoch + 1);
215 }
216 }
217 info!("Pruner task cancelled.");
218 Ok(())
219 }
220}
221
222async fn update_watermarks_lower_bounds_task(
225 store: PgIndexerStore,
226 retention_policies: HashMap<PrunableTable, u64>,
227 cancel: CancellationToken,
228) -> IndexerResult<()> {
229 let mut interval = tokio::time::interval(Duration::from_secs(5));
230 loop {
231 tokio::select! {
232 _ = cancel.cancelled() => {
233 info!("Pruner watermark lower bound update task cancelled.");
234 return Ok(());
235 }
236 _ = interval.tick() => {
237 update_watermarks_lower_bounds(&store, &retention_policies, &cancel).await?;
238 }
239 }
240 }
241}
242
243async fn update_watermarks_lower_bounds(
246 store: &PgIndexerStore,
247 retention_policies: &HashMap<PrunableTable, u64>,
248 cancel: &CancellationToken,
249) -> IndexerResult<()> {
250 let (watermarks, _) = store.get_watermarks().await?;
251 let mut lower_bound_updates = vec![];
252
253 for watermark in watermarks.iter() {
254 if cancel.is_cancelled() {
255 info!("Pruner watermark lower bound update task cancelled.");
256 return Ok(());
257 }
258
259 let Some(prunable_table) = watermark.entity() else {
260 continue;
261 };
262
263 let Some(epochs_to_keep) = retention_policies.get(&prunable_table) else {
264 error!(
265 "No retention policy found for prunable table {}",
266 prunable_table
267 );
268 continue;
269 };
270
271 if let Some(new_epoch_lo) = watermark.new_epoch_lo(*epochs_to_keep) {
272 lower_bound_updates.push((prunable_table, new_epoch_lo));
273 };
274 }
275
276 if !lower_bound_updates.is_empty() {
277 store
278 .update_watermarks_lower_bound(lower_bound_updates)
279 .await?;
280 info!("Finished updating lower bounds for watermarks");
281 }
282
283 Ok(())
284}