sui_indexer/handlers/
pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use mysten_metrics::spawn_monitored_task;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::time::Duration;
8use strum_macros;
9use tokio_util::sync::CancellationToken;
10use tracing::{error, info};
11
12use crate::config::RetentionConfig;
13use crate::errors::IndexerError;
14use crate::store::PgIndexerStore;
15use crate::store::pg_partition_manager::PgPartitionManager;
16use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
17
18pub struct Pruner {
19    pub store: PgIndexerStore,
20    pub partition_manager: PgPartitionManager,
21    // TODO: (wlmyng) - we can remove this when pruner logic is updated to use `retention_policies`.
22    pub epochs_to_keep: u64,
23    pub retention_policies: HashMap<PrunableTable, u64>,
24    pub metrics: IndexerMetrics,
25}
26
27/// Enum representing tables that the pruner is allowed to prune. This corresponds to table names in
28/// the database, and should be used in lieu of string literals. This enum is also meant to
29/// facilitate the process of determining which unit (epoch, cp, or tx) should be used for the
30/// table's range. Pruner will ignore any table that is not listed here.
31#[derive(
32    Debug,
33    Eq,
34    PartialEq,
35    strum_macros::Display,
36    strum_macros::EnumString,
37    strum_macros::EnumIter,
38    strum_macros::AsRefStr,
39    Hash,
40    Serialize,
41    Deserialize,
42    Clone,
43)]
44#[strum(serialize_all = "snake_case")]
45#[serde(rename_all = "snake_case")]
46pub enum PrunableTable {
47    ObjectsHistory,
48    Transactions,
49    Events,
50
51    EventEmitPackage,
52    EventEmitModule,
53    EventSenders,
54    EventStructInstantiation,
55    EventStructModule,
56    EventStructName,
57    EventStructPackage,
58
59    TxAffectedAddresses,
60    TxAffectedObjects,
61    TxCallsPkg,
62    TxCallsMod,
63    TxCallsFun,
64    TxChangedObjects,
65    TxDigests,
66    TxInputObjects,
67    TxKinds,
68
69    Checkpoints,
70    PrunerCpWatermark,
71}
72
73impl PrunableTable {
74    pub fn select_reader_lo(&self, cp: u64, tx: u64) -> u64 {
75        match self {
76            PrunableTable::ObjectsHistory => cp,
77            PrunableTable::Transactions => tx,
78            PrunableTable::Events => tx,
79
80            PrunableTable::EventEmitPackage => tx,
81            PrunableTable::EventEmitModule => tx,
82            PrunableTable::EventSenders => tx,
83            PrunableTable::EventStructInstantiation => tx,
84            PrunableTable::EventStructModule => tx,
85            PrunableTable::EventStructName => tx,
86            PrunableTable::EventStructPackage => tx,
87
88            PrunableTable::TxAffectedAddresses => tx,
89            PrunableTable::TxAffectedObjects => tx,
90            PrunableTable::TxCallsPkg => tx,
91            PrunableTable::TxCallsMod => tx,
92            PrunableTable::TxCallsFun => tx,
93            PrunableTable::TxChangedObjects => tx,
94            PrunableTable::TxDigests => tx,
95            PrunableTable::TxInputObjects => tx,
96            PrunableTable::TxKinds => tx,
97
98            PrunableTable::Checkpoints => cp,
99            PrunableTable::PrunerCpWatermark => cp,
100        }
101    }
102}
103
104impl Pruner {
105    /// Instantiates a pruner with default retention and overrides. Pruner will finalize the
106    /// retention policies so there is a value for every prunable table.
107    pub fn new(
108        store: PgIndexerStore,
109        retention_config: RetentionConfig,
110        metrics: IndexerMetrics,
111    ) -> Result<Self, IndexerError> {
112        let partition_manager = PgPartitionManager::new(store.pool())?;
113        let epochs_to_keep = retention_config.epochs_to_keep;
114        let retention_policies = retention_config.retention_policies();
115
116        Ok(Self {
117            store,
118            epochs_to_keep,
119            partition_manager,
120            retention_policies,
121            metrics,
122        })
123    }
124
125    /// Given a table name, return the number of epochs to keep for that table. Return `None` if the
126    /// table is not prunable.
127    fn table_retention(&self, table_name: &str) -> Option<u64> {
128        if let Ok(variant) = table_name.parse::<PrunableTable>() {
129            self.retention_policies.get(&variant).copied()
130        } else {
131            None
132        }
133    }
134
135    pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
136        let store_clone = self.store.clone();
137        let retention_policies = self.retention_policies.clone();
138        let cancel_clone = cancel.clone();
139        spawn_monitored_task!(update_watermarks_lower_bounds_task(
140            store_clone,
141            retention_policies,
142            cancel_clone
143        ));
144
145        let mut last_seen_max_epoch = 0;
146        // The first epoch that has not yet been pruned.
147        let mut next_prune_epoch = None;
148        while !cancel.is_cancelled() {
149            let (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
150            if max_epoch == last_seen_max_epoch {
151                tokio::time::sleep(Duration::from_secs(5)).await;
152                continue;
153            }
154            last_seen_max_epoch = max_epoch;
155
156            // Not all partitioned tables are epoch-partitioned, so we need to filter them out.
157            let table_partitions: HashMap<_, _> = self
158                .partition_manager
159                .get_table_partitions()
160                .await?
161                .into_iter()
162                .filter(|(table_name, _)| {
163                    self.partition_manager
164                        .get_strategy(table_name)
165                        .is_epoch_partitioned()
166                })
167                .collect();
168
169            for (table_name, (min_partition, max_partition)) in &table_partitions {
170                if let Some(epochs_to_keep) = self.table_retention(table_name) {
171                    if last_seen_max_epoch != *max_partition {
172                        error!(
173                            "Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
174                            table_name, last_seen_max_epoch, max_partition
175                        );
176                    }
177
178                    for epoch in
179                        *min_partition..last_seen_max_epoch.saturating_sub(epochs_to_keep - 1)
180                    {
181                        if cancel.is_cancelled() {
182                            info!("Pruner task cancelled.");
183                            return Ok(());
184                        }
185                        self.partition_manager
186                            .drop_table_partition(table_name.clone(), epoch)
187                            .await?;
188                        info!(
189                            "Batch dropped table partition {} epoch {}",
190                            table_name, epoch
191                        );
192                    }
193                }
194            }
195
196            // TODO: (wlmyng) Once we have the watermarks table, we can iterate through each row
197            // returned from `watermarks`, look it up against `retention_policies`, and process them
198            // independently. This also means that pruning overrides will only apply for
199            // epoch-partitioned tables right now.
200            let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.epochs_to_keep - 1);
201            let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
202            for epoch in prune_start_epoch..prune_to_epoch {
203                if cancel.is_cancelled() {
204                    info!("Pruner task cancelled.");
205                    return Ok(());
206                }
207                info!("Pruning epoch {}", epoch);
208                if let Err(err) = self.store.prune_epoch(epoch).await {
209                    error!("Failed to prune epoch {}: {}", epoch, err);
210                    break;
211                };
212                self.metrics.last_pruned_epoch.set(epoch as i64);
213                info!("Pruned epoch {}", epoch);
214                next_prune_epoch = Some(epoch + 1);
215            }
216        }
217        info!("Pruner task cancelled.");
218        Ok(())
219    }
220}
221
222/// Task to periodically query the `watermarks` table and update the lower bounds for all watermarks
223/// if the entry exceeds epoch-level retention policy.
224async fn update_watermarks_lower_bounds_task(
225    store: PgIndexerStore,
226    retention_policies: HashMap<PrunableTable, u64>,
227    cancel: CancellationToken,
228) -> IndexerResult<()> {
229    let mut interval = tokio::time::interval(Duration::from_secs(5));
230    loop {
231        tokio::select! {
232            _ = cancel.cancelled() => {
233                info!("Pruner watermark lower bound update task cancelled.");
234                return Ok(());
235            }
236            _ = interval.tick() => {
237                update_watermarks_lower_bounds(&store, &retention_policies, &cancel).await?;
238            }
239        }
240    }
241}
242
243/// Fetches all entries from the `watermarks` table, and updates the `reader_lo` for each entry if
244/// its epoch range exceeds the respective retention policy.
245async fn update_watermarks_lower_bounds(
246    store: &PgIndexerStore,
247    retention_policies: &HashMap<PrunableTable, u64>,
248    cancel: &CancellationToken,
249) -> IndexerResult<()> {
250    let (watermarks, _) = store.get_watermarks().await?;
251    let mut lower_bound_updates = vec![];
252
253    for watermark in watermarks.iter() {
254        if cancel.is_cancelled() {
255            info!("Pruner watermark lower bound update task cancelled.");
256            return Ok(());
257        }
258
259        let Some(prunable_table) = watermark.entity() else {
260            continue;
261        };
262
263        let Some(epochs_to_keep) = retention_policies.get(&prunable_table) else {
264            error!(
265                "No retention policy found for prunable table {}",
266                prunable_table
267            );
268            continue;
269        };
270
271        if let Some(new_epoch_lo) = watermark.new_epoch_lo(*epochs_to_keep) {
272            lower_bound_updates.push((prunable_table, new_epoch_lo));
273        };
274    }
275
276    if !lower_bound_updates.is_empty() {
277        store
278            .update_watermarks_lower_bound(lower_bound_updates)
279            .await?;
280        info!("Finished updating lower bounds for watermarks");
281    }
282
283    Ok(())
284}