use consensus_config::Epoch;
use mysten_metrics::spawn_logged_monitored_task;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry, register_int_gauge_with_registry,
};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::{sync::mpsc, time::Instant};
use tracing::{error, info, warn};
use typed_store::rocks::safe_drop_db;
16
/// Prometheus metrics tracking progress and failures of consensus DB pruning.
struct Metrics {
    // Highest epoch whose consensus DB directory has been removed; only ever
    // moves forward (set with `max` of old and new values).
    last_pruned_consensus_db_epoch: IntGauge,
    // Total number of epoch directories removed successfully (via either the
    // safe drop or the force-delete fallback).
    successfully_pruned_consensus_dbs: IntCounter,
    // Pruning failures, labeled by delete mode ("safe" or "force").
    error_pruning_consensus_dbs: IntCounterVec,
}
22
23impl Metrics {
24 fn new(registry: &Registry) -> Self {
25 Self {
26 last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
27 "last_pruned_consensus_db_epoch",
28 "The last epoch for which the consensus store was pruned",
29 registry
30 )
31 .unwrap(),
32 successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
33 "successfully_pruned_consensus_dbs",
34 "The number of consensus dbs successfully pruned",
35 registry
36 )
37 .unwrap(),
38 error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
39 "error_pruning_consensus_dbs",
40 "The number of errors encountered while pruning consensus dbs",
41 &["mode"],
42 registry
43 )
44 .unwrap(),
45 }
46 }
47}
48
/// Background pruner that removes per-epoch consensus DB directories once
/// they fall outside the configured retention window.
pub struct ConsensusStorePruner {
    // Channel used by `prune` to report the latest epoch to the background
    // task; dropping this struct closes the channel, which makes the task exit.
    tx_remove: mpsc::Sender<Epoch>,
    // Handle of the spawned pruning loop (the task stops once `tx_remove` is
    // dropped and `recv` returns `None`).
    _handle: tokio::task::JoinHandle<()>,
}
53
54impl ConsensusStorePruner {
55 pub fn new(
56 base_path: PathBuf,
57 epoch_retention: u64,
58 epoch_prune_period: Duration,
59 registry: &Registry,
60 ) -> Self {
61 let (tx_remove, mut rx_remove) = mpsc::channel(1);
62 let metrics = Metrics::new(registry);
63
64 let _handle = spawn_logged_monitored_task!(async {
65 info!(
66 "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
67 );
68
69 let mut timeout = tokio::time::interval_at(
70 Instant::now() + Duration::from_secs(60), epoch_prune_period,
72 );
73
74 let mut latest_epoch = 0;
75 loop {
76 tokio::select! {
77 _ = timeout.tick() => {
78 Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
79 }
80 result = rx_remove.recv() => {
81 if result.is_none() {
82 info!("Closing consensus store pruner");
83 break;
84 }
85 latest_epoch = result.unwrap();
86 Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
87 }
88 }
89 }
90 });
91
92 Self { tx_remove, _handle }
93 }
94
95 pub async fn prune(&self, current_epoch: Epoch) {
98 let result = self.tx_remove.send(current_epoch).await;
99 if result.is_err() {
100 error!(
101 "Error sending message to data removal task for epoch {:?}",
102 current_epoch,
103 );
104 }
105 }
106
107 async fn prune_old_epoch_data(
108 storage_base_path: &PathBuf,
109 current_epoch: Epoch,
110 epoch_retention: u64,
111 metrics: &Metrics,
112 ) {
113 let drop_boundary = current_epoch.saturating_sub(epoch_retention);
114
115 info!(
116 "Consensus store prunning for current epoch {}. Will remove epochs < {:?}",
117 current_epoch, drop_boundary
118 );
119
120 let files = match fs::read_dir(storage_base_path) {
122 Ok(f) => f,
123 Err(e) => {
124 error!(
125 "Can not read the files in the storage path directory for epoch cleanup: {:?}",
126 e
127 );
128 return;
129 }
130 };
131
132 for file_res in files {
134 let f = match file_res {
135 Ok(f) => f,
136 Err(e) => {
137 error!(
138 "Error while cleaning up storage of previous epochs: {:?}",
139 e
140 );
141 continue;
142 }
143 };
144
145 let name = f.file_name();
146 let file_epoch_string = match name.to_str() {
147 Some(f) => f,
148 None => continue,
149 };
150
151 let file_epoch = match file_epoch_string.to_owned().parse::<u64>() {
152 Ok(f) => f,
153 Err(e) => {
154 error!(
155 "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
156 e
157 );
158 continue;
159 }
160 };
161
162 if file_epoch < drop_boundary {
163 const WAIT_TIMEOUT: Duration = Duration::from_secs(5);
164 match safe_drop_db(f.path(), WAIT_TIMEOUT).await {
165 Ok(()) => {
166 info!(
167 "Successfully pruned consensus epoch storage directory: {:?}",
168 f.path()
169 );
170 let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
171 metrics
172 .last_pruned_consensus_db_epoch
173 .set(last_epoch.max(file_epoch as i64));
174 metrics.successfully_pruned_consensus_dbs.inc();
175 }
176 Err(e) => {
177 #[cfg(not(tidehunter))]
178 {
179 warn!(
180 "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fallback to force delete: {:?}",
181 f.path(),
182 e
183 );
184 metrics
185 .error_pruning_consensus_dbs
186 .with_label_values(&["safe"])
187 .inc();
188
189 if let Err(err) = fs::remove_dir_all(f.path()) {
190 error!(
191 "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
192 f.path(),
193 err
194 );
195 metrics
196 .error_pruning_consensus_dbs
197 .with_label_values(&["force"])
198 .inc();
199 } else {
200 info!(
201 "Successfully pruned consensus epoch storage directory with force delete: {:?}",
202 f.path()
203 );
204 let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
205 metrics
206 .last_pruned_consensus_db_epoch
207 .set(last_epoch.max(file_epoch as i64));
208 metrics.successfully_pruned_consensus_dbs.inc();
209 }
210 }
211 #[cfg(tidehunter)]
212 {
213 error!(
214 "Could not prune old consensus storage \"{:?}\" directory: {:?}",
215 f.path(),
216 e
217 );
218 metrics
219 .error_pruning_consensus_dbs
220 .with_label_values(&["safe"])
221 .inc();
222 }
223 }
224 }
225 }
226 }
227
228 info!(
229 "Completed old epoch data removal process for epoch {:?}",
230 current_epoch
231 );
232 }
233}
234
#[cfg(test)]
mod tests {
    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};
    use prometheus::Registry;
    use std::fs;
    use tokio::time::sleep;

    // Exercises `prune_old_epoch_data` directly with several retention /
    // current-epoch combinations against real temp directories.
    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        {
            // Retention 0 at epoch 0: the drop boundary is 0, so nothing is
            // strictly below it and the "0" directory must survive.
            let epoch_retention = 0;
            let current_epoch = 0;

            let base_directory = tempfile::tempdir().unwrap().keep();

            // "other" is a non-numeric directory; it is ignored by the pruner
            // and filtered out by `read_epoch_directories`.
            create_epoch_directories(&base_directory, vec!["0", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 0);
        }

        {
            // Retention 1 at epoch 100: the drop boundary is 99, so epochs 97
            // and 98 are removed while 99 and 100 are kept.
            let epoch_retention = 1;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 2);
            assert_eq!(epochs_left[0], 99);
            assert_eq!(epochs_left[1], 100);
        }

        {
            // Retention 0 at epoch 100: the drop boundary is 100, so every
            // epoch below 100 is removed and only the current one remains.
            let epoch_retention = 0;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 100);
        }
    }

    // End-to-end test of the spawned pruner task: pruning must not happen on
    // its own within the first sleeps (the task's first periodic tick is
    // delayed by 60s), and must happen promptly after `prune` is called.
    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        let base_directory = tempfile::tempdir().unwrap().keep();

        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

        // Well within the 60s initial-tick delay, so nothing is pruned yet.
        sleep(3 * epoch_prune_period).await;

        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 4);

        // Reporting epoch 100 with retention 1 should remove epochs 97 and 98.
        pruner.prune(100).await;

        sleep(2 * epoch_prune_period).await;

        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 2);
        assert_eq!(epoch_dirs[0], 99);
        assert_eq!(epoch_dirs[1], 100);
    }

    // Creates one subdirectory per entry of `epochs` under `base_directory`.
    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
        for epoch in epochs {
            let mut path = base_directory.to_path_buf();
            path.push(epoch);
            fs::create_dir(path).unwrap();
        }
    }

    // Returns the numeric (epoch) directory names found under
    // `base_directory`, sorted ascending; non-numeric entries are skipped.
    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
        let files = fs::read_dir(base_directory).unwrap();

        let mut epochs = Vec::new();
        for file_res in files {
            let file_epoch_string = file_res.unwrap().file_name().to_str().unwrap().to_owned();
            if let Ok(file_epoch) = file_epoch_string.parse::<u64>() {
                epochs.push(file_epoch);
            }
        }

        epochs.sort();
        epochs
    }
}
377}