use super::authority_store_tables::AuthorityPerpetualTables;
use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
use crate::jsonrpc_index::IndexStore;
use crate::rpc_index::RpcIndexStore;
use anyhow::anyhow;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use once_cell::sync::Lazy;
use prometheus::{
    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
    register_int_gauge_with_registry,
};
#[cfg(tidehunter)]
use serde::de::DeserializeOwned;
use std::cmp::{max, min};
use std::collections::{BTreeSet, HashMap};
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use sui_config::node::AuthorityStorePruningConfig;
use sui_types::committee::EpochId;
use sui_types::effects::TransactionEffects;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::{
    CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
};
use sui_types::{
    base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
    storage::ObjectKey,
};
use tokio::sync::oneshot::{self, Sender};
use tokio::time::Instant;
use tracing::{debug, error, info, warn};
use typed_store::rocksdb::LiveFile;
use typed_store::{Map, TypedStoreError};

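/// Column families that are eligible for periodic manual SST-file compaction
/// (see `compact_next_sst_file`).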
static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
    [
        "objects",
        "effects",
        "transactions",
        "events",
        "executed_effects",
        "executed_transactions_to_checkpoint",
    ]
    .into_iter()
    .map(|cf| cf.to_string())
    .collect()
});

pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;

pub struct AuthorityStorePruner {
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}

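/// Watermarks the pruner advances as it makes progress; under tidehunter the
/// relocation filters installed by `apply_relocation_filter` read them to
/// decide which entries to drop.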
#[derive(Default)]
pub struct PrunerWatermarks {
    pub epoch_id: Arc<AtomicU64>,
    pub checkpoint_id: Arc<AtomicU64>,
}

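/// Cap on the pruning tick interval: `pruning_tick_duration_ms` never returns
/// more than this, so the pruner wakes up at least once per 10 seconds.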
static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;

pub struct AuthorityStorePruningMetrics {
    pub last_pruned_checkpoint: IntGauge,
    pub num_pruned_objects: IntCounter,
    pub num_pruned_tombstones: IntCounter,
    pub last_pruned_effects_checkpoint: IntGauge,
    pub last_pruned_indexes_transaction: IntGauge,
    pub num_epochs_to_retain_for_objects: IntGauge,
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}

impl AuthorityStorePruningMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            last_pruned_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_checkpoint",
                "Last pruned checkpoint",
                registry
            )
            .unwrap(),
            num_pruned_objects: register_int_counter_with_registry!(
                "num_pruned_objects",
                "Number of pruned objects",
                registry
            )
            .unwrap(),
            num_pruned_tombstones: register_int_counter_with_registry!(
                "num_pruned_tombstones",
                "Number of pruned tombstones",
                registry
            )
            .unwrap(),
            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_effects_checkpoint",
                "Last pruned effects checkpoint",
                registry
            )
            .unwrap(),
            last_pruned_indexes_transaction: register_int_gauge_with_registry!(
                "last_pruned_indexes_transaction",
                "Last pruned indexes transaction",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_objects",
                "Number of epochs to retain for objects",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_checkpoints",
                "Number of epochs to retain for checkpoints",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }

    pub fn new_for_test() -> Arc<Self> {
        Self::new(&Registry::new())
    }
}

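/// Which class of data a pruning pass targets: object versions (plus RPC
/// indexes) or checkpoint/transaction/effects records.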
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PruningMode {
    Objects,
    Checkpoints,
}

impl AuthorityStorePruner {
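    /// Prunes object versions that the given effects report as modified or
    /// deleted, updates the RPC indexes, and records `checkpoint_number` as
    /// the highest pruned checkpoint. Live versions are removed via range
    /// deletes; tombstoned objects are deleted point-wise.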
    #[cfg(not(tidehunter))]
    async fn prune_objects_and_indexes(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        rpc_index: Option<&RpcIndexStore>,
        enable_pruning_tombstones: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();

        let mut live_object_keys_to_prune = vec![];
        let mut object_tombstones_to_prune = vec![];
        for effects in &transaction_effects {
            for (object_id, seq_number) in effects.modified_at_versions() {
                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
            }

            if enable_pruning_tombstones {
                for deleted_object_key in effects.all_tombstones() {
                    object_tombstones_to_prune
                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
                }
            }
        }

        metrics
            .num_pruned_objects
            .inc_by(live_object_keys_to_prune.len() as u64);
        metrics
            .num_pruned_tombstones
            .inc_by(object_tombstones_to_prune.len() as u64);

        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
            updates
                .entry(object_id)
                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
                .or_insert((seq_number, seq_number));
        }

        for (object_id, (min_version, max_version)) in updates {
            debug!(
                "Pruning object {:?} versions {:?} - {:?}",
                object_id, min_version, max_version
            );
            let start_range = ObjectKey(object_id, min_version);
            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
        }

        if !object_tombstones_to_prune.is_empty() {
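            // For each tombstone, delete every remaining version of the object
            // up to and including the tombstone itself.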
            let mut object_keys_to_delete = vec![];
            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
                for result in perpetual_db.objects.safe_iter_with_bounds(
                    Some(ObjectKey(object_id, VersionNumber::MIN)),
                    Some(ObjectKey(object_id, seq_number.next())),
                ) {
                    let (object_key, _) = result?;
                    assert_eq!(object_key.0, object_id);
                    object_keys_to_delete.push(object_key);
                }
            }

            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
        }

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }

        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);

        wb.write()?;
        Ok(())
    }

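    /// Tidehunter variant of `prune_objects_and_indexes`: deletes modified and
    /// tombstoned object versions point-wise (no range deletes), so the
    /// tombstone kill-switch parameter is ignored.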
    #[cfg(tidehunter)]
    async fn prune_objects_and_indexes(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        rpc_index: Option<&RpcIndexStore>,
        _: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();
        let mut objects_to_prune = vec![];

        for effects in &transaction_effects {
            for (object_id, version) in effects
                .modified_at_versions()
                .into_iter()
                .chain(effects.all_tombstones())
            {
                debug!("Pruning object {:?} version {:?}", object_id, version);
                objects_to_prune.push(ObjectKey(object_id, version));
            }
        }
        metrics
            .num_pruned_objects
            .inc_by(objects_to_prune.len() as u64);
        wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }

        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
        wb.write()?;
        Ok(())
    }

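    /// Deletes the transactions, effects, events, and checkpoint records that
    /// belong to the given pruned checkpoints, then advances the
    /// `HighestPruned` watermark to `checkpoint_number`.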
    fn prune_checkpoints(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_db: &Arc<CheckpointStore>,
        checkpoint_number: CheckpointSequenceNumber,
        checkpoints_to_prune: Vec<CheckpointDigest>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        effects_to_prune: &Vec<TransactionEffects>,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("EffectsLivePruner");

        let mut perpetual_batch = perpetual_db.objects.batch();
        let transactions: Vec<_> = checkpoint_content_to_prune
            .iter()
            .flat_map(|content| content.iter().map(|tx| tx.transaction))
            .collect();

        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
        perpetual_batch.delete_batch(
            &perpetual_db.executed_transactions_to_checkpoint,
            transactions.iter(),
        )?;

        let mut effect_digests = vec![];
        for effects in effects_to_prune {
            let effects_digest = effects.digest();
            debug!("Pruning effects {:?}", effects_digest);
            effect_digests.push(effects_digest);

            if effects.events_digest().is_some() {
                perpetual_batch
                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
            }
        }
        perpetual_batch.delete_batch(
            &perpetual_db.unchanged_loaded_runtime_objects,
            transactions.iter(),
        )?;
        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;

        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();

        let checkpoint_content_digests =
            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_content,
            checkpoint_content_digests.clone(),
        )?;
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
            checkpoint_content_digests,
        )?;

        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_by_digest,
            checkpoints_to_prune,
        )?;

        checkpoints_batch.insert_batch(
            &checkpoint_db.tables.watermarks,
            [(
                &CheckpointWatermark::HighestPruned,
                &(checkpoint_number, CheckpointDigest::random()),
            )],
        )?;

        perpetual_batch.write()?;
        checkpoints_batch.write()?;
        metrics
            .last_pruned_effects_checkpoint
            .set(checkpoint_number as i64);

        Ok(())
    }

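    /// Entry point for the object pruning tick: computes the highest
    /// checkpoint whose objects may be pruned (optionally smoothed over the
    /// epoch) and delegates to `prune_for_eligible_epochs` in `Objects` mode.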
    pub async fn prune_objects_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let pruned_checkpoint_number = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        if config.smooth && config.num_epochs_to_retain > 0 {
            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint_number,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                config.num_epochs_to_retain,
            )?;
        }
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            PruningMode::Objects,
            config.num_epochs_to_retain,
            pruned_checkpoint_number,
            max_eligible_checkpoint_number,
            config,
            metrics.clone(),
        )
        .await
    }

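    /// Entry point for the checkpoint pruning tick. When object pruning is
    /// enabled, checkpoint data is never pruned past the object pruner's
    /// watermark, so objects are always pruned first.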
    pub async fn prune_checkpoints_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
        pruner_watermarks: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
        let pruned_checkpoint_number = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .unwrap_or(0);
        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        if config.num_epochs_to_retain != u64::MAX {
            max_eligible_checkpoint = min(
                max_eligible_checkpoint,
                perpetual_db
                    .get_highest_pruned_checkpoint()?
                    .unwrap_or_default(),
            );
        }
        if config.smooth
            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
        {
            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                num_epochs_to_retain,
            )?;
        }
        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            PruningMode::Checkpoints,
            config
                .num_epochs_to_retain_for_checkpoints()
                .ok_or_else(|| anyhow!("config value not set"))?,
            pruned_checkpoint_number,
            max_eligible_checkpoint,
            config.clone(),
            metrics.clone(),
        )
        .await?;

        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
            Self::update_pruning_watermarks(
                perpetual_db,
                checkpoint_store,
                num_epochs_to_retain,
                pruner_watermarks,
            )?;
        }
        Ok(())
    }

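    /// Walks certified checkpoints from `starting_checkpoint_number` up to
    /// `max_eligible_checkpoint`, skipping the most recent
    /// `num_epochs_to_retain` epochs, and prunes them in batches bounded by
    /// `max_transactions_in_batch` and `max_checkpoints_in_batch`.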
    pub async fn prune_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        mode: PruningMode,
        num_epochs_to_retain: u64,
        starting_checkpoint_number: CheckpointSequenceNumber,
        max_eligible_checkpoint: CheckpointSequenceNumber,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneForEligibleEpochs");

        let mut checkpoint_number = starting_checkpoint_number;
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch())
            .unwrap_or_default();

        let mut checkpoints_to_prune = vec![];
        let mut checkpoint_content_to_prune = vec![];
        let mut effects_to_prune = vec![];

        loop {
            let Some(ckpt) = checkpoint_store
                .tables
                .certified_checkpoints
                .get(&(checkpoint_number + 1))?
            else {
                break;
            };
            let checkpoint = ckpt.into_inner();
            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
            {
                break;
            }
            checkpoint_number = *checkpoint.sequence_number();

            let content = checkpoint_store
                .get_checkpoint_contents(&checkpoint.content_digest)?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "checkpoint content data is missing: {}",
                        checkpoint.sequence_number
                    )
                })?;
            let effects = perpetual_db
                .effects
                .multi_get(content.iter().map(|tx| tx.effects))?;

            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
            checkpoints_to_prune.push(*checkpoint.digest());
            checkpoint_content_to_prune.push(content);
            effects_to_prune.extend(effects.into_iter().flatten());

            if effects_to_prune.len() >= config.max_transactions_in_batch
                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
            {
                match mode {
                    PruningMode::Objects => {
                        Self::prune_objects_and_indexes(
                            effects_to_prune,
                            perpetual_db,
                            checkpoint_number,
                            metrics.clone(),
                            checkpoint_content_to_prune,
                            rpc_index,
                            !config.killswitch_tombstone_pruning,
                        )
                        .await?
                    }
                    PruningMode::Checkpoints => Self::prune_checkpoints(
                        perpetual_db,
                        checkpoint_store,
                        checkpoint_number,
                        checkpoints_to_prune,
                        checkpoint_content_to_prune,
                        &effects_to_prune,
                        metrics.clone(),
                    )?,
                };
                checkpoints_to_prune = vec![];
                checkpoint_content_to_prune = vec![];
                effects_to_prune = vec![];
                tokio::task::yield_now().await;
            }
        }

        if !checkpoints_to_prune.is_empty() {
            match mode {
                PruningMode::Objects => {
                    Self::prune_objects_and_indexes(
                        effects_to_prune,
                        perpetual_db,
                        checkpoint_number,
                        metrics.clone(),
                        checkpoint_content_to_prune,
                        rpc_index,
                        !config.killswitch_tombstone_pruning,
                    )
                    .await?
                }
                PruningMode::Checkpoints => Self::prune_checkpoints(
                    perpetual_db,
                    checkpoint_store,
                    checkpoint_number,
                    checkpoints_to_prune,
                    checkpoint_content_to_prune,
                    &effects_to_prune,
                    metrics.clone(),
                )?,
            };
        }
        Ok(())
    }

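    /// Prunes JSON-RPC index entries older than the configured retention
    /// window (clamped to a minimum of 7 epochs). The cutoff is a wall-clock
    /// timestamp rather than a checkpoint watermark.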
    fn prune_indexes(
        indexes: Option<&IndexStore>,
        config: &AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        metrics: &AuthorityStorePruningMetrics,
    ) -> anyhow::Result<()> {
        if let (Some(mut epochs_to_retain), Some(indexes)) =
            (config.num_epochs_to_retain_for_indexes, indexes)
        {
            if epochs_to_retain < 7 {
                warn!("num_epochs_to_retain_for_indexes is too low. Resetting it to 7");
                epochs_to_retain = 7;
            }
            let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
            if let Some(cut_time_ms) =
                u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
            {
                let transaction_id = indexes.prune(cut_time_ms)?;
                metrics
                    .last_pruned_indexes_transaction
                    .set(transaction_id as i64);
            }
        }
        Ok(())
    }

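    /// Range-deletes `executed_transaction_digests` entries for all epochs
    /// earlier than `current_epoch - 1`.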
    async fn prune_executed_tx_digests(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let target_epoch = current_epoch - 1;

        let start_key = (0u64, TransactionDigest::ZERO);
        let end_key = (target_epoch, TransactionDigest::ZERO);

        info!(
            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
            target_epoch, current_epoch
        );

        let mut batch = perpetual_db.executed_transaction_digests.batch();
        batch.schedule_delete_range(
            &perpetual_db.executed_transaction_digests,
            &start_key,
            &end_key,
        )?;
        batch.write()?;
        Ok(())
    }

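    /// Tidehunter equivalent of `prune_executed_tx_digests`: drops whole cells
    /// for epochs 0 through `current_epoch - 2` instead of issuing a range
    /// delete.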
    #[cfg(tidehunter)]
    fn prune_executed_tx_digests_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let last_epoch_to_delete = current_epoch - 2;
        let from_key = (0u64, TransactionDigest::ZERO);
        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
        info!(
            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
            last_epoch_to_delete, current_epoch
        );
        perpetual_db
            .executed_transaction_digests
            .drop_cells_in_range(&from_key, &to_key)?;
        Ok(())
    }

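    /// Recomputes the shared epoch and checkpoint watermarks from the current
    /// pruning state. Returns `Ok(true)` only if the epoch watermark changed.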
    fn update_pruning_watermarks(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<bool> {
        use std::sync::atomic::Ordering;
        let objects_pruning_checkpoint_id = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        let objects_pruning_epoch_id = checkpoint_store
            .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
            .map(|chk| chk.epoch)
            .unwrap_or_default();

        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
        let current_epoch_id = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();
        if current_epoch_id < num_epochs_to_retain {
            return Ok(false);
        }
        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
        let checkpoint_id =
            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;

        let new_watermark = min(target_epoch_id + 1, objects_pruning_epoch_id);
        if current_watermark == new_watermark {
            return Ok(false);
        }
        info!("relocation: setting epoch watermark to {}", new_watermark);
        pruning_watermark
            .epoch_id
            .store(new_watermark, Ordering::Relaxed);
        if let Some(checkpoint_id) = checkpoint_id {
            let watermark = min(checkpoint_id, objects_pruning_checkpoint_id);
            info!("relocation: setting checkpoint watermark to {}", watermark);
            pruning_watermark
                .checkpoint_id
                .store(watermark, Ordering::Relaxed);
        }
        Ok(true)
    }

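    /// Tidehunter pruning tick: refreshes the watermarks and, if they
    /// advanced, starts relocation (which applies the filters installed by
    /// `apply_relocation_filter`) and drops stale executed-tx-digest cells.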
    #[cfg(tidehunter)]
    fn prune_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let watermark_updated = Self::update_pruning_watermarks(
            perpetual_db,
            checkpoint_store,
            num_epochs_to_retain,
            &pruning_watermark,
        )?;
        if !watermark_updated {
            info!("skip relocation. Watermark hasn't changed");
            return Ok(());
        }
        perpetual_db.objects.db.start_relocation()?;
        checkpoint_store.tables.watermarks.db.start_relocation()?;
        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
        Ok(())
    }

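    /// Picks the largest SST file from the periodic-pruning tables that is at
    /// least `delay_days` old (and has not been compacted within that window)
    /// and manually compacts it. Returns the compacted file, or `None` when no
    /// candidate qualifies.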
    fn compact_next_sst_file(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        delay_days: usize,
        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
    ) -> anyhow::Result<Option<LiveFile>> {
        let db_path = perpetual_db.objects.db.path_for_pruning();
        let mut state = last_processed
            .lock()
            .expect("failed to obtain a lock for last processed SST files");
        let mut sst_file_for_compaction: Option<LiveFile> = None;
        let time_threshold =
            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
        for sst_file in perpetual_db.objects.db.live_files()? {
            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
            let last_modified = std::fs::metadata(file_path)?.modified()?;
            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
                || sst_file.level < 1
                || sst_file.start_key.is_none()
                || sst_file.end_key.is_none()
                || last_modified > time_threshold
                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
            {
                continue;
            }
            if let Some(candidate) = &sst_file_for_compaction
                && candidate.size > sst_file.size
            {
                continue;
            }
            sst_file_for_compaction = Some(sst_file);
        }
        let Some(sst_file) = sst_file_for_compaction else {
            return Ok(None);
        };
        info!(
            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
            sst_file.name, sst_file.size, sst_file.level
        );
        perpetual_db.objects.compact_range_raw(
            &sst_file.column_family_name,
            sst_file.start_key.clone().unwrap(),
            sst_file.end_key.clone().unwrap(),
        )?;
        state.insert(sst_file.name.clone(), SystemTime::now());
        Ok(Some(sst_file))
    }

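    /// Tick interval for the pruner loops: half the epoch duration, capped at
    /// `MIN_PRUNING_TICK_DURATION_MS`.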
    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
    }

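    /// Smooths pruning over an epoch: each tick, the eligible checkpoint only
    /// advances by roughly `(max_eligible - pruned) / (epoch_duration /
    /// tick_duration)`, so the backlog is worked off gradually rather than in
    /// one burst at the epoch boundary.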
    fn smoothed_max_eligible_checkpoint_number(
        checkpoint_store: &Arc<CheckpointStore>,
        mut max_eligible_checkpoint: CheckpointSequenceNumber,
        pruned_checkpoint: CheckpointSequenceNumber,
        epoch_id: EpochId,
        epoch_duration_ms: u64,
        num_epochs_to_retain: u64,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        if epoch_id < num_epochs_to_retain {
            return Ok(0);
        }
        let last_checkpoint_in_epoch = checkpoint_store
            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
            .map(|checkpoint| checkpoint.sequence_number)
            .unwrap_or_default();
        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
        if max_eligible_checkpoint == 0 {
            return Ok(max_eligible_checkpoint);
        }
        let num_intervals = epoch_duration_ms
            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
            .unwrap_or(1);
        let delta = max_eligible_checkpoint
            .checked_sub(pruned_checkpoint)
            .unwrap_or_default()
            .checked_div(num_intervals)
            .unwrap_or(1);
        Ok(pruned_checkpoint + delta)
    }

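    /// Spawns the background pruning loops (objects, checkpoints, indexes, and
    /// optional periodic SST compaction) and returns the cancel handle;
    /// dropping the returned sender stops the loops.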
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Sender<()> {
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
        let mut checkpoints_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        #[cfg(tidehunter)]
        {
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
                let prune_objects = config.num_epochs_to_retain != u64::MAX;
                tokio::task::spawn(async move {
                    loop {
                        tokio::select! {
                            _ = objects_prune_interval.tick(), if prune_objects => {
                                if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                    error!("Failed to prune objects: {:?}", err);
                                }
                            },
                            _ = checkpoints_prune_interval.tick() => {
                                if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone()) {
                                    error!("Failed to prune checkpoints: {:?}", err);
                                }
                            },
                            _ = &mut recv => break,
                        }
                    }
                });
            }
        }
        #[cfg(not(tidehunter))]
        {
            let mut indexes_prune_interval =
                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

            let perpetual_db_for_compaction = perpetual_db.clone();
            if let Some(delay_days) = config.periodic_compaction_threshold_days {
                spawn_monitored_task!(async move {
                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
                    loop {
                        let db = perpetual_db_for_compaction.clone();
                        let state = Arc::clone(&last_processed);
                        let result = tokio::task::spawn_blocking(move || {
                            Self::compact_next_sst_file(db, delay_days, state)
                        })
                        .await;
                        let mut sleep_interval_secs = 1;
                        match result {
                            Err(err) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Ok(None)) => {
                                sleep_interval_secs = 3600;
                            }
                            _ => {}
                        }
                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                    }
                });
            }
            tokio::task::spawn(async move {
                loop {
                    tokio::select! {
                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                error!("Failed to prune objects: {:?}", err);
                            }
                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
                                error!("Failed to prune executed_tx_digests: {:?}", err);
                            }
                        },
                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
                                error!("Failed to prune checkpoints: {:?}", err);
                            }
                        },
                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
                                error!("Failed to prune indexes: {:?}", err);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
            });
        }
        sender
    }

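    /// Creates the pruner and spawns its background tasks. Unsafe retention
    /// settings are adjusted first: a validator configured with
    /// `0 < num_epochs_to_retain < u64::MAX` is reset to aggressive pruning
    /// (retain 0 epochs), and under tidehunter a validator's object pruner is
    /// disabled entirely in favor of the objects compactor.
    ///
    /// A minimal construction sketch (not compiled; assumes the stores already
    /// exist and that `AuthorityStorePruningConfig` implements `Default`):
    /// ```ignore
    /// let pruner = AuthorityStorePruner::new(
    ///     perpetual_db.clone(),
    ///     checkpoint_store.clone(),
    ///     None, // rpc_index
    ///     None, // jsonrpc_index
    ///     AuthorityStorePruningConfig::default(),
    ///     false, // is_validator
    ///     EPOCH_DURATION_MS_FOR_TESTING,
    ///     &Registry::new(),
    ///     Arc::new(PrunerWatermarks::default()),
    /// );
    /// // Dropping the pruner closes the oneshot channel and stops the loops.
    /// ```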
    pub fn new(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        mut pruning_config: AuthorityStorePruningConfig,
        is_validator: bool,
        epoch_duration_ms: u64,
        registry: &Registry,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Self {
        #[cfg(tidehunter)]
        if is_validator && pruning_config.num_epochs_to_retain != u64::MAX {
            info!(
                "Tidehunter validator: disabling object pruner (was num_epochs_to_retain={}). The objects compactor performs equivalent compaction.",
                pruning_config.num_epochs_to_retain
            );
            pruning_config.num_epochs_to_retain = u64::MAX;
        }

        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
        {
            warn!(
                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
                pruning_config.num_epochs_to_retain
            );
            if is_validator {
                warn!("Resetting to aggressive pruner.");
                pruning_config.num_epochs_to_retain = 0;
            } else {
                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
            }
        }
        AuthorityStorePruner {
            _objects_pruner_cancel_handle: Self::setup_pruning(
                pruning_config,
                epoch_duration_ms,
                perpetual_db,
                checkpoint_store,
                rpc_index,
                jsonrpc_index,
                AuthorityStorePruningMetrics::new(registry),
                pruner_watermarks,
            ),
        }
    }

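    /// Manually compacts the full key range of the objects table.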
    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
        perpetual_db.objects.compact_range(
            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
        )
    }
}

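/// Wraps a tidehunter key-space config with a relocation filter that removes
/// entries whose extracted epoch (or checkpoint) value is below the pruner
/// watermark, and stops relocation at the first retained entry. `by_key`
/// selects whether the value is decoded from the key (big-endian fixint
/// bincode) or from the BCS-encoded value.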
#[cfg(tidehunter)]
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;
    config.with_relocation_filter(move |key, value| {
        let data = if by_key {
            bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding()
                .deserialize(&key)
                .expect("relocation filter deserialization error")
        } else {
            bcs::from_bytes(&value).expect("relocation filter deserialization error")
        };
        if extractor(data) < pruner_watermark.load(Ordering::Relaxed) {
            Decision::Remove
        } else {
            Decision::StopRelocation
        }
    })
}

#[cfg(test)]
mod tests {
    use more_asserts as ma;
    use std::path::Path;
    use std::time::Duration;
    use std::{collections::HashSet, sync::Arc};
    use tracing::log::info;

    use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
    use crate::authority::authority_store_tables::AuthorityPerpetualTables;
    use crate::authority::authority_store_types::{
        StoreObject, StoreObjectWrapper, get_store_object,
    };
    use prometheus::Registry;
    use sui_types::base_types::ObjectDigest;
    use sui_types::effects::TransactionEffects;
    use sui_types::effects::TransactionEffectsAPI;
    use sui_types::{
        base_types::{ObjectID, SequenceNumber},
        object::Object,
        storage::ObjectKey,
    };
    use typed_store::Map;
    use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};

    use super::AuthorityStorePruner;

    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<_> = cf_names
            .keys()
            .map(|x| (x.as_str(), default_db_options().options))
            .collect();
        let perpetual_db = typed_store::rocks::open_cf_opts(
            perpetual_db_path,
            None,
            MetricConf::new("perpetual_pruning"),
            &cfs,
        );

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db?,
            Some("objects"),
            &ReadWriteOptions::default(),
            false,
        )?;
        let iter = objects.safe_iter();
        for item in iter {
            after_pruning.insert(item?.0);
        }
        Ok(after_pruning)
    }

    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

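    /// Writes `num_versions_per_object` versions for each of
    /// `total_unique_object_ids` objects (plus tombstones when retention is
    /// zero) and returns which keys should survive pruning, which should be
    /// deleted, and which tombstones were written.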
    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                batch.insert_batch(
                    &db.objects,
                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
                )?;
            }

            if num_object_versions_to_retain == 0 {
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

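    /// Populates a fresh DB, prunes everything beyond the retention window via
    /// `prune_objects_and_indexes`, and returns the keys expected to remain.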
    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
            )
            .unwrap();
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects_and_indexes(
                vec![effects],
                &db,
                0,
                metrics,
                vec![],
                None,
                true,
            )
            .await
            .unwrap();
            to_keep
        };
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 2, 1000).await;
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
    }

    #[cfg(not(tidehunter))]
    #[tokio::test]
    async fn test_pruning_tombstones() {
        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 0, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        let path = tempfile::tempdir().unwrap().keep();
        let to_keep = run_pruner(&path, 3, 0, 1000).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.keep();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object(Object::immutable_with_id_for_testing(id));
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        perpetual_db.objects.compact_range(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
            vec![effects],
            &perpetual_db,
            0,
            metrics,
            vec![],
            None,
            true,
        )
        .await;
        info!("Total pruned keys = {:?}", total_pruned);

        perpetual_db.objects.compact_range(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}
1277}