use super::authority_store_tables::{AuthorityPerpetualTables, AuthorityPrunerTables};
use crate::authority::authority_store_types::{StoreObject, StoreObjectWrapper};
use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
use crate::jsonrpc_index::IndexStore;
use crate::rpc_index::RpcIndexStore;
use anyhow::anyhow;
use bincode::Options;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use once_cell::sync::Lazy;
use prometheus::{
    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
    register_int_gauge_with_registry,
};
#[cfg(tidehunter)]
use serde::de::DeserializeOwned;
use std::cmp::{max, min};
use std::collections::{BTreeSet, HashMap};
use std::sync::atomic::AtomicU64;
use std::sync::{Mutex, Weak};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use sui_config::node::AuthorityStorePruningConfig;
use sui_types::committee::EpochId;
use sui_types::effects::TransactionEffects;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::{
    CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
};
use sui_types::{
    base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
    storage::ObjectKey,
};
use tokio::sync::oneshot::{self, Sender};
use tokio::time::Instant;
use tracing::{debug, error, info, warn};
use typed_store::rocksdb::LiveFile;
use typed_store::rocksdb::compaction_filter::Decision;
use typed_store::{Map, TypedStoreError};

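// Column families eligible for periodic manual compaction (see
// `compact_next_sst_file`): the pruner deletes their contents in bulk, so
// stale SST files in these tables benefit from being rewritten.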
static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
    [
        "objects",
        "effects",
        "transactions",
        "events",
        "executed_effects",
        "executed_transactions_to_checkpoint",
    ]
    .into_iter()
    .map(|cf| cf.to_string())
    .collect()
});
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;
pub struct AuthorityStorePruner {
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}

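/// Shared, atomically updated pruning watermarks. They are advanced by
/// `update_pruning_watermarks` and consumed by relocation filters such as
/// `apply_relocation_filter` under the tidehunter backend.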
#[derive(Default)]
pub struct PrunerWatermarks {
    pub epoch_id: Arc<AtomicU64>,
    pub checkpoint_id: Arc<AtomicU64>,
}

static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;

pub struct AuthorityStorePruningMetrics {
    pub last_pruned_checkpoint: IntGauge,
    pub num_pruned_objects: IntCounter,
    pub num_pruned_tombstones: IntCounter,
    pub last_pruned_effects_checkpoint: IntGauge,
    pub last_pruned_indexes_transaction: IntGauge,
    pub num_epochs_to_retain_for_objects: IntGauge,
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}

impl AuthorityStorePruningMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            last_pruned_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_checkpoint",
                "Last pruned checkpoint",
                registry
            )
            .unwrap(),
            num_pruned_objects: register_int_counter_with_registry!(
                "num_pruned_objects",
                "Number of pruned objects",
                registry
            )
            .unwrap(),
            num_pruned_tombstones: register_int_counter_with_registry!(
                "num_pruned_tombstones",
                "Number of pruned tombstones",
                registry
            )
            .unwrap(),
            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_effects_checkpoint",
                "Last pruned effects checkpoint",
                registry
            )
            .unwrap(),
            last_pruned_indexes_transaction: register_int_gauge_with_registry!(
                "last_pruned_indexes_transaction",
                "Last pruned indexes transaction",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_objects",
                "Number of epochs to retain for objects",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_checkpoints",
                "Number of epochs to retain for checkpoints",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }

    pub fn new_for_test() -> Arc<Self> {
        Self::new(&Registry::new())
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PruningMode {
    Objects,
    Checkpoints,
}

impl AuthorityStorePruner {
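    /// Deletes object versions made obsolete by the given transaction
    /// effects. For every object the effects modified, all versions from the
    /// lowest modified version up to the highest become prunable. When pruner
    /// tables are present, only a per-object GC watermark is recorded and the
    /// actual deletion is deferred to the RocksDB compaction filter
    /// (`ObjectsCompactionFilter`); otherwise a range delete is scheduled
    /// directly. Tombstoned objects are additionally removed up to and
    /// including the tombstone version when tombstone pruning is enabled.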
    async fn prune_objects(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        enable_pruning_tombstones: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();
        let mut pruner_db_wb = pruner_db.map(|db| db.object_tombstones.batch());

        let mut live_object_keys_to_prune = vec![];
        let mut object_tombstones_to_prune = vec![];
        for effects in &transaction_effects {
            for (object_id, seq_number) in effects.modified_at_versions() {
                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
            }

            if enable_pruning_tombstones {
                for deleted_object_key in effects.all_tombstones() {
                    object_tombstones_to_prune
                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
                }
            }
        }

        metrics
            .num_pruned_objects
            .inc_by(live_object_keys_to_prune.len() as u64);
        metrics
            .num_pruned_tombstones
            .inc_by(object_tombstones_to_prune.len() as u64);

        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
            updates
                .entry(object_id)
                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
                .or_insert((seq_number, seq_number));
        }

        for (object_id, (min_version, max_version)) in updates {
            debug!(
                "Pruning object {:?} versions {:?} - {:?}",
                object_id, min_version, max_version
            );
            match pruner_db_wb {
                Some(ref mut batch) => {
                    batch.insert_batch(
                        &pruner_db.expect("invariant checked").object_tombstones,
                        std::iter::once((object_id, max_version)),
                    )?;
                }
                None => {
                    let start_range = ObjectKey(object_id, min_version);
                    let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
                    wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
                }
            }
        }

        if !object_tombstones_to_prune.is_empty() {
            let mut object_keys_to_delete = vec![];
            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
                for result in perpetual_db.objects.safe_iter_with_bounds(
                    Some(ObjectKey(object_id, VersionNumber::MIN)),
                    Some(ObjectKey(object_id, seq_number.next())),
                ) {
                    let (object_key, _) = result?;
                    assert_eq!(object_key.0, object_id);
                    object_keys_to_delete.push(object_key);
                }
            }

            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
        }

        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);

        if let Some(batch) = pruner_db_wb {
            batch.write()?;
        }
        wb.write()?;
        Ok(())
    }

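    /// Removes the given checkpoints, their contents, and the per-transaction
    /// data they reference (transactions, executed effects, events, and
    /// checkpoint membership records), prunes the RPC index when present, and
    /// advances the `HighestPruned` watermark in the checkpoint store.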
    fn prune_checkpoints(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_db: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        checkpoint_number: CheckpointSequenceNumber,
        checkpoints_to_prune: Vec<CheckpointDigest>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        effects_to_prune: &Vec<TransactionEffects>,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("EffectsLivePruner");

        let mut perpetual_batch = perpetual_db.objects.batch();
        let transactions: Vec<_> = checkpoint_content_to_prune
            .iter()
            .flat_map(|content| content.iter().map(|tx| tx.transaction))
            .collect();

        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
        perpetual_batch.delete_batch(
            &perpetual_db.executed_transactions_to_checkpoint,
            transactions.iter(),
        )?;

        let mut effect_digests = vec![];
        for effects in effects_to_prune {
            let effects_digest = effects.digest();
            debug!("Pruning effects {:?}", effects_digest);
            effect_digests.push(effects_digest);

            if effects.events_digest().is_some() {
                perpetual_batch
                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
            }
        }
        perpetual_batch.delete_batch(
            &perpetual_db.unchanged_loaded_runtime_objects,
            transactions.iter(),
        )?;
        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;

        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();

        let checkpoint_content_digests =
            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_content,
            checkpoint_content_digests.clone(),
        )?;
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
            checkpoint_content_digests,
        )?;

        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_by_digest,
            checkpoints_to_prune,
        )?;

        checkpoints_batch.insert_batch(
            &checkpoint_db.tables.watermarks,
            [(
                &CheckpointWatermark::HighestPruned,
                &(checkpoint_number, CheckpointDigest::random()),
            )],
        )?;

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }
        perpetual_batch.write()?;
        checkpoints_batch.write()?;
        metrics
            .last_pruned_effects_checkpoint
            .set(checkpoint_number as i64);

        Ok(())
    }

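    /// Entry point for one object-pruning pass: computes the highest
    /// checkpoint eligible for pruning (optionally smoothed so that work is
    /// spread evenly over the epoch) and delegates to
    /// `prune_for_eligible_epochs` in `PruningMode::Objects`.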
    pub async fn prune_objects_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let pruned_checkpoint_number = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        if config.smooth && config.num_epochs_to_retain > 0 {
            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint_number,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                config.num_epochs_to_retain,
            )?;
        }
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            pruner_db,
            PruningMode::Objects,
            config.num_epochs_to_retain,
            pruned_checkpoint_number,
            max_eligible_checkpoint_number,
            config,
            metrics.clone(),
        )
        .await
    }

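    /// Entry point for one checkpoint-pruning pass. Checkpoint data is never
    /// pruned past the objects watermark (so checkpoints always outlive the
    /// object versions they reference), and the pass finishes by advancing
    /// the shared `PrunerWatermarks`.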
    pub async fn prune_checkpoints_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
        pruner_watermarks: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
        let pruned_checkpoint_number = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .unwrap_or(0);
        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        if config.num_epochs_to_retain != u64::MAX {
            max_eligible_checkpoint = min(
                max_eligible_checkpoint,
                perpetual_db
                    .get_highest_pruned_checkpoint()?
                    .unwrap_or_default(),
            );
        }
        if config.smooth
            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
        {
            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                num_epochs_to_retain,
            )?;
        }
        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            pruner_db,
            PruningMode::Checkpoints,
            config
                .num_epochs_to_retain_for_checkpoints()
                .ok_or_else(|| anyhow!("config value not set"))?,
            pruned_checkpoint_number,
            max_eligible_checkpoint,
            config.clone(),
            metrics.clone(),
        )
        .await?;

        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
            Self::update_pruning_watermarks(
                checkpoint_store,
                num_epochs_to_retain,
                pruner_watermarks,
            )?;
        }
        Ok(())
    }

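    /// Core pruning loop shared by both modes. Walks certified checkpoints
    /// starting just past `starting_checkpoint_number`, accumulating
    /// checkpoints, contents, and effects until either batch limit
    /// (`max_transactions_in_batch` or `max_checkpoints_in_batch`) is hit,
    /// then flushes the batch via `prune_objects` or `prune_checkpoints`.
    /// Stops at `max_eligible_checkpoint`, or once a checkpoint is too recent
    /// to prune given `num_epochs_to_retain`.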
    pub async fn prune_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        pruner_db: Option<&Arc<AuthorityPrunerTables>>,
        mode: PruningMode,
        num_epochs_to_retain: u64,
        starting_checkpoint_number: CheckpointSequenceNumber,
        max_eligible_checkpoint: CheckpointSequenceNumber,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneForEligibleEpochs");

        let mut checkpoint_number = starting_checkpoint_number;
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch())
            .unwrap_or_default();

        let mut checkpoints_to_prune = vec![];
        let mut checkpoint_content_to_prune = vec![];
        let mut effects_to_prune = vec![];

        loop {
            let Some(ckpt) = checkpoint_store
                .tables
                .certified_checkpoints
                .get(&(checkpoint_number + 1))?
            else {
                break;
            };
            let checkpoint = ckpt.into_inner();
            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
            {
                break;
            }
            checkpoint_number = *checkpoint.sequence_number();

            let content = checkpoint_store
                .get_checkpoint_contents(&checkpoint.content_digest)?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "checkpoint content data is missing: {}",
                        checkpoint.sequence_number
                    )
                })?;
            let effects = perpetual_db
                .effects
                .multi_get(content.iter().map(|tx| tx.effects))?;

            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
            checkpoints_to_prune.push(*checkpoint.digest());
            checkpoint_content_to_prune.push(content);
            effects_to_prune.extend(effects.into_iter().flatten());

            if effects_to_prune.len() >= config.max_transactions_in_batch
                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
            {
                match mode {
                    PruningMode::Objects => {
                        Self::prune_objects(
                            effects_to_prune,
                            perpetual_db,
                            pruner_db,
                            checkpoint_number,
                            metrics.clone(),
                            !config.killswitch_tombstone_pruning,
                        )
                        .await?
                    }
                    PruningMode::Checkpoints => Self::prune_checkpoints(
                        perpetual_db,
                        checkpoint_store,
                        rpc_index,
                        checkpoint_number,
                        checkpoints_to_prune,
                        checkpoint_content_to_prune,
                        &effects_to_prune,
                        metrics.clone(),
                    )?,
                };
                checkpoints_to_prune = vec![];
                checkpoint_content_to_prune = vec![];
                effects_to_prune = vec![];
                tokio::task::yield_now().await;
            }
        }

        if !checkpoints_to_prune.is_empty() {
            match mode {
                PruningMode::Objects => {
                    Self::prune_objects(
                        effects_to_prune,
                        perpetual_db,
                        pruner_db,
                        checkpoint_number,
                        metrics.clone(),
                        !config.killswitch_tombstone_pruning,
                    )
                    .await?
                }
                PruningMode::Checkpoints => Self::prune_checkpoints(
                    perpetual_db,
                    checkpoint_store,
                    rpc_index,
                    checkpoint_number,
                    checkpoints_to_prune,
                    checkpoint_content_to_prune,
                    &effects_to_prune,
                    metrics.clone(),
                )?,
            };
        }
        Ok(())
    }

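    /// Prunes the JSON-RPC index store by timestamp. Retention is clamped to
    /// a minimum of 7 epochs; everything older than
    /// `now - epochs_to_retain * epoch_duration_ms` is removed.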
    fn prune_indexes(
        indexes: Option<&IndexStore>,
        config: &AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        metrics: &AuthorityStorePruningMetrics,
    ) -> anyhow::Result<()> {
        if let (Some(mut epochs_to_retain), Some(indexes)) =
            (config.num_epochs_to_retain_for_indexes, indexes)
        {
            if epochs_to_retain < 7 {
                warn!("num_epochs_to_retain_for_indexes is too low. Resetting it to 7");
                epochs_to_retain = 7;
            }
            let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
            if let Some(cut_time_ms) =
                u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
            {
                let transaction_id = indexes.prune(cut_time_ms)?;
                metrics
                    .last_pruned_indexes_transaction
                    .set(transaction_id as i64);
            }
        }
        Ok(())
    }

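    /// Range-deletes `executed_transaction_digests` entries for all epochs
    /// strictly below `current_epoch - 1`. Keys are `(epoch, digest)` pairs,
    /// so a single range delete covers everything before the target epoch.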
    async fn prune_executed_tx_digests(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let target_epoch = current_epoch - 1;

        let start_key = (0u64, TransactionDigest::ZERO);
        let end_key = (target_epoch, TransactionDigest::ZERO);

        info!(
            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
            target_epoch, current_epoch
        );

        let mut batch = perpetual_db.executed_transaction_digests.batch();
        batch.schedule_delete_range(
            &perpetual_db.executed_transaction_digests,
            &start_key,
            &end_key,
        )?;
        batch.write()?;
        Ok(())
    }

    #[cfg(tidehunter)]
    fn prune_executed_tx_digests_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let last_epoch_to_delete = current_epoch - 2;
        let from_key = (0u64, TransactionDigest::ZERO);
        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
        info!(
            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
            last_epoch_to_delete, current_epoch
        );
        perpetual_db
            .executed_transaction_digests
            .drop_cells_in_range(&from_key, &to_key)?;
        Ok(())
    }

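    /// Advances the shared `PrunerWatermarks` to
    /// `current_epoch - num_epochs_to_retain + 1` (and to the last checkpoint
    /// of the target epoch, if known). Returns `Ok(true)` only when the epoch
    /// watermark actually moved.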
    fn update_pruning_watermarks(
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<bool> {
        use std::sync::atomic::Ordering;
        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
        let current_epoch_id = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();
        if current_epoch_id < num_epochs_to_retain {
            return Ok(false);
        }
        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
        let checkpoint_id =
            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;

        let new_watermark = target_epoch_id + 1;
        if current_watermark == new_watermark {
            return Ok(false);
        }
        info!("relocation: setting epoch watermark to {}", new_watermark);
        pruning_watermark
            .epoch_id
            .store(new_watermark, Ordering::Relaxed);
        if let Some(checkpoint_id) = checkpoint_id {
            info!(
                "relocation: setting checkpoint watermark to {}",
                checkpoint_id
            );
            pruning_watermark
                .checkpoint_id
                .store(checkpoint_id, Ordering::Relaxed);
        }
        Ok(true)
    }

    #[cfg(tidehunter)]
    fn prune_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let watermark_updated = Self::update_pruning_watermarks(
            checkpoint_store,
            num_epochs_to_retain,
            &pruning_watermark,
        )?;
        if !watermark_updated {
            info!("skipping relocation: watermark hasn't changed");
            return Ok(());
        }
        perpetual_db.objects.db.start_relocation()?;
        checkpoint_store.tables.watermarks.db.start_relocation()?;
        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
        Ok(())
    }

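    /// Picks at most one SST file to compact manually: the largest live file
    /// belonging to a periodically pruned column family, at level >= 1, that
    /// has not been modified (or manually compacted) within the last
    /// `delay_days`. Returns the compacted file, or `None` if nothing
    /// qualifies.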
    fn compact_next_sst_file(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        delay_days: usize,
        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
    ) -> anyhow::Result<Option<LiveFile>> {
        let db_path = perpetual_db.objects.db.path_for_pruning();
        let mut state = last_processed
            .lock()
            .expect("failed to obtain a lock for last processed SST files");
        let mut sst_file_for_compaction: Option<LiveFile> = None;
        let time_threshold =
            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
        for sst_file in perpetual_db.objects.db.live_files()? {
            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
            let last_modified = std::fs::metadata(file_path)?.modified()?;
            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
                || sst_file.level < 1
                || sst_file.start_key.is_none()
                || sst_file.end_key.is_none()
                || last_modified > time_threshold
                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
            {
                continue;
            }
            if let Some(candidate) = &sst_file_for_compaction
                && candidate.size > sst_file.size
            {
                continue;
            }
            sst_file_for_compaction = Some(sst_file);
        }
        let Some(sst_file) = sst_file_for_compaction else {
            return Ok(None);
        };
        info!(
            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
            sst_file.name, sst_file.size, sst_file.level
        );
        perpetual_db.objects.compact_range_raw(
            &sst_file.column_family_name,
            sst_file.start_key.clone().unwrap(),
            sst_file.end_key.clone().unwrap(),
        )?;
        state.insert(sst_file.name.clone(), SystemTime::now());
        Ok(Some(sst_file))
    }

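    /// Despite the `MIN_` prefix of the constant, this caps the tick at
    /// `MIN_PRUNING_TICK_DURATION_MS` (10s); only epochs shorter than 20s
    /// (e.g. in tests) produce a smaller tick of half the epoch duration.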
    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
    }

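    /// Spreads pruning work evenly across an epoch: the eligible backlog
    /// `max_eligible_checkpoint - pruned_checkpoint` is divided by the number
    /// of pruning ticks per epoch, and only that per-tick share is released.
    /// With illustrative numbers: a 24h epoch and a 10s tick give 8640 ticks
    /// per epoch, so a backlog of 86_400 checkpoints advances the watermark
    /// by 10 checkpoints per tick.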
    fn smoothed_max_eligible_checkpoint_number(
        checkpoint_store: &Arc<CheckpointStore>,
        mut max_eligible_checkpoint: CheckpointSequenceNumber,
        pruned_checkpoint: CheckpointSequenceNumber,
        epoch_id: EpochId,
        epoch_duration_ms: u64,
        num_epochs_to_retain: u64,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        if epoch_id < num_epochs_to_retain {
            return Ok(0);
        }
        let last_checkpoint_in_epoch = checkpoint_store
            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
            .map(|checkpoint| checkpoint.sequence_number)
            .unwrap_or_default();
        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
        if max_eligible_checkpoint == 0 {
            return Ok(max_eligible_checkpoint);
        }
        let num_intervals = epoch_duration_ms
            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
            .unwrap_or(1);
        let delta = max_eligible_checkpoint
            .checked_sub(pruned_checkpoint)
            .unwrap_or_default()
            .checked_div(num_intervals)
            .unwrap_or(1);
        Ok(pruned_checkpoint + delta)
    }

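    /// Spawns the background pruning tasks and returns a cancellation handle
    /// (dropping the sender stops the non-tidehunter pruning loop). Under the
    /// tidehunter backend a single relocation-driven task runs on the pruning
    /// tick; otherwise separate tickers drive object, checkpoint, and index
    /// pruning, plus an optional periodic SST compaction task.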
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Sender<()> {
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        #[cfg(tidehunter)]
        {
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
                tokio::task::spawn(async move {
                    loop {
                        objects_prune_interval.tick().await;
                        if let Err(err) = Self::prune_th(
                            &perpetual_db,
                            &checkpoint_store,
                            num_epochs_to_retain,
                            pruner_watermarks.clone(),
                        ) {
                            error!("Failed to prune tidehunter: {:?}", err);
                        }
                    }
                });
            }
        }
        #[cfg(not(tidehunter))]
        {
            let mut checkpoints_prune_interval =
                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
            let mut indexes_prune_interval =
                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

            let perpetual_db_for_compaction = perpetual_db.clone();
            if let Some(delay_days) = config.periodic_compaction_threshold_days {
                spawn_monitored_task!(async move {
                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
                    loop {
                        let db = perpetual_db_for_compaction.clone();
                        let state = Arc::clone(&last_processed);
                        let result = tokio::task::spawn_blocking(move || {
                            Self::compact_next_sst_file(db, delay_days, state)
                        })
                        .await;
                        let mut sleep_interval_secs = 1;
                        match result {
                            Err(err) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Ok(None)) => {
                                sleep_interval_secs = 3600;
                            }
                            _ => {}
                        }
                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                    }
                });
            }
            tokio::task::spawn(async move {
                loop {
                    tokio::select! {
                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), pruner_db.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                error!("Failed to prune objects: {:?}", err);
                            }
                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
                                error!("Failed to prune executed_tx_digests: {:?}", err);
                            }
                        },
                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), pruner_db.as_ref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
                                error!("Failed to prune checkpoints: {:?}", err);
                            }
                        },
                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
                                error!("Failed to prune indexes: {:?}", err);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
            });
        }
        sender
    }

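    /// Constructs the pruner and spawns its background tasks. Partial object
    /// retention (`0 < num_epochs_to_retain < u64::MAX`) is discouraged for
    /// performance reasons and is forcibly reset to aggressive pruning on
    /// validators.
    ///
    /// A minimal construction sketch (not a doctest; assumes the stores are
    /// already open and that `pruning_config` comes from the node config):
    ///
    /// ```ignore
    /// let pruner = AuthorityStorePruner::new(
    ///     perpetual_db.clone(),
    ///     checkpoint_store.clone(),
    ///     None,            // no RPC index store
    ///     None,            // no JSON-RPC index store
    ///     pruning_config,
    ///     false,           // is_validator
    ///     EPOCH_DURATION_MS_FOR_TESTING,
    ///     &Registry::new(),
    ///     None,            // no pruner tables: range-delete directly
    ///     Arc::new(PrunerWatermarks::default()),
    /// );
    /// ```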
    pub fn new(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        mut pruning_config: AuthorityStorePruningConfig,
        is_validator: bool,
        epoch_duration_ms: u64,
        registry: &Registry,
        pruner_db: Option<Arc<AuthorityPrunerTables>>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
        {
            warn!(
                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
                pruning_config.num_epochs_to_retain
            );
            if is_validator {
                warn!("Resetting to aggressive pruner.");
                pruning_config.num_epochs_to_retain = 0;
            } else {
                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
            }
        }
        AuthorityStorePruner {
            _objects_pruner_cancel_handle: Self::setup_pruning(
                pruning_config,
                epoch_duration_ms,
                perpetual_db,
                checkpoint_store,
                rpc_index,
                jsonrpc_index,
                pruner_db,
                AuthorityStorePruningMetrics::new(registry),
                pruner_watermarks,
            ),
        }
    }

    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
        perpetual_db.objects.compact_range(
            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
        )
    }
}

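// Wraps a tidehunter key-space config with a relocation filter: entries whose
// extracted watermark value (read from the key or the value, depending on
// `by_key`) falls below the shared pruner watermark are dropped, and
// relocation stops at the first entry that is still retained.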
#[cfg(tidehunter)]
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;
    config.with_relocation_filter(move |key, value| {
        let data = if by_key {
            bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding()
                .deserialize(&key)
                .expect("relocation filter deserialization error")
        } else {
            bcs::from_bytes(&value).expect("relocation filter deserialization error")
        };
        if extractor(data) < pruner_watermark.load(Ordering::Relaxed) {
            Decision::Remove
        } else {
            Decision::StopRelocation
        }
    })
}

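/// RocksDB compaction filter for the objects table. During compaction it
/// checks each object version against the per-object GC watermark that
/// `prune_objects` recorded in `AuthorityPrunerTables::object_tombstones` and
/// removes versions at or below it. Holds only a `Weak` reference to the
/// pruner tables so that compaction never keeps the database alive.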
#[derive(Clone)]
pub struct ObjectsCompactionFilter {
    db: Weak<AuthorityPrunerTables>,
    metrics: Arc<ObjectCompactionMetrics>,
}

impl ObjectsCompactionFilter {
    pub fn new(db: Arc<AuthorityPrunerTables>, registry: &Registry) -> Self {
        Self {
            db: Arc::downgrade(&db),
            metrics: ObjectCompactionMetrics::new(registry),
        }
    }
    pub fn filter(&mut self, key: &[u8], value: &[u8]) -> anyhow::Result<Decision> {
        let ObjectKey(object_id, version) = bincode::DefaultOptions::new()
            .with_big_endian()
            .with_fixint_encoding()
            .deserialize(key)?;
        let object: StoreObjectWrapper = bcs::from_bytes(value)?;
        if matches!(object.into_inner(), StoreObject::Value(_))
            && let Some(db) = self.db.upgrade()
        {
            match db.object_tombstones.get(&object_id)? {
                Some(gc_version) => {
                    if version <= gc_version {
                        self.metrics.key_removed.inc();
                        return Ok(Decision::Remove);
                    }
                    self.metrics.key_kept.inc();
                }
                None => self.metrics.key_not_found.inc(),
            }
        }
        Ok(Decision::Keep)
    }
}

struct ObjectCompactionMetrics {
    key_removed: IntCounter,
    key_kept: IntCounter,
    key_not_found: IntCounter,
}

impl ObjectCompactionMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        Arc::new(Self {
            key_removed: register_int_counter_with_registry!(
                "objects_compaction_filter_key_removed",
                "Compaction key removed",
                registry
            )
            .unwrap(),
            key_kept: register_int_counter_with_registry!(
                "objects_compaction_filter_key_kept",
                "Compaction key kept",
                registry
            )
            .unwrap(),
            key_not_found: register_int_counter_with_registry!(
                "objects_compaction_filter_key_not_found",
                "Compaction key not found",
                registry
            )
            .unwrap(),
        })
    }
}

#[cfg(test)]
mod tests {
    use more_asserts as ma;
    use std::path::Path;
    use std::time::Duration;
    use std::{collections::HashSet, sync::Arc};
    use tracing::log::info;

    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
    use crate::authority::authority_store_types::{
        StoreObject, StoreObjectWrapper, get_store_object,
    };
    use prometheus::Registry;
    use sui_types::base_types::ObjectDigest;
    use sui_types::effects::TransactionEffects;
    use sui_types::effects::TransactionEffectsAPI;
    use sui_types::{
        base_types::{ObjectID, SequenceNumber},
        object::Object,
        storage::ObjectKey,
    };
    use typed_store::Map;
    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};

    use super::AuthorityStorePruner;

    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<_> = cf_names
            .keys()
            .map(|x| (x.as_str(), default_db_options().options))
            .collect();
        let perpetual_db = typed_store::rocks::open_cf_opts(
            perpetual_db_path,
            None,
            MetricConf::new("perpetual_pruning"),
            &cfs,
        );

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db?,
            Some("objects"),
            &ReadWriteOptions::default(),
            false,
        )?;
        let iter = objects.safe_iter();
        for item in iter {
            after_pruning.insert(item?.0);
        }
        Ok(after_pruning)
    }

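    // Writes `num_versions_per_object` versions for each of
    // `total_unique_object_ids` objects, plus one tombstone per object when
    // nothing is retained, and returns the keys partitioned into
    // (to_keep, to_delete, tombstones).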
    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                batch.insert_batch(
                    &db.objects,
                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
                )?;
            }

            if num_object_versions_to_retain == 0 {
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
            )
            .unwrap();
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects(vec![effects], &db, None, 0, metrics, true)
                .await
                .unwrap();
            to_keep
        };
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 2, 1000).await;
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
    }

    #[tokio::test]
    async fn test_pruning_tombstones() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 0, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.keep();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        perpetual_db.objects.compact_range(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let total_pruned = AuthorityStorePruner::prune_objects(
            vec![effects],
            &perpetual_db,
            None,
            0,
            metrics,
            true,
        )
        .await;
        info!("Total pruned keys = {:?}", total_pruned);

        perpetual_db.objects.compact_range(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}
1290}