// sui_core/epoch/consensus_store_pruner.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use consensus_config::Epoch;
use mysten_metrics::spawn_logged_monitored_task;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry, register_int_gauge_with_registry,
};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::{sync::mpsc, time::Instant};
use tracing::{error, info, warn};
use typed_store::rocks::safe_drop_db;
16
/// Prometheus metrics for the consensus store pruner.
struct Metrics {
    // Highest epoch whose consensus db has been pruned so far (monotonic gauge).
    last_pruned_consensus_db_epoch: IntGauge,
    // Number of consensus dbs that were removed successfully.
    successfully_pruned_consensus_dbs: IntCounter,
    // Number of pruning failures, labeled by deletion mode ("safe" or "force").
    error_pruning_consensus_dbs: IntCounterVec,
}
22
23impl Metrics {
24    fn new(registry: &Registry) -> Self {
25        Self {
26            last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
27                "last_pruned_consensus_db_epoch",
28                "The last epoch for which the consensus store was pruned",
29                registry
30            )
31            .unwrap(),
32            successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
33                "successfully_pruned_consensus_dbs",
34                "The number of consensus dbs successfully pruned",
35                registry
36            )
37            .unwrap(),
38            error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
39                "error_pruning_consensus_dbs",
40                "The number of errors encountered while pruning consensus dbs",
41                &["mode"],
42                registry
43            )
44            .unwrap(),
45        }
46    }
47}
48
/// Handle to a background task that prunes old consensus epoch databases.
pub struct ConsensusStorePruner {
    // Channel used by `prune` to forward the latest epoch to the background task.
    tx_remove: mpsc::Sender<Epoch>,
    // Background task handle; held so the task is tied to this struct's lifetime.
    _handle: tokio::task::JoinHandle<()>,
}
53
54impl ConsensusStorePruner {
55    pub fn new(
56        base_path: PathBuf,
57        epoch_retention: u64,
58        epoch_prune_period: Duration,
59        registry: &Registry,
60    ) -> Self {
61        let (tx_remove, mut rx_remove) = mpsc::channel(1);
62        let metrics = Metrics::new(registry);
63
64        let _handle = spawn_logged_monitored_task!(async {
65            info!(
66                "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
67            );
68
69            let mut timeout = tokio::time::interval_at(
70                Instant::now() + Duration::from_secs(60), // allow some time for the node to boot etc before attempting to prune
71                epoch_prune_period,
72            );
73
74            let mut latest_epoch = 0;
75            loop {
76                tokio::select! {
77                    _ = timeout.tick() => {
78                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
79                    }
80                    result = rx_remove.recv() => {
81                        if result.is_none() {
82                            info!("Closing consensus store pruner");
83                            break;
84                        }
85                        latest_epoch = result.unwrap();
86                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
87                    }
88                }
89            }
90        });
91
92        Self { tx_remove, _handle }
93    }
94
95    /// This method will remove all epoch data stores and directories that are older than the current epoch minus the epoch retention. The method ensures
96    /// that always the `current_epoch` data is retained.
97    pub async fn prune(&self, current_epoch: Epoch) {
98        let result = self.tx_remove.send(current_epoch).await;
99        if result.is_err() {
100            error!(
101                "Error sending message to data removal task for epoch {:?}",
102                current_epoch,
103            );
104        }
105    }
106
107    async fn prune_old_epoch_data(
108        storage_base_path: &PathBuf,
109        current_epoch: Epoch,
110        epoch_retention: u64,
111        metrics: &Metrics,
112    ) {
113        let drop_boundary = current_epoch.saturating_sub(epoch_retention);
114
115        info!(
116            "Consensus store prunning for current epoch {}. Will remove epochs < {:?}",
117            current_epoch, drop_boundary
118        );
119
120        // Get all the epoch stores in the base path directory
121        let files = match fs::read_dir(storage_base_path) {
122            Ok(f) => f,
123            Err(e) => {
124                error!(
125                    "Can not read the files in the storage path directory for epoch cleanup: {:?}",
126                    e
127                );
128                return;
129            }
130        };
131
132        // Look for any that are less than the drop boundary and drop
133        for file_res in files {
134            let f = match file_res {
135                Ok(f) => f,
136                Err(e) => {
137                    error!(
138                        "Error while cleaning up storage of previous epochs: {:?}",
139                        e
140                    );
141                    continue;
142                }
143            };
144
145            let name = f.file_name();
146            let file_epoch_string = match name.to_str() {
147                Some(f) => f,
148                None => continue,
149            };
150
151            let file_epoch = match file_epoch_string.to_owned().parse::<u64>() {
152                Ok(f) => f,
153                Err(e) => {
154                    error!(
155                        "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
156                        e
157                    );
158                    continue;
159                }
160            };
161
162            if file_epoch < drop_boundary {
163                const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
164                match safe_drop_db(f.path(), WAIT_TIMEOUT).await {
165                    Ok(()) => {
166                        info!(
167                            "Successfully pruned consensus epoch storage directory: {:?}",
168                            f.path()
169                        );
170                        let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
171                        metrics
172                            .last_pruned_consensus_db_epoch
173                            .set(last_epoch.max(file_epoch as i64));
174                        metrics.successfully_pruned_consensus_dbs.inc();
175                    }
176                    Err(e) => {
177                        #[cfg(not(tidehunter))]
178                        {
179                            warn!(
180                                "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fallback to force delete: {:?}",
181                                f.path(),
182                                e
183                            );
184                            metrics
185                                .error_pruning_consensus_dbs
186                                .with_label_values(&["safe"])
187                                .inc();
188
189                            if let Err(err) = fs::remove_dir_all(f.path()) {
190                                error!(
191                                    "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
192                                    f.path(),
193                                    err
194                                );
195                                metrics
196                                    .error_pruning_consensus_dbs
197                                    .with_label_values(&["force"])
198                                    .inc();
199                            } else {
200                                info!(
201                                    "Successfully pruned consensus epoch storage directory with force delete: {:?}",
202                                    f.path()
203                                );
204                                let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
205                                metrics
206                                    .last_pruned_consensus_db_epoch
207                                    .set(last_epoch.max(file_epoch as i64));
208                                metrics.successfully_pruned_consensus_dbs.inc();
209                            }
210                        }
211                        #[cfg(tidehunter)]
212                        {
213                            error!(
214                                "Could not prune old consensus storage \"{:?}\" directory: {:?}",
215                                f.path(),
216                                e
217                            );
218                            metrics
219                                .error_pruning_consensus_dbs
220                                .with_label_values(&["safe"])
221                                .inc();
222                        }
223                    }
224                }
225            }
226        }
227
228        info!(
229            "Completed old epoch data removal process for epoch {:?}",
230            current_epoch
231        );
232    }
233}
234
235#[cfg(test)]
236mod tests {
237    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};
238    use prometheus::Registry;
239    use std::fs;
240    use tokio::time::sleep;
241
    // Exercises `prune_old_epoch_data` directly against temp directories for
    // three retention scenarios. Note: the non-numeric "other" directory is
    // skipped by the pruner and is also ignored by `read_epoch_directories`,
    // so it never appears in the assertions below.
    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        {
            // Epoch 0 should not be removed when it's current epoch.
            let epoch_retention = 0;
            let current_epoch = 0;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["0", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 0);
        }

        {
            // Every directory should be retained only for 1 epoch. We expect any epoch directories < 99 to be removed.
            let epoch_retention = 1;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 2);
            assert_eq!(epochs_left[0], 99);
            assert_eq!(epochs_left[1], 100);
        }

        {
            // Every directory should be retained only for 0 epochs. That means only the current epoch directory should be retained and everything else
            // deleted.
            let epoch_retention = 0;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 100);
        }
    }
