1use consensus_config::Epoch;
5use mysten_metrics::spawn_logged_monitored_task;
6use prometheus::{
7 IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
8 register_int_counter_with_registry, register_int_gauge_with_registry,
9};
10use std::fs;
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::{sync::mpsc, time::Instant};
14use tracing::{error, info, warn};
15use typed_store::rocks::safe_drop_db;
16
17struct Metrics {
18 last_pruned_consensus_db_epoch: IntGauge,
19 successfully_pruned_consensus_dbs: IntCounter,
20 error_pruning_consensus_dbs: IntCounterVec,
21}
22
23impl Metrics {
24 fn new(registry: &Registry) -> Self {
25 Self {
26 last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
27 "last_pruned_consensus_db_epoch",
28 "The last epoch for which the consensus store was pruned",
29 registry
30 )
31 .unwrap(),
32 successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
33 "successfully_pruned_consensus_dbs",
34 "The number of consensus dbs successfully pruned",
35 registry
36 )
37 .unwrap(),
38 error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
39 "error_pruning_consensus_dbs",
40 "The number of errors encountered while pruning consensus dbs",
41 &["mode"],
42 registry
43 )
44 .unwrap(),
45 }
46 }
47}
48
49pub struct ConsensusStorePruner {
50 tx_remove: mpsc::Sender<Epoch>,
51 _handle: tokio::task::JoinHandle<()>,
52}
53
54impl ConsensusStorePruner {
55 pub fn new(
56 base_path: PathBuf,
57 epoch_retention: u64,
58 epoch_prune_period: Duration,
59 registry: &Registry,
60 ) -> Self {
61 let (tx_remove, mut rx_remove) = mpsc::channel(1);
62 let metrics = Metrics::new(registry);
63
64 let _handle = spawn_logged_monitored_task!(async {
65 info!(
66 "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
67 );
68
69 let mut timeout = tokio::time::interval_at(
70 Instant::now() + Duration::from_secs(60), epoch_prune_period,
72 );
73
74 let mut latest_epoch = 0;
75 loop {
76 tokio::select! {
77 _ = timeout.tick() => {
78 Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
79 }
80 result = rx_remove.recv() => {
81 if result.is_none() {
82 info!("Closing consensus store pruner");
83 break;
84 }
85 latest_epoch = result.unwrap();
86 Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
87 }
88 }
89 }
90 });
91
92 Self { tx_remove, _handle }
93 }
94
95 pub async fn prune(&self, current_epoch: Epoch) {
98 let result = self.tx_remove.send(current_epoch).await;
99 if result.is_err() {
100 error!(
101 "Error sending message to data removal task for epoch {:?}",
102 current_epoch,
103 );
104 }
105 }
106
107 async fn prune_old_epoch_data(
108 storage_base_path: &PathBuf,
109 current_epoch: Epoch,
110 epoch_retention: u64,
111 metrics: &Metrics,
112 ) {
113 let drop_boundary = current_epoch.saturating_sub(epoch_retention);
114
115 info!(
116 "Consensus store prunning for current epoch {}. Will remove epochs < {:?}",
117 current_epoch, drop_boundary
118 );
119
120 let files = match fs::read_dir(storage_base_path) {
122 Ok(f) => f,
123 Err(e) => {
124 error!(
125 "Can not read the files in the storage path directory for epoch cleanup: {:?}",
126 e
127 );
128 return;
129 }
130 };
131
132 for file_res in files {
134 let f = match file_res {
135 Ok(f) => f,
136 Err(e) => {
137 error!(
138 "Error while cleaning up storage of previous epochs: {:?}",
139 e
140 );
141 continue;
142 }
143 };
144
145 let name = f.file_name();
146 let file_epoch_string = match name.to_str() {
147 Some(f) => f,
148 None => continue,
149 };
150
151 let file_epoch = match file_epoch_string.to_owned().parse::<u64>() {
152 Ok(f) => f,
153 Err(e) => {
154 error!(
155 "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
156 e
157 );
158 continue;
159 }
160 };
161
162 if file_epoch < drop_boundary {
163 const WAIT_BEFORE_FORCE_DELETE: Duration = Duration::from_secs(5);
164 if let Err(e) = safe_drop_db(f.path(), WAIT_BEFORE_FORCE_DELETE).await {
165 warn!(
166 "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fallback to force delete: {:?}",
167 f.path(),
168 e
169 );
170 metrics
171 .error_pruning_consensus_dbs
172 .with_label_values(&["safe"])
173 .inc();
174
175 if let Err(err) = fs::remove_dir_all(f.path()) {
176 error!(
177 "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
178 f.path(),
179 err
180 );
181 metrics
182 .error_pruning_consensus_dbs
183 .with_label_values(&["force"])
184 .inc();
185 } else {
186 info!(
187 "Successfully pruned consensus epoch storage directory with force delete: {:?}",
188 f.path()
189 );
190 let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
191 metrics
192 .last_pruned_consensus_db_epoch
193 .set(last_epoch.max(file_epoch as i64));
194 metrics.successfully_pruned_consensus_dbs.inc();
195 }
196 } else {
197 info!(
198 "Successfully pruned consensus epoch storage directory: {:?}",
199 f.path()
200 );
201 let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
202 metrics
203 .last_pruned_consensus_db_epoch
204 .set(last_epoch.max(file_epoch as i64));
205 metrics.successfully_pruned_consensus_dbs.inc();
206 }
207 }
208 }
209
210 info!(
211 "Completed old epoch data removal process for epoch {:?}",
212 current_epoch
213 );
214 }
215}
216
#[cfg(test)]
mod tests {
    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};
    use prometheus::Registry;
    use std::fs;
    use tokio::time::sleep;

    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        // (epoch_retention, current_epoch, directories to create, epochs expected to remain)
        let cases: [(u64, u64, &[&str], &[u64]); 3] = [
            // Retention 0 at epoch 0: the drop boundary is 0, so nothing is removed.
            (0, 0, &["0", "other"], &[0]),
            // Retention 1 at epoch 100: every epoch below 99 is removed.
            (1, 100, &["97", "98", "99", "100", "other"], &[99, 100]),
            // Retention 0 at epoch 100: only the current epoch survives.
            (0, 100, &["97", "98", "99", "100", "other"], &[100]),
        ];

        for (epoch_retention, current_epoch, dirs, expected) in cases {
            // Hold the guard for the whole case so the directory is deleted afterwards
            // instead of being leaked on disk.
            let temp_dir = tempfile::tempdir().unwrap();
            let base_directory = temp_dir.path().to_path_buf();

            create_epoch_directories(&base_directory, dirs.to_vec());

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            assert_eq!(read_epoch_directories(&base_directory), expected.to_vec());
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        // Declared before the pruner so it is dropped after it (locals drop in reverse order).
        let temp_dir = tempfile::tempdir().unwrap();
        let base_directory = temp_dir.path().to_path_buf();

        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

        // No epoch has been requested yet, so nothing may have been removed.
        sleep(3 * epoch_prune_period).await;
        assert_eq!(read_epoch_directories(&base_directory).len(), 4);

        pruner.prune(100).await;

        // Give the background task time to process the request.
        sleep(2 * epoch_prune_period).await;
        assert_eq!(read_epoch_directories(&base_directory), vec![99, 100]);
    }

    /// Creates one sub-directory of `base_directory` per entry in `epochs`.
    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
        for epoch in epochs {
            fs::create_dir(base_directory.join(epoch)).unwrap();
        }
    }

    /// Returns the sorted epochs of all entries in `base_directory` whose names parse as `u64`.
    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
        let mut epochs: Vec<u64> = fs::read_dir(base_directory)
            .unwrap()
            .filter_map(|entry| {
                entry
                    .unwrap()
                    .file_name()
                    .to_str()
                    .unwrap()
                    .parse::<u64>()
                    .ok()
            })
            .collect();
        epochs.sort();
        epochs
    }
}