sui_core/epoch/consensus_store_pruner.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use consensus_config::Epoch;
use mysten_metrics::spawn_logged_monitored_task;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry, register_int_gauge_with_registry,
};
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use tokio::{sync::mpsc, time::Instant};
use tracing::{error, info, warn};
use typed_store::rocks::safe_drop_db;

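/// Prometheus metrics for consensus store pruning: the last pruned epoch, the number of
/// successfully pruned consensus dbs, and pruning errors labelled by deletion mode.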
struct Metrics {
    last_pruned_consensus_db_epoch: IntGauge,
    successfully_pruned_consensus_dbs: IntCounter,
    error_pruning_consensus_dbs: IntCounterVec,
}

impl Metrics {
    fn new(registry: &Registry) -> Self {
        Self {
            last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
                "last_pruned_consensus_db_epoch",
                "The last epoch for which the consensus store was pruned",
                registry
            )
            .unwrap(),
            successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
                "successfully_pruned_consensus_dbs",
                "The number of consensus dbs successfully pruned",
                registry
            )
            .unwrap(),
            error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
                "error_pruning_consensus_dbs",
                "The number of errors encountered while pruning consensus dbs",
                &["mode"],
                registry
            )
            .unwrap(),
        }
    }
}

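/// Prunes per-epoch consensus store directories in the background. A periodic timer re-runs the
/// cleanup for the latest epoch reported via `prune`, dropping every epoch directory older than
/// `current_epoch - epoch_retention`.
///
/// A minimal usage sketch (the path, period, and epoch below are illustrative, not part of this
/// module):
/// ```ignore
/// let pruner = ConsensusStorePruner::new(
///     consensus_db_base_path,               // directory holding one sub-directory per epoch
///     1,                                    // keep the current and previous epoch
///     std::time::Duration::from_secs(3600), // how often the background task re-checks
///     &prometheus::Registry::new(),
/// );
/// // Signal the pruner whenever the node advances to a new epoch.
/// pruner.prune(current_epoch).await;
/// ```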
pub struct ConsensusStorePruner {
    tx_remove: mpsc::Sender<Epoch>,
    _handle: tokio::task::JoinHandle<()>,
}

impl ConsensusStorePruner {
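    /// Creates the pruner and spawns its background task. `base_path` is the directory that
    /// contains one consensus store sub-directory per epoch, `epoch_retention` is how many
    /// previous epochs to keep in addition to the current one, and `epoch_prune_period` is how
    /// often the background task re-checks for prunable directories. Metrics are registered in
    /// `registry`.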
    pub fn new(
        base_path: PathBuf,
        epoch_retention: u64,
        epoch_prune_period: Duration,
        registry: &Registry,
    ) -> Self {
        let (tx_remove, mut rx_remove) = mpsc::channel(1);
        let metrics = Metrics::new(registry);

        let _handle = spawn_logged_monitored_task!(async {
            info!(
                "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
            );

            let mut timeout = tokio::time::interval_at(
                Instant::now() + Duration::from_secs(60), // allow some time for the node to boot etc before attempting to prune
                epoch_prune_period,
            );

            let mut latest_epoch = 0;
            loop {
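                // Prune either when the periodic timer fires or when a new current epoch is received.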
                tokio::select! {
                    _ = timeout.tick() => {
                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
                    }
                    result = rx_remove.recv() => {
                        if result.is_none() {
                            info!("Closing consensus store pruner");
                            break;
                        }
                        latest_epoch = result.unwrap();
                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
                    }
                }
            }
        });

        Self { tx_remove, _handle }
    }

    /// Removes all epoch data stores and directories that are older than the current epoch minus
    /// the epoch retention. The method ensures that the `current_epoch` data is always retained.
    pub async fn prune(&self, current_epoch: Epoch) {
        let result = self.tx_remove.send(current_epoch).await;
        if result.is_err() {
            error!(
                "Error sending message to data removal task for epoch {:?}",
                current_epoch,
            );
        }
    }

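    /// Scans `storage_base_path`, parses each directory name as an epoch number, and removes every
    /// directory whose epoch is older than `current_epoch - epoch_retention`. Removal is first
    /// attempted with `safe_drop_db` and falls back to `fs::remove_dir_all` if that fails.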
    async fn prune_old_epoch_data(
        storage_base_path: &PathBuf,
        current_epoch: Epoch,
        epoch_retention: u64,
        metrics: &Metrics,
    ) {
        let drop_boundary = current_epoch.saturating_sub(epoch_retention);

        info!(
            "Consensus store pruning for current epoch {}. Will remove epochs < {:?}",
            current_epoch, drop_boundary
        );

        // Get all the epoch stores in the base path directory
        let files = match fs::read_dir(storage_base_path) {
            Ok(f) => f,
            Err(e) => {
                error!(
                    "Cannot read the files in the storage path directory for epoch cleanup: {:?}",
                    e
                );
                return;
            }
        };

        // Look for any that are less than the drop boundary and drop
        for file_res in files {
            let f = match file_res {
                Ok(f) => f,
                Err(e) => {
                    error!(
                        "Error while cleaning up storage of previous epochs: {:?}",
                        e
                    );
                    continue;
                }
            };

            let name = f.file_name();
            let file_epoch_string = match name.to_str() {
                Some(f) => f,
                None => continue,
            };

            let file_epoch = match file_epoch_string.to_owned().parse::<u64>() {
                Ok(f) => f,
                Err(e) => {
                    error!(
                        "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
                        e
                    );
                    continue;
                }
            };

            if file_epoch < drop_boundary {
                const WAIT_BEFORE_FORCE_DELETE: Duration = Duration::from_secs(5);
                if let Err(e) = safe_drop_db(f.path(), WAIT_BEFORE_FORCE_DELETE).await {
                    warn!(
                        "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fall back to force delete: {:?}",
                        f.path(),
                        e
                    );
                    metrics
                        .error_pruning_consensus_dbs
                        .with_label_values(&["safe"])
                        .inc();

                    if let Err(err) = fs::remove_dir_all(f.path()) {
                        error!(
                            "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
                            f.path(),
                            err
                        );
                        metrics
                            .error_pruning_consensus_dbs
                            .with_label_values(&["force"])
                            .inc();
                    } else {
                        info!(
                            "Successfully pruned consensus epoch storage directory with force delete: {:?}",
                            f.path()
                        );
                        let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
                        metrics
                            .last_pruned_consensus_db_epoch
                            .set(last_epoch.max(file_epoch as i64));
                        metrics.successfully_pruned_consensus_dbs.inc();
                    }
                } else {
                    info!(
                        "Successfully pruned consensus epoch storage directory: {:?}",
                        f.path()
                    );
                    let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
                    metrics
                        .last_pruned_consensus_db_epoch
                        .set(last_epoch.max(file_epoch as i64));
                    metrics.successfully_pruned_consensus_dbs.inc();
                }
            }
        }

        info!(
            "Completed old epoch data removal process for epoch {:?}",
            current_epoch
        );
    }
}

#[cfg(test)]
mod tests {
    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};
    use prometheus::Registry;
    use std::fs;
    use tokio::time::sleep;

    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        {
            // Epoch 0 should not be removed when it's the current epoch.
            let epoch_retention = 0;
            let current_epoch = 0;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["0", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 0);
        }

        {
            // Every directory should be retained only for 1 epoch. We expect any epoch directories < 99 to be removed.
            let epoch_retention = 1;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 2);
            assert_eq!(epochs_left[0], 99);
            assert_eq!(epochs_left[1], 100);
        }

        {
            // Every directory should be retained only for 0 epochs. That means only the current epoch directory should be retained and everything else
            // deleted.
            let epoch_retention = 0;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 100);
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        let base_directory = tempfile::tempdir().unwrap().keep();

        // We create some directories up to epoch 100
        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

        // We let the pruner run a few times to prune the old directories. Since the default epoch of 0 is used, no dirs should be pruned.
        sleep(3 * epoch_prune_period).await;

        // We expect the directories to be the same as before
        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 4);

        // Then we update the epoch and instruct the pruner to prune for current epoch = 100
        pruner.prune(100).await;

        // We let the pruner run and check the directories again - no directories for epochs < 99 should be left
        sleep(2 * epoch_prune_period).await;

        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 2);
        assert_eq!(epoch_dirs[0], 99);
        assert_eq!(epoch_dirs[1], 100);
    }

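    /// Creates one sub-directory under `base_directory` for each of the provided names.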
    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
        for epoch in epochs {
            let mut path = base_directory.to_path_buf();
            path.push(epoch);
            fs::create_dir(path).unwrap();
        }
    }

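    /// Returns the directory names under `base_directory` that parse as epoch numbers, sorted ascending.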
    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
        let files = fs::read_dir(base_directory).unwrap();

        let mut epochs = Vec::new();
        for file_res in files {
            let file_epoch_string = file_res.unwrap().file_name().to_str().unwrap().to_owned();
            if let Ok(file_epoch) = file_epoch_string.parse::<u64>() {
                epochs.push(file_epoch);
            }
        }

        epochs.sort();
        epochs
    }
}