318
    // End-to-end test of the spawned pruner task.
    // NOTE(review): this relies on real-time sleeps relative to the prune
    // period, so it may be flaky on a heavily loaded machine — TODO confirm
    // whether tokio's paused time could be used here instead.
    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        let base_directory = tempfile::tempdir().unwrap().keep();

        // We create some directories up to epoch 100
        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

        // We let the pruner run for a couple of times to prune the old directories. Since the default epoch of 0 is used no dirs should be pruned.
        // (The periodic tick is also delayed 60s at startup, so only the
        // explicit `prune` call below can trigger any deletion.)
        sleep(3 * epoch_prune_period).await;

        // We expect the directories to be the same as before
        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 4);

        // Then we update the epoch and instruct to prune for current epoch = 100
        pruner.prune(100).await;

        // We let the pruner run and check again the directories - no directories of epoch < 99 should be left
        sleep(2 * epoch_prune_period).await;

        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 2);
        assert_eq!(epoch_dirs[0], 99);
        assert_eq!(epoch_dirs[1], 100);
    }
354
355    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
356        for epoch in epochs {
357            let mut path = base_directory.to_path_buf();
358            path.push(epoch);
359            fs::create_dir(path).unwrap();
360        }
361    }
362
363    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
364        let files = fs::read_dir(base_directory).unwrap();
365
366        let mut epochs = Vec::new();
367        for file_res in files {
368            let file_epoch_string = file_res.unwrap().file_name().to_str().unwrap().to_owned();
369            if let Ok(file_epoch) = file_epoch_string.parse::<u64>() {
370                epochs.push(file_epoch);
371            }
372        }
373
374        epochs.sort();
375        epochs
376    }
377}