1use super::authority_store_tables::AuthorityPerpetualTables;
5use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
6use crate::jsonrpc_index::IndexStore;
7use crate::rpc_index::RpcIndexStore;
8use anyhow::anyhow;
9use mysten_metrics::{monitored_scope, spawn_monitored_task};
10use once_cell::sync::Lazy;
11use prometheus::{
12 IntCounter, IntGauge, Registry, register_int_counter_with_registry,
13 register_int_gauge_with_registry,
14};
15#[cfg(tidehunter)]
16use serde::de::DeserializeOwned;
17use std::cmp::{max, min};
18use std::collections::{BTreeSet, HashMap};
19use std::sync::Mutex;
20use std::sync::atomic::AtomicU64;
21use std::time::{SystemTime, UNIX_EPOCH};
22use std::{sync::Arc, time::Duration};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_types::committee::EpochId;
25use sui_types::effects::TransactionEffects;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::message_envelope::Message;
28use sui_types::messages_checkpoint::{
29 CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
30};
31use sui_types::{
32 base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
33 storage::ObjectKey,
34};
35use tokio::sync::oneshot::{self, Sender};
36use tokio::time::Instant;
37use tracing::{debug, error, info, warn};
38use typed_store::rocksdb::LiveFile;
39use typed_store::{Map, TypedStoreError};
40
41static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
42 [
43 "objects",
44 "effects",
45 "transactions",
46 "events",
47 "executed_effects",
48 "executed_transactions_to_checkpoint",
49 ]
50 .into_iter()
51 .map(|cf| cf.to_string())
52 .collect()
53});
/// Epoch duration intended for tests: one day expressed in milliseconds.
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 1_000 * 60 * 60 * 24;
55pub struct AuthorityStorePruner {
56 _objects_pruner_cancel_handle: oneshot::Sender<()>,
57}
58
/// Watermarks published by the pruner through `AuthorityStorePruner::update_pruning_watermarks`.
///
/// Both values start at 0 and are shared through `Arc`s so other components can observe them.
/// NOTE(review): presumably consumed by `apply_relocation_filter` in tidehunter builds — confirm.
#[derive(Default)]
pub struct PrunerWatermarks {
    /// Epoch watermark: the first epoch that is *not* yet released for relocation/removal.
    pub epoch_id: Arc<AtomicU64>,
    /// Checkpoint watermark: the last checkpoint of the most recent released epoch.
    pub checkpoint_id: Arc<AtomicU64>,
}
64
/// Ten seconds, in milliseconds.
///
/// Despite its name, `AuthorityStorePruner::pruning_tick_duration_ms` combines this with `min`,
/// so it is the *longest* interval between pruning ticks, not the shortest.
static MIN_PRUNING_TICK_DURATION_MS: u64 = 10_000;
66
67pub struct AuthorityStorePruningMetrics {
68 pub last_pruned_checkpoint: IntGauge,
69 pub num_pruned_objects: IntCounter,
70 pub num_pruned_tombstones: IntCounter,
71 pub last_pruned_effects_checkpoint: IntGauge,
72 pub last_pruned_indexes_transaction: IntGauge,
73 pub num_epochs_to_retain_for_objects: IntGauge,
74 pub num_epochs_to_retain_for_checkpoints: IntGauge,
75}
76
77impl AuthorityStorePruningMetrics {
78 pub fn new(registry: &Registry) -> Arc<Self> {
79 let this = Self {
80 last_pruned_checkpoint: register_int_gauge_with_registry!(
81 "last_pruned_checkpoint",
82 "Last pruned checkpoint",
83 registry
84 )
85 .unwrap(),
86 num_pruned_objects: register_int_counter_with_registry!(
87 "num_pruned_objects",
88 "Number of pruned objects",
89 registry
90 )
91 .unwrap(),
92 num_pruned_tombstones: register_int_counter_with_registry!(
93 "num_pruned_tombstones",
94 "Number of pruned tombstones",
95 registry
96 )
97 .unwrap(),
98 last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
99 "last_pruned_effects_checkpoint",
100 "Last pruned effects checkpoint",
101 registry
102 )
103 .unwrap(),
104 last_pruned_indexes_transaction: register_int_gauge_with_registry!(
105 "last_pruned_indexes_transaction",
106 "Last pruned indexes transaction",
107 registry
108 )
109 .unwrap(),
110 num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
111 "num_epochs_to_retain_for_objects",
112 "Number of epochs to retain for objects",
113 registry
114 )
115 .unwrap(),
116 num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
117 "num_epochs_to_retain_for_checkpoints",
118 "Number of epochs to retain for checkpoints",
119 registry
120 )
121 .unwrap(),
122 };
123 Arc::new(this)
124 }
125
126 pub fn new_for_test() -> Arc<Self> {
127 Self::new(&Registry::new())
128 }
129}
130
/// Selects what `AuthorityStorePruner::prune_for_eligible_epochs` removes for each batch of
/// eligible checkpoints.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PruningMode {
    /// Stale object versions (followed by the RPC index).
    Objects,
    /// Transactions, effects, events and checkpoint data.
    Checkpoints,
}
136
137impl AuthorityStorePruner {
138 #[cfg(not(tidehunter))]
140 async fn prune_objects_and_indexes(
141 transaction_effects: Vec<TransactionEffects>,
142 perpetual_db: &Arc<AuthorityPerpetualTables>,
143 checkpoint_number: CheckpointSequenceNumber,
144 metrics: Arc<AuthorityStorePruningMetrics>,
145 pruned_tx_seq_exclusive: u64,
146 rpc_index: Option<&RpcIndexStore>,
147 enable_pruning_tombstones: bool,
148 ) -> anyhow::Result<()> {
149 let _scope = monitored_scope("ObjectsLivePruner");
150 let mut wb = perpetual_db.objects.batch();
151
152 let mut live_object_keys_to_prune = vec![];
154 let mut object_tombstones_to_prune = vec![];
155 for effects in &transaction_effects {
156 for (object_id, seq_number) in effects.modified_at_versions() {
157 live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
158 }
159
160 if enable_pruning_tombstones {
161 for deleted_object_key in effects.all_tombstones() {
162 object_tombstones_to_prune
163 .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
164 }
165 }
166 }
167
168 metrics
169 .num_pruned_objects
170 .inc_by(live_object_keys_to_prune.len() as u64);
171 metrics
172 .num_pruned_tombstones
173 .inc_by(object_tombstones_to_prune.len() as u64);
174
175 let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
176 for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
177 updates
178 .entry(object_id)
179 .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
180 .or_insert((seq_number, seq_number));
181 }
182
183 for (object_id, (min_version, max_version)) in updates {
184 debug!(
185 "Pruning object {:?} versions {:?} - {:?}",
186 object_id, min_version, max_version
187 );
188 let start_range = ObjectKey(object_id, min_version);
189 let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
190 wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
191 }
192
193 if !object_tombstones_to_prune.is_empty() {
199 let mut object_keys_to_delete = vec![];
200 for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
201 for result in perpetual_db.objects.safe_iter_with_bounds(
202 Some(ObjectKey(object_id, VersionNumber::MIN)),
203 Some(ObjectKey(object_id, seq_number.next())),
204 ) {
205 let (object_key, _) = result?;
206 assert_eq!(object_key.0, object_id);
207 object_keys_to_delete.push(object_key);
208 }
209 }
210
211 wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
212 }
213
214 perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
215 metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
216
217 wb.write()?;
218
219 if let Some(rpc_index) = rpc_index {
220 rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
221 }
222
223 Ok(())
224 }
225
226 #[cfg(tidehunter)]
227 async fn prune_objects_and_indexes(
228 transaction_effects: Vec<TransactionEffects>,
229 perpetual_db: &Arc<AuthorityPerpetualTables>,
230 checkpoint_number: CheckpointSequenceNumber,
231 metrics: Arc<AuthorityStorePruningMetrics>,
232 pruned_tx_seq_exclusive: u64,
233 rpc_index: Option<&RpcIndexStore>,
234 _: bool,
235 ) -> anyhow::Result<()> {
236 let _scope = monitored_scope("ObjectsLivePruner");
237 let mut wb = perpetual_db.objects.batch();
238 let mut objects_to_prune = vec![];
239
240 for effects in &transaction_effects {
241 for (object_id, version) in effects
242 .modified_at_versions()
243 .into_iter()
244 .chain(effects.all_tombstones())
245 {
246 debug!("Pruning object {:?} version {:?}", object_id, version);
247 objects_to_prune.push(ObjectKey(object_id, version));
248 }
249 }
250 metrics
251 .num_pruned_objects
252 .inc_by(objects_to_prune.len() as u64);
253 wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;
254
255 perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
256 metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
257 wb.write()?;
258
259 if let Some(rpc_index) = rpc_index {
260 rpc_index.prune(checkpoint_number, pruned_tx_seq_exclusive)?;
261 }
262
263 Ok(())
264 }
265
266 fn prune_checkpoints(
267 perpetual_db: &Arc<AuthorityPerpetualTables>,
268 checkpoint_db: &Arc<CheckpointStore>,
269 checkpoint_number: CheckpointSequenceNumber,
270 checkpoints_to_prune: Vec<CheckpointDigest>,
271 checkpoint_content_to_prune: Vec<CheckpointContents>,
272 effects_to_prune: &Vec<TransactionEffects>,
273 metrics: Arc<AuthorityStorePruningMetrics>,
274 ) -> anyhow::Result<()> {
275 let _scope = monitored_scope("EffectsLivePruner");
276
277 let mut perpetual_batch = perpetual_db.objects.batch();
278 let transactions: Vec<_> = checkpoint_content_to_prune
279 .iter()
280 .flat_map(|content| content.iter().map(|tx| tx.transaction))
281 .collect();
282
283 perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
284 perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
285 perpetual_batch.delete_batch(
286 &perpetual_db.executed_transactions_to_checkpoint,
287 transactions.iter(),
288 )?;
289
290 let mut effect_digests = vec![];
291 for effects in effects_to_prune {
292 let effects_digest = effects.digest();
293 debug!("Pruning effects {:?}", effects_digest);
294 effect_digests.push(effects_digest);
295
296 if effects.events_digest().is_some() {
297 perpetual_batch
298 .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
299 }
300 }
301 perpetual_batch.delete_batch(
302 &perpetual_db.unchanged_loaded_runtime_objects,
303 transactions.iter(),
304 )?;
305 perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;
306
307 let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();
308
309 let checkpoint_content_digests =
310 checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
311 checkpoints_batch.delete_batch(
312 &checkpoint_db.tables.checkpoint_content,
313 checkpoint_content_digests.clone(),
314 )?;
315 checkpoints_batch.delete_batch(
316 &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
317 checkpoint_content_digests,
318 )?;
319
320 checkpoints_batch.delete_batch(
321 &checkpoint_db.tables.checkpoint_by_digest,
322 checkpoints_to_prune,
323 )?;
324
325 checkpoints_batch.insert_batch(
326 &checkpoint_db.tables.watermarks,
327 [(
328 &CheckpointWatermark::HighestPruned,
329 &(checkpoint_number, CheckpointDigest::random()),
330 )],
331 )?;
332
333 perpetual_batch.write()?;
334 checkpoints_batch.write()?;
335 metrics
336 .last_pruned_effects_checkpoint
337 .set(checkpoint_number as i64);
338
339 Ok(())
340 }
341
342 pub async fn prune_objects_for_eligible_epochs(
344 perpetual_db: &Arc<AuthorityPerpetualTables>,
345 checkpoint_store: &Arc<CheckpointStore>,
346 rpc_index: Option<&RpcIndexStore>,
347 config: AuthorityStorePruningConfig,
348 metrics: Arc<AuthorityStorePruningMetrics>,
349 epoch_duration_ms: u64,
350 ) -> anyhow::Result<()> {
351 let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
352 let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
353 .get_highest_executed_checkpoint()?
354 .map(|c| (*c.sequence_number(), c.epoch))
355 .unwrap_or_default();
356 let pruned_checkpoint_number = perpetual_db
357 .get_highest_pruned_checkpoint()?
358 .unwrap_or_default();
359 if config.smooth && config.num_epochs_to_retain > 0 {
360 max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
361 checkpoint_store,
362 max_eligible_checkpoint_number,
363 pruned_checkpoint_number,
364 epoch_id,
365 epoch_duration_ms,
366 config.num_epochs_to_retain,
367 )?;
368 }
369 Self::prune_for_eligible_epochs(
370 perpetual_db,
371 checkpoint_store,
372 rpc_index,
373 PruningMode::Objects,
374 config.num_epochs_to_retain,
375 pruned_checkpoint_number,
376 max_eligible_checkpoint_number,
377 config,
378 metrics.clone(),
379 )
380 .await
381 }
382
383 pub async fn prune_checkpoints_for_eligible_epochs(
384 perpetual_db: &Arc<AuthorityPerpetualTables>,
385 checkpoint_store: &Arc<CheckpointStore>,
386 rpc_index: Option<&RpcIndexStore>,
387 config: AuthorityStorePruningConfig,
388 metrics: Arc<AuthorityStorePruningMetrics>,
389 epoch_duration_ms: u64,
390 pruner_watermarks: &Arc<PrunerWatermarks>,
391 ) -> anyhow::Result<()> {
392 let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
393 let pruned_checkpoint_number = checkpoint_store
394 .get_highest_pruned_checkpoint_seq_number()?
395 .unwrap_or(0);
396 let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
397 .get_highest_executed_checkpoint()?
398 .map(|c| (*c.sequence_number(), c.epoch))
399 .unwrap_or_default();
400 if config.num_epochs_to_retain != u64::MAX {
401 max_eligible_checkpoint = min(
402 max_eligible_checkpoint,
403 perpetual_db
404 .get_highest_pruned_checkpoint()?
405 .unwrap_or_default(),
406 );
407 }
408 if config.smooth
409 && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
410 {
411 max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
412 checkpoint_store,
413 max_eligible_checkpoint,
414 pruned_checkpoint_number,
415 epoch_id,
416 epoch_duration_ms,
417 num_epochs_to_retain,
418 )?;
419 }
420 debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
421 Self::prune_for_eligible_epochs(
422 perpetual_db,
423 checkpoint_store,
424 rpc_index,
425 PruningMode::Checkpoints,
426 config
427 .num_epochs_to_retain_for_checkpoints()
428 .ok_or_else(|| anyhow!("config value not set"))?,
429 pruned_checkpoint_number,
430 max_eligible_checkpoint,
431 config.clone(),
432 metrics.clone(),
433 )
434 .await?;
435
436 if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
437 Self::update_pruning_watermarks(
438 perpetual_db,
439 checkpoint_store,
440 num_epochs_to_retain,
441 pruner_watermarks,
442 false,
443 )?;
444 }
445 Ok(())
446 }
447
448 pub async fn prune_for_eligible_epochs(
450 perpetual_db: &Arc<AuthorityPerpetualTables>,
451 checkpoint_store: &Arc<CheckpointStore>,
452 rpc_index: Option<&RpcIndexStore>,
453 mode: PruningMode,
454 num_epochs_to_retain: u64,
455 starting_checkpoint_number: CheckpointSequenceNumber,
456 max_eligible_checkpoint: CheckpointSequenceNumber,
457 config: AuthorityStorePruningConfig,
458 metrics: Arc<AuthorityStorePruningMetrics>,
459 ) -> anyhow::Result<()> {
460 let _scope = monitored_scope("PruneForEligibleEpochs");
461
462 let mut checkpoint_number = starting_checkpoint_number;
463 let current_epoch = checkpoint_store
464 .get_highest_executed_checkpoint()?
465 .map(|c| c.epoch())
466 .unwrap_or_default();
467
468 let mut checkpoints_to_prune = vec![];
469 let mut checkpoint_content_to_prune = vec![];
470 let mut effects_to_prune = vec![];
471 let mut pruned_tx_seq_exclusive = 0u64;
476
477 loop {
478 let Some(ckpt) = checkpoint_store
479 .tables
480 .certified_checkpoints
481 .get(&(checkpoint_number + 1))?
482 else {
483 break;
484 };
485 let checkpoint = ckpt.into_inner();
486 if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
491 || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
492 {
493 break;
494 }
495 checkpoint_number = *checkpoint.sequence_number();
496 pruned_tx_seq_exclusive = checkpoint.network_total_transactions;
497
498 let content = checkpoint_store
499 .get_checkpoint_contents(&checkpoint.content_digest)?
500 .ok_or_else(|| {
501 anyhow::anyhow!(
502 "checkpoint content data is missing: {}",
503 checkpoint.sequence_number
504 )
505 })?;
506 let effects = perpetual_db
507 .effects
508 .multi_get(content.iter().map(|tx| tx.effects))?;
509
510 info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
511 checkpoints_to_prune.push(*checkpoint.digest());
512 checkpoint_content_to_prune.push(content);
513 effects_to_prune.extend(effects.into_iter().flatten());
514
515 if effects_to_prune.len() >= config.max_transactions_in_batch
516 || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
517 {
518 match mode {
519 PruningMode::Objects => {
520 Self::prune_objects_and_indexes(
521 effects_to_prune,
522 perpetual_db,
523 checkpoint_number,
524 metrics.clone(),
525 pruned_tx_seq_exclusive,
526 rpc_index,
527 !config.killswitch_tombstone_pruning,
528 )
529 .await?
530 }
531 PruningMode::Checkpoints => Self::prune_checkpoints(
532 perpetual_db,
533 checkpoint_store,
534 checkpoint_number,
535 checkpoints_to_prune,
536 checkpoint_content_to_prune,
537 &effects_to_prune,
538 metrics.clone(),
539 )?,
540 };
541 checkpoints_to_prune = vec![];
542 checkpoint_content_to_prune = vec![];
543 effects_to_prune = vec![];
544 tokio::task::yield_now().await;
546 }
547 }
548
549 if !checkpoints_to_prune.is_empty() {
550 match mode {
551 PruningMode::Objects => {
552 Self::prune_objects_and_indexes(
553 effects_to_prune,
554 perpetual_db,
555 checkpoint_number,
556 metrics.clone(),
557 pruned_tx_seq_exclusive,
558 rpc_index,
559 !config.killswitch_tombstone_pruning,
560 )
561 .await?
562 }
563 PruningMode::Checkpoints => Self::prune_checkpoints(
564 perpetual_db,
565 checkpoint_store,
566 checkpoint_number,
567 checkpoints_to_prune,
568 checkpoint_content_to_prune,
569 &effects_to_prune,
570 metrics.clone(),
571 )?,
572 };
573 }
574 Ok(())
575 }
576
577 fn prune_indexes(
578 indexes: Option<&IndexStore>,
579 config: &AuthorityStorePruningConfig,
580 epoch_duration_ms: u64,
581 metrics: &AuthorityStorePruningMetrics,
582 ) -> anyhow::Result<()> {
583 if let (Some(mut epochs_to_retain), Some(indexes)) =
584 (config.num_epochs_to_retain_for_indexes, indexes)
585 {
586 if epochs_to_retain < 7 {
587 warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
588 epochs_to_retain = 7;
589 }
590 let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
591 if let Some(cut_time_ms) =
592 u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
593 {
594 let transaction_id = indexes.prune(cut_time_ms)?;
595 metrics
596 .last_pruned_indexes_transaction
597 .set(transaction_id as i64);
598 }
599 }
600 Ok(())
601 }
602
603 async fn prune_executed_tx_digests(
604 perpetual_db: &Arc<AuthorityPerpetualTables>,
605 checkpoint_store: &Arc<CheckpointStore>,
606 ) -> anyhow::Result<()> {
607 let current_epoch = checkpoint_store
608 .get_highest_executed_checkpoint()?
609 .map(|c| c.epoch)
610 .unwrap_or_default();
611
612 if current_epoch < 2 {
613 return Ok(());
614 }
615
616 let target_epoch = current_epoch - 1;
617
618 let start_key = (0u64, TransactionDigest::ZERO);
619 let end_key = (target_epoch, TransactionDigest::ZERO);
620
621 info!(
622 "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
623 target_epoch, current_epoch
624 );
625
626 let mut batch = perpetual_db.executed_transaction_digests.batch();
627 batch.schedule_delete_range(
628 &perpetual_db.executed_transaction_digests,
629 &start_key,
630 &end_key,
632 )?;
633 batch.write()?;
634 Ok(())
635 }
636
637 #[cfg(tidehunter)]
638 fn prune_executed_tx_digests_th(
639 perpetual_db: &Arc<AuthorityPerpetualTables>,
640 checkpoint_store: &Arc<CheckpointStore>,
641 ) -> anyhow::Result<()> {
642 let current_epoch = checkpoint_store
643 .get_highest_executed_checkpoint()?
644 .map(|c| c.epoch)
645 .unwrap_or_default();
646
647 if current_epoch < 2 {
648 return Ok(());
649 }
650
651 let last_epoch_to_delete = current_epoch - 2;
652 let from_key = (0u64, TransactionDigest::ZERO);
653 let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
654 info!(
655 "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
656 last_epoch_to_delete, current_epoch
657 );
658 perpetual_db
659 .executed_transaction_digests
660 .drop_cells_in_range(&from_key, &to_key)?;
661 Ok(())
662 }
663
664 fn update_pruning_watermarks(
665 perpetual_db: &Arc<AuthorityPerpetualTables>,
666 checkpoint_store: &Arc<CheckpointStore>,
667 num_epochs_to_retain: u64,
668 pruning_watermark: &Arc<PrunerWatermarks>,
669 objects_compactor_active: bool,
670 ) -> anyhow::Result<bool> {
671 use std::sync::atomic::Ordering;
672 let objects_pruning_checkpoint_id = perpetual_db
673 .get_highest_pruned_checkpoint()?
674 .unwrap_or_default();
675 let objects_pruning_epoch_id = checkpoint_store
676 .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
677 .map(|chk| chk.epoch)
678 .unwrap_or_default();
679
680 let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
681 let current_epoch_id = checkpoint_store
682 .get_highest_executed_checkpoint()?
683 .map(|c| c.epoch)
684 .unwrap_or_default();
685 if current_epoch_id < num_epochs_to_retain {
686 return Ok(false);
687 }
688 let target_epoch_id = current_epoch_id - num_epochs_to_retain;
689 let checkpoint_id =
690 checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;
691
692 let new_watermark = if objects_compactor_active {
695 target_epoch_id + 1
696 } else {
697 min(target_epoch_id + 1, objects_pruning_epoch_id)
698 };
699 if current_watermark == new_watermark {
700 return Ok(false);
701 }
702 info!("relocation: setting epoch watermark to {}", new_watermark);
703 pruning_watermark
704 .epoch_id
705 .store(new_watermark, Ordering::Relaxed);
706 if let Some(checkpoint_id) = checkpoint_id {
707 let watermark = if objects_compactor_active {
708 checkpoint_id
709 } else {
710 min(checkpoint_id, objects_pruning_checkpoint_id)
711 };
712 info!("relocation: setting checkpoint watermark to {}", watermark);
713 pruning_watermark
714 .checkpoint_id
715 .store(watermark, Ordering::Relaxed);
716 }
717 Ok(true)
718 }
719
720 #[cfg(tidehunter)]
721 fn prune_th(
722 perpetual_db: &Arc<AuthorityPerpetualTables>,
723 checkpoint_store: &Arc<CheckpointStore>,
724 num_epochs_to_retain: u64,
725 pruning_watermark: Arc<PrunerWatermarks>,
726 objects_compactor_active: bool,
727 ) -> anyhow::Result<()> {
728 let watermark_updated = Self::update_pruning_watermarks(
729 perpetual_db,
730 checkpoint_store,
731 num_epochs_to_retain,
732 &pruning_watermark,
733 objects_compactor_active,
734 )?;
735 if !watermark_updated {
736 info!("skip relocation. Watermark hasn't changed");
737 return Ok(());
738 }
739 perpetual_db.objects.db.start_relocation()?;
740 checkpoint_store.tables.watermarks.db.start_relocation()?;
741 Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
742 Ok(())
743 }
744
745 fn compact_next_sst_file(
746 perpetual_db: Arc<AuthorityPerpetualTables>,
747 delay_days: usize,
748 last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
749 ) -> anyhow::Result<Option<LiveFile>> {
750 let db_path = perpetual_db.objects.db.path_for_pruning();
751 let mut state = last_processed
752 .lock()
753 .expect("failed to obtain a lock for last processed SST files");
754 let mut sst_file_for_compaction: Option<LiveFile> = None;
755 let time_threshold =
756 SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
757 for sst_file in perpetual_db.objects.db.live_files()? {
758 let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
759 let last_modified = std::fs::metadata(file_path)?.modified()?;
760 if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
761 || sst_file.level < 1
762 || sst_file.start_key.is_none()
763 || sst_file.end_key.is_none()
764 || last_modified > time_threshold
765 || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
766 {
767 continue;
768 }
769 if let Some(candidate) = &sst_file_for_compaction
770 && candidate.size > sst_file.size
771 {
772 continue;
773 }
774 sst_file_for_compaction = Some(sst_file);
775 }
776 let Some(sst_file) = sst_file_for_compaction else {
777 return Ok(None);
778 };
779 info!(
780 "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
781 sst_file.name, sst_file.size, sst_file.level
782 );
783 perpetual_db.objects.compact_range_raw(
784 &sst_file.column_family_name,
785 sst_file.start_key.clone().unwrap(),
786 sst_file.end_key.clone().unwrap(),
787 )?;
788 state.insert(sst_file.name.clone(), SystemTime::now());
789 Ok(Some(sst_file))
790 }
791
792 fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
793 min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
794 }
795
796 fn smoothed_max_eligible_checkpoint_number(
797 checkpoint_store: &Arc<CheckpointStore>,
798 mut max_eligible_checkpoint: CheckpointSequenceNumber,
799 pruned_checkpoint: CheckpointSequenceNumber,
800 epoch_id: EpochId,
801 epoch_duration_ms: u64,
802 num_epochs_to_retain: u64,
803 ) -> anyhow::Result<CheckpointSequenceNumber> {
804 if epoch_id < num_epochs_to_retain {
805 return Ok(0);
806 }
807 let last_checkpoint_in_epoch = checkpoint_store
808 .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
809 .map(|checkpoint| checkpoint.sequence_number)
810 .unwrap_or_default();
811 max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
812 if max_eligible_checkpoint == 0 {
813 return Ok(max_eligible_checkpoint);
814 }
815 let num_intervals = epoch_duration_ms
816 .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
817 .unwrap_or(1);
818 let delta = max_eligible_checkpoint
819 .checked_sub(pruned_checkpoint)
820 .unwrap_or_default()
821 .checked_div(num_intervals)
822 .unwrap_or(1);
823 Ok(pruned_checkpoint + delta)
824 }
825
826 fn setup_pruning(
827 config: AuthorityStorePruningConfig,
828 epoch_duration_ms: u64,
829 perpetual_db: Arc<AuthorityPerpetualTables>,
830 checkpoint_store: Arc<CheckpointStore>,
831 rpc_index: Option<Arc<RpcIndexStore>>,
832 jsonrpc_index: Option<Arc<IndexStore>>,
833 metrics: Arc<AuthorityStorePruningMetrics>,
834 pruner_watermarks: Arc<PrunerWatermarks>,
835 ) -> Sender<()> {
836 let (sender, mut recv) = tokio::sync::oneshot::channel();
837 debug!(
838 "Starting object pruning service with num_epochs_to_retain={}",
839 config.num_epochs_to_retain
840 );
841
842 let tick_duration =
843 Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
844 let pruning_initial_delay = if cfg!(msim) {
845 Duration::from_millis(1)
846 } else {
847 Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
848 };
849 let mut objects_prune_interval =
850 tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
851 let mut checkpoints_prune_interval =
852 tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
853
854 metrics
855 .num_epochs_to_retain_for_objects
856 .set(config.num_epochs_to_retain as i64);
857 metrics.num_epochs_to_retain_for_checkpoints.set(
858 config
859 .num_epochs_to_retain_for_checkpoints
860 .unwrap_or_default() as i64,
861 );
862
863 #[cfg(tidehunter)]
864 {
865 if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
866 let prune_objects = config.num_epochs_to_retain != u64::MAX;
867 tokio::task::spawn(async move {
868 loop {
869 tokio::select! {
870 _ = objects_prune_interval.tick(), if prune_objects => {
871 if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
872 error!("Failed to prune objects: {:?}", err);
873 }
874 },
875 _ = checkpoints_prune_interval.tick() => {
876 if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone(), !prune_objects) {
877 error!("Failed to prune checkpoints: {:?}", err);
878 }
879 },
880 _ = &mut recv => break,
881 }
882 }
883 });
884 }
885 }
886 #[cfg(not(tidehunter))]
887 {
888 let mut indexes_prune_interval =
889 tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
890
891 let perpetual_db_for_compaction = perpetual_db.clone();
892 if let Some(delay_days) = config.periodic_compaction_threshold_days {
893 spawn_monitored_task!(async move {
894 let last_processed = Arc::new(Mutex::new(HashMap::new()));
895 loop {
896 let db = perpetual_db_for_compaction.clone();
897 let state = Arc::clone(&last_processed);
898 let result = tokio::task::spawn_blocking(move || {
899 Self::compact_next_sst_file(db, delay_days, state)
900 })
901 .await;
902 let mut sleep_interval_secs = 1;
903 match result {
904 Err(err) => error!("Failed to compact sst file: {:?}", err),
905 Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
906 Ok(Ok(None)) => {
907 sleep_interval_secs = 3600;
908 }
909 _ => {}
910 }
911 tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
912 }
913 });
914 }
915 tokio::task::spawn(async move {
916 loop {
917 tokio::select! {
918 _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
919 if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
920 error!("Failed to prune objects: {:?}", err);
921 }
922 if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
923 error!("Failed to prune executed_tx_digests: {:?}", err);
924 }
925 },
926 _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
927 if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
928 error!("Failed to prune checkpoints: {:?}", err);
929 }
930 },
931 _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
932 if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
933 error!("Failed to prune indexes: {:?}", err);
934 }
935 }
936 _ = &mut recv => break,
937 }
938 }
939 });
940 }
941 sender
942 }
943
944 pub fn new(
945 perpetual_db: Arc<AuthorityPerpetualTables>,
946 checkpoint_store: Arc<CheckpointStore>,
947 rpc_index: Option<Arc<RpcIndexStore>>,
948 jsonrpc_index: Option<Arc<IndexStore>>,
949 mut pruning_config: AuthorityStorePruningConfig,
950 is_validator: bool,
951 epoch_duration_ms: u64,
952 registry: &Registry,
953 pruner_watermarks: Arc<PrunerWatermarks>, ) -> Self {
955 #[cfg(tidehunter)]
963 {
964 let objects_compactor_enabled =
965 is_validator || pruning_config.num_epochs_to_retain == 0;
966 if objects_compactor_enabled && pruning_config.num_epochs_to_retain != u64::MAX {
967 info!(
968 "Tidehunter: disabling object pruner (was num_epochs_to_retain={}). The objects compactor performs equivalent compaction.",
969 pruning_config.num_epochs_to_retain
970 );
971 pruning_config.num_epochs_to_retain = u64::MAX;
972 }
973 }
974
975 if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
976 {
977 warn!(
978 "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
979 pruning_config.num_epochs_to_retain
980 );
981 if is_validator {
982 warn!("Resetting to aggressive pruner.");
983 pruning_config.num_epochs_to_retain = 0;
984 } else {
985 warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
986 }
987 }
988 AuthorityStorePruner {
989 _objects_pruner_cancel_handle: Self::setup_pruning(
990 pruning_config,
991 epoch_duration_ms,
992 perpetual_db,
993 checkpoint_store,
994 rpc_index,
995 jsonrpc_index,
996 AuthorityStorePruningMetrics::new(registry),
997 pruner_watermarks,
998 ),
999 }
1000 }
1001
1002 pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
1003 perpetual_db.objects.compact_range(
1004 &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
1005 &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
1006 )
1007 }
1008}
1009
/// Installs a relocation filter on `config` that removes entries below `pruner_watermark`.
///
/// For each entry, a `T` is decoded and passed to `extractor`:
/// * when `by_key` is true, `T` is decoded from the key (big-endian, fixed-int bincode);
/// * otherwise, `T` is decoded from the value (BCS).
///
/// Decoding failures panic. Entries whose extracted value is below the current watermark get
/// `Decision::Remove`; all others get `Decision::StopRelocation`.
#[cfg(tidehunter)]
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;

    config.with_relocation_filter(move |key, value| {
        let decoded: T = if !by_key {
            bcs::from_bytes(&value).expect("relocation filter deserialization error")
        } else {
            bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding()
                .deserialize(&key)
                .expect("relocation filter deserialization error")
        };
        let below_watermark = extractor(decoded) < pruner_watermark.load(Ordering::Relaxed);
        if below_watermark {
            Decision::Remove
        } else {
            Decision::StopRelocation
        }
    })
}
1037
#[cfg(test)]
mod tests {
    use more_asserts as ma;
    use std::path::Path;
    use std::time::Duration;
    use std::{collections::HashSet, sync::Arc};
    use tracing::log::info;

    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
    use crate::authority::authority_store_types::{
        StoreObject, StoreObjectWrapper, get_store_object,
    };
    use prometheus::Registry;
    use sui_types::base_types::ObjectDigest;
    use sui_types::effects::TransactionEffects;
    use sui_types::effects::TransactionEffectsAPI;
    use sui_types::{
        base_types::{ObjectID, SequenceNumber},
        object::Object,
        storage::ObjectKey,
    };
    use typed_store::Map;
    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};

    use super::AuthorityStorePruner;

    /// Reopens the perpetual DB stored under `path/perpetual` and collects every key
    /// still present in its `objects` column family.
    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<_> = cf_names
            .keys()
            .map(|x| (x.as_str(), default_db_options().options))
            .collect();
        let perpetual_db = typed_store::rocks::open_cf_opts(
            perpetual_db_path,
            None,
            MetricConf::new("perpetual_pruning"),
            &cfs,
        )?;

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db,
            Some("objects"),
            &ReadWriteOptions::default(),
            false,
        )?;
        for item in objects.safe_iter() {
            after_pruning.insert(item?.0);
        }
        Ok(after_pruning)
    }

    /// `(to_keep, to_delete, tombstones)` keys produced by `generate_test_data`.
    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

    /// Writes `num_versions_per_object` versions (0..num_versions_per_object) of each of
    /// `total_unique_object_ids` objects into `db.objects` in a single batch.
    ///
    /// For every object, the `num_object_versions_to_retain` highest versions are
    /// returned in `to_keep` and the remaining versions in `to_delete`. When
    /// `num_object_versions_to_retain == 0`, a `StoreObject::Deleted` tombstone is also
    /// written at version `num_versions_per_object` and returned in `tombstones`.
    ///
    /// # Panics
    /// Panics if `num_versions_per_object < num_object_versions_to_retain`, or if the
    /// batch write fails.
    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            // Versions are visited newest-first, so `counter` is the number of newer
            // versions that precede `seq`.
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                // `ObjectKey` is `Copy`, so the key recorded above is reused here.
                batch.insert_batch(&db.objects, [(object_key, obj)])?;
            }

            if num_object_versions_to_retain == 0 {
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

    /// Populates a fresh perpetual DB at `path` via `generate_test_data`, then runs a
    /// single `prune_objects_and_indexes` pass.
    ///
    /// The pass uses one synthetic `TransactionEffects` listing every `to_delete` key as
    /// a deleted live object and every tombstone key as an object tombstone.
    ///
    /// Returns the keys that are expected to survive pruning. The DB handle is dropped
    /// before returning.
    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
            )
            .unwrap();
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects_and_indexes(
                vec![effects],
                &db,
                0,
                metrics,
                0,
                None,
                true,
            )
            .await
            .unwrap();
            to_keep
        };
        // NOTE(review): presumably gives the dropped DB time to shut down before callers
        // reopen it — TODO confirm.
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 2, 1000).await;
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        // NOTE(review): this second run uses a fresh directory and its result is not
        // asserted; it only checks that the run completes without panicking. Confirm intent.
        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
    }

    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_tombstones() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 0, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.keep();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                // Every version except the two newest is scheduled for pruning.
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        /// Sums the on-disk size of the `.sst` files directly inside `path`.
        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        perpetual_db.objects.compact_range(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        // Fail loudly if pruning errors. Otherwise the size comparison below would pass
        // vacuously, because compacting an unpruned table does not grow it.
        let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
            vec![effects],
            &perpetual_db,
            0,
            metrics,
            0,
            None,
            true,
        )
        .await
        .expect("object pruning should succeed");
        info!("Total pruned keys = {:?}", total_pruned);

        perpetual_db.objects.compact_range(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}