1use super::authority_store_tables::AuthorityPerpetualTables;
5use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
6use crate::jsonrpc_index::IndexStore;
7use crate::rpc_index::RpcIndexStore;
8use anyhow::anyhow;
9use mysten_metrics::{monitored_scope, spawn_monitored_task};
10use once_cell::sync::Lazy;
11use prometheus::{
12 IntCounter, IntGauge, Registry, register_int_counter_with_registry,
13 register_int_gauge_with_registry,
14};
15#[cfg(tidehunter)]
16use serde::de::DeserializeOwned;
17use std::cmp::{max, min};
18use std::collections::{BTreeSet, HashMap};
19use std::sync::Mutex;
20use std::sync::atomic::AtomicU64;
21use std::time::{SystemTime, UNIX_EPOCH};
22use std::{sync::Arc, time::Duration};
23use sui_config::node::AuthorityStorePruningConfig;
24use sui_types::committee::EpochId;
25use sui_types::effects::TransactionEffects;
26use sui_types::effects::TransactionEffectsAPI;
27use sui_types::message_envelope::Message;
28use sui_types::messages_checkpoint::{
29 CheckpointContents, CheckpointDigest, CheckpointSequenceNumber,
30};
31use sui_types::{
32 base_types::{ObjectID, SequenceNumber, TransactionDigest, VersionNumber},
33 storage::ObjectKey,
34};
35use tokio::sync::oneshot::{self, Sender};
36use tokio::time::Instant;
37use tracing::{debug, error, info, warn};
38use typed_store::rocksdb::LiveFile;
39use typed_store::{Map, TypedStoreError};
40
41static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
42 [
43 "objects",
44 "effects",
45 "transactions",
46 "events",
47 "executed_effects",
48 "executed_transactions_to_checkpoint",
49 ]
50 .into_iter()
51 .map(|cf| cf.to_string())
52 .collect()
53});
/// Epoch duration used in tests: 24 hours, in milliseconds.
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;
/// Handle to the background pruning tasks. Dropping this struct drops the
/// contained oneshot sender, which signals the spawned pruning loop to exit.
pub struct AuthorityStorePruner {
    // Cancellation channel for the pruning loop spawned by `setup_pruning`.
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}
58
/// Atomic watermarks shared with other components (e.g. tidehunter relocation
/// filters — see `apply_relocation_filter`) describing how far pruning has
/// progressed. Updated by `update_pruning_watermarks`.
#[derive(Default)]
pub struct PrunerWatermarks {
    // Epoch prune bound: entries with epoch below this value may be removed.
    pub epoch_id: Arc<AtomicU64>,
    // Checkpoint sequence-number watermark corresponding to `epoch_id`.
    pub checkpoint_id: Arc<AtomicU64>,
}
64
// Cap on the pruning tick interval (10s). Despite the MIN_ prefix, this acts
// as an upper bound: `pruning_tick_duration_ms` takes the min of this value
// and half the epoch duration.
static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;
66
/// Prometheus metrics reporting pruning progress and configuration.
pub struct AuthorityStorePruningMetrics {
    // Highest checkpoint whose objects have been pruned.
    pub last_pruned_checkpoint: IntGauge,
    // Total object versions deleted by the objects pruner.
    pub num_pruned_objects: IntCounter,
    // Total object tombstones deleted by the objects pruner.
    pub num_pruned_tombstones: IntCounter,
    // Highest checkpoint whose effects/checkpoint data has been pruned.
    pub last_pruned_effects_checkpoint: IntGauge,
    // Highest transaction id pruned from the JSON-RPC indexes.
    pub last_pruned_indexes_transaction: IntGauge,
    // Configured object retention (epochs), exported for observability.
    pub num_epochs_to_retain_for_objects: IntGauge,
    // Configured checkpoint retention (epochs), exported for observability.
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}
76
77impl AuthorityStorePruningMetrics {
78 pub fn new(registry: &Registry) -> Arc<Self> {
79 let this = Self {
80 last_pruned_checkpoint: register_int_gauge_with_registry!(
81 "last_pruned_checkpoint",
82 "Last pruned checkpoint",
83 registry
84 )
85 .unwrap(),
86 num_pruned_objects: register_int_counter_with_registry!(
87 "num_pruned_objects",
88 "Number of pruned objects",
89 registry
90 )
91 .unwrap(),
92 num_pruned_tombstones: register_int_counter_with_registry!(
93 "num_pruned_tombstones",
94 "Number of pruned tombstones",
95 registry
96 )
97 .unwrap(),
98 last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
99 "last_pruned_effects_checkpoint",
100 "Last pruned effects checkpoint",
101 registry
102 )
103 .unwrap(),
104 last_pruned_indexes_transaction: register_int_gauge_with_registry!(
105 "last_pruned_indexes_transaction",
106 "Last pruned indexes transaction",
107 registry
108 )
109 .unwrap(),
110 num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
111 "num_epochs_to_retain_for_objects",
112 "Number of epochs to retain for objects",
113 registry
114 )
115 .unwrap(),
116 num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
117 "num_epochs_to_retain_for_checkpoints",
118 "Number of epochs to retain for checkpoints",
119 registry
120 )
121 .unwrap(),
122 };
123 Arc::new(this)
124 }
125
126 pub fn new_for_test() -> Arc<Self> {
127 Self::new(&Registry::new())
128 }
129}
130
/// Selects which pruning pass `prune_for_eligible_epochs` performs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PruningMode {
    // Prune dead object versions (and RPC indexes).
    Objects,
    // Prune transactions/effects/events and checkpoint data.
    Checkpoints,
}
136
137impl AuthorityStorePruner {
    #[cfg(not(tidehunter))]
    /// RocksDB implementation: prunes object versions made obsolete by
    /// `transaction_effects` (and, optionally, their tombstones), prunes the
    /// RPC index up to `checkpoint_number`, then advances the perpetual DB's
    /// highest-pruned-checkpoint watermark. All deletions and the watermark
    /// update are committed in a single write batch.
    async fn prune_objects_and_indexes(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        rpc_index: Option<&RpcIndexStore>,
        enable_pruning_tombstones: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();

        // Every object version a transaction overwrote is dead and can be
        // deleted. Tombstones are collected separately because tombstone
        // pruning can be switched off via config.
        let mut live_object_keys_to_prune = vec![];
        let mut object_tombstones_to_prune = vec![];
        for effects in &transaction_effects {
            for (object_id, seq_number) in effects.modified_at_versions() {
                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
            }

            if enable_pruning_tombstones {
                for deleted_object_key in effects.all_tombstones() {
                    object_tombstones_to_prune
                        .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
                }
            }
        }

        metrics
            .num_pruned_objects
            .inc_by(live_object_keys_to_prune.len() as u64);
        metrics
            .num_pruned_tombstones
            .inc_by(object_tombstones_to_prune.len() as u64);

        // Coalesce per-version keys into one [min, max] version range per
        // object so deletion can use cheap range deletes.
        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
            updates
                .entry(object_id)
                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
                .or_insert((seq_number, seq_number));
        }

        for (object_id, (min_version, max_version)) in updates {
            debug!(
                "Pruning object {:?} versions {:?} - {:?}",
                object_id, min_version, max_version
            );
            let start_range = ObjectKey(object_id, min_version);
            // Range end is exclusive, so go one past max_version.
            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
        }

        if !object_tombstones_to_prune.is_empty() {
            // For a tombstone at version V, every remaining stored version of
            // that object up to and including V is unreachable; enumerate and
            // delete them as point deletes (the tombstone itself included).
            let mut object_keys_to_delete = vec![];
            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
                for result in perpetual_db.objects.safe_iter_with_bounds(
                    Some(ObjectKey(object_id, VersionNumber::MIN)),
                    Some(ObjectKey(object_id, seq_number.next())),
                ) {
                    let (object_key, _) = result?;
                    assert_eq!(object_key.0, object_id);
                    object_keys_to_delete.push(object_key);
                }
            }

            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
        }

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }

        // Record progress and commit everything atomically.
        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);

        wb.write()?;
        Ok(())
    }
224
    #[cfg(tidehunter)]
    /// Tidehunter implementation: deletes each obsolete object version and
    /// every tombstone as point deletes, prunes the RPC index, and advances
    /// the highest-pruned-checkpoint watermark. The tombstone-pruning flag is
    /// ignored here — tombstones are always pruned.
    async fn prune_objects_and_indexes(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        rpc_index: Option<&RpcIndexStore>,
        _: bool,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();
        let mut objects_to_prune = vec![];

        // Every version a transaction modified, plus every tombstone it
        // created, is dead and can be removed.
        for effects in &transaction_effects {
            for (object_id, version) in effects
                .modified_at_versions()
                .into_iter()
                .chain(effects.all_tombstones())
            {
                debug!("Pruning object {:?} version {:?}", object_id, version);
                objects_to_prune.push(ObjectKey(object_id, version));
            }
        }
        metrics
            .num_pruned_objects
            .inc_by(objects_to_prune.len() as u64);
        wb.delete_batch(&perpetual_db.objects, &objects_to_prune)?;

        if let Some(rpc_index) = rpc_index {
            rpc_index.prune(checkpoint_number, &checkpoint_content_to_prune)?;
        }

        // Record progress and commit atomically.
        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);
        wb.write()?;
        Ok(())
    }
263
    /// Deletes checkpoint-scoped data up to `checkpoint_number`:
    /// per-transaction rows in the perpetual DB (transactions, executed
    /// effects, checkpoint membership, events, loaded-runtime objects,
    /// effects) and the checkpoint store's content/digest tables, then
    /// advances the checkpoint store's `HighestPruned` watermark.
    fn prune_checkpoints(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_db: &Arc<CheckpointStore>,
        checkpoint_number: CheckpointSequenceNumber,
        checkpoints_to_prune: Vec<CheckpointDigest>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        effects_to_prune: &Vec<TransactionEffects>,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("EffectsLivePruner");

        let mut perpetual_batch = perpetual_db.objects.batch();
        // All transaction digests contained in the checkpoints being pruned.
        let transactions: Vec<_> = checkpoint_content_to_prune
            .iter()
            .flat_map(|content| content.iter().map(|tx| tx.transaction))
            .collect();

        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
        perpetual_batch.delete_batch(
            &perpetual_db.executed_transactions_to_checkpoint,
            transactions.iter(),
        )?;

        let mut effect_digests = vec![];
        for effects in effects_to_prune {
            let effects_digest = effects.digest();
            debug!("Pruning effects {:?}", effects_digest);
            effect_digests.push(effects_digest);

            // Events are keyed by transaction digest; only delete when the
            // effects actually reference events.
            if effects.events_digest().is_some() {
                perpetual_batch
                    .delete_batch(&perpetual_db.events_2, [effects.transaction_digest()])?;
            }
        }
        perpetual_batch.delete_batch(
            &perpetual_db.unchanged_loaded_runtime_objects,
            transactions.iter(),
        )?;
        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;

        let mut checkpoints_batch = checkpoint_db.tables.certified_checkpoints.batch();

        let checkpoint_content_digests =
            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_content,
            checkpoint_content_digests.clone(),
        )?;
        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_sequence_by_contents_digest,
            checkpoint_content_digests,
        )?;

        checkpoints_batch.delete_batch(
            &checkpoint_db.tables.checkpoint_by_digest,
            checkpoints_to_prune,
        )?;

        // Only the sequence number matters for HighestPruned; the digest
        // component is a random placeholder.
        checkpoints_batch.insert_batch(
            &checkpoint_db.tables.watermarks,
            [(
                &CheckpointWatermark::HighestPruned,
                &(checkpoint_number, CheckpointDigest::random()),
            )],
        )?;

        // NOTE(review): the two batches commit non-atomically; a crash
        // between writes leaves the perpetual DB pruned ahead of the
        // checkpoint store — presumably reconciled on the next pass; confirm.
        perpetual_batch.write()?;
        checkpoints_batch.write()?;
        metrics
            .last_pruned_effects_checkpoint
            .set(checkpoint_number as i64);

        Ok(())
    }
339
    /// One objects-pruning tick.
    ///
    /// Determines the highest checkpoint eligible for object pruning —
    /// optionally smoothed so the work is spread evenly across an epoch —
    /// and prunes objects (and RPC indexes) up to it.
    pub async fn prune_objects_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let pruned_checkpoint_number = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        // Smoothing only applies to partial retention; aggressive pruning
        // (num_epochs_to_retain == 0) always prunes as far as possible.
        if config.smooth && config.num_epochs_to_retain > 0 {
            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint_number,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                config.num_epochs_to_retain,
            )?;
        }
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            PruningMode::Objects,
            config.num_epochs_to_retain,
            pruned_checkpoint_number,
            max_eligible_checkpoint_number,
            config,
            metrics.clone(),
        )
        .await
    }
380
    /// One checkpoints-pruning tick.
    ///
    /// Prunes transactions/effects/events and checkpoint data for epochs
    /// outside the `num_epochs_to_retain_for_checkpoints` window, never
    /// outrunning the objects pruner (unless objects pruning is disabled),
    /// then refreshes the shared pruner watermarks.
    pub async fn prune_checkpoints_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        epoch_duration_ms: u64,
        pruner_watermarks: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
        let pruned_checkpoint_number = checkpoint_store
            .get_highest_pruned_checkpoint_seq_number()?
            .unwrap_or(0);
        let (mut max_eligible_checkpoint, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        // When objects pruning is enabled, checkpoint pruning must stay
        // behind the objects watermark: effects are read during checkpoint
        // pruning and must still exist.
        if config.num_epochs_to_retain != u64::MAX {
            max_eligible_checkpoint = min(
                max_eligible_checkpoint,
                perpetual_db
                    .get_highest_pruned_checkpoint()?
                    .unwrap_or_default(),
            );
        }
        if config.smooth
            && let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints
        {
            max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                num_epochs_to_retain,
            )?;
        }
        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rpc_index,
            PruningMode::Checkpoints,
            config
                .num_epochs_to_retain_for_checkpoints()
                .ok_or_else(|| anyhow!("config value not set"))?,
            pruned_checkpoint_number,
            max_eligible_checkpoint,
            config.clone(),
            metrics.clone(),
        )
        .await?;

        // Publish the new pruning progress to the shared watermarks.
        if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
            Self::update_pruning_watermarks(
                perpetual_db,
                checkpoint_store,
                num_epochs_to_retain,
                pruner_watermarks,
            )?;
        }
        Ok(())
    }
444
    /// Walks certified checkpoints starting after `starting_checkpoint_number`
    /// and prunes them in batches (bounded by `max_transactions_in_batch` /
    /// `max_checkpoints_in_batch`) until it reaches either a checkpoint still
    /// inside the retention window or `max_eligible_checkpoint`. `mode`
    /// selects whether objects or checkpoint data are pruned.
    pub async fn prune_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rpc_index: Option<&RpcIndexStore>,
        mode: PruningMode,
        num_epochs_to_retain: u64,
        starting_checkpoint_number: CheckpointSequenceNumber,
        max_eligible_checkpoint: CheckpointSequenceNumber,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneForEligibleEpochs");

        let mut checkpoint_number = starting_checkpoint_number;
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch())
            .unwrap_or_default();

        // Accumulators for the current batch.
        let mut checkpoints_to_prune = vec![];
        let mut checkpoint_content_to_prune = vec![];
        let mut effects_to_prune = vec![];

        loop {
            let Some(ckpt) = checkpoint_store
                .tables
                .certified_checkpoints
                .get(&(checkpoint_number + 1))?
            else {
                break;
            };
            let checkpoint = ckpt.into_inner();
            // Stop at checkpoints still inside the retention window or past
            // the eligibility bound.
            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
            {
                break;
            }
            checkpoint_number = *checkpoint.sequence_number();

            let content = checkpoint_store
                .get_checkpoint_contents(&checkpoint.content_digest)?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "checkpoint content data is missing: {}",
                        checkpoint.sequence_number
                    )
                })?;
            let effects = perpetual_db
                .effects
                .multi_get(content.iter().map(|tx| tx.effects))?;

            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
            checkpoints_to_prune.push(*checkpoint.digest());
            checkpoint_content_to_prune.push(content);
            // Effects that are already gone come back as None and are skipped.
            effects_to_prune.extend(effects.into_iter().flatten());

            // Flush a full batch, then yield so this task doesn't starve the
            // runtime during long pruning runs.
            if effects_to_prune.len() >= config.max_transactions_in_batch
                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
            {
                match mode {
                    PruningMode::Objects => {
                        Self::prune_objects_and_indexes(
                            effects_to_prune,
                            perpetual_db,
                            checkpoint_number,
                            metrics.clone(),
                            checkpoint_content_to_prune,
                            rpc_index,
                            !config.killswitch_tombstone_pruning,
                        )
                        .await?
                    }
                    PruningMode::Checkpoints => Self::prune_checkpoints(
                        perpetual_db,
                        checkpoint_store,
                        checkpoint_number,
                        checkpoints_to_prune,
                        checkpoint_content_to_prune,
                        &effects_to_prune,
                        metrics.clone(),
                    )?,
                };
                checkpoints_to_prune = vec![];
                checkpoint_content_to_prune = vec![];
                effects_to_prune = vec![];
                tokio::task::yield_now().await;
            }
        }

        // Flush the final partial batch, if any.
        if !checkpoints_to_prune.is_empty() {
            match mode {
                PruningMode::Objects => {
                    Self::prune_objects_and_indexes(
                        effects_to_prune,
                        perpetual_db,
                        checkpoint_number,
                        metrics.clone(),
                        checkpoint_content_to_prune,
                        rpc_index,
                        !config.killswitch_tombstone_pruning,
                    )
                    .await?
                }
                PruningMode::Checkpoints => Self::prune_checkpoints(
                    perpetual_db,
                    checkpoint_store,
                    checkpoint_number,
                    checkpoints_to_prune,
                    checkpoint_content_to_prune,
                    &effects_to_prune,
                    metrics.clone(),
                )?,
            };
        }
        Ok(())
    }
567
568 fn prune_indexes(
569 indexes: Option<&IndexStore>,
570 config: &AuthorityStorePruningConfig,
571 epoch_duration_ms: u64,
572 metrics: &AuthorityStorePruningMetrics,
573 ) -> anyhow::Result<()> {
574 if let (Some(mut epochs_to_retain), Some(indexes)) =
575 (config.num_epochs_to_retain_for_indexes, indexes)
576 {
577 if epochs_to_retain < 7 {
578 warn!("num_epochs_to_retain_for_indexes is too low. Reseting it to 7");
579 epochs_to_retain = 7;
580 }
581 let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
582 if let Some(cut_time_ms) =
583 u64::try_from(now)?.checked_sub(epochs_to_retain * epoch_duration_ms)
584 {
585 let transaction_id = indexes.prune(cut_time_ms)?;
586 metrics
587 .last_pruned_indexes_transaction
588 .set(transaction_id as i64);
589 }
590 }
591 Ok(())
592 }
593
    /// Prunes `executed_transaction_digests` entries for all epochs older
    /// than the previous epoch, i.e. retains only the current and the
    /// immediately preceding epoch. No-op before epoch 2.
    async fn prune_executed_tx_digests(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let target_epoch = current_epoch - 1;

        // The range delete is end-exclusive (cf. the +1 end bound used for
        // object ranges above), so this removes keys for epochs
        // [0, target_epoch), keeping target_epoch and current_epoch.
        let start_key = (0u64, TransactionDigest::ZERO);
        let end_key = (target_epoch, TransactionDigest::ZERO);

        info!(
            "Pruning executed_transaction_digests for epochs < {} (current epoch: {})",
            target_epoch, current_epoch
        );

        let mut batch = perpetual_db.executed_transaction_digests.batch();
        batch.schedule_delete_range(
            &perpetual_db.executed_transaction_digests,
            &start_key,
            &end_key,
        )?;
        batch.write()?;
        Ok(())
    }
627
    #[cfg(tidehunter)]
    /// Tidehunter variant: drops executed-transaction-digest cells for epochs
    /// 0..=current_epoch - 2, using an all-0xff digest as the inclusive upper
    /// key sentinel, keeping the current and previous epoch. No-op before
    /// epoch 2.
    fn prune_executed_tx_digests_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
    ) -> anyhow::Result<()> {
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();

        if current_epoch < 2 {
            return Ok(());
        }

        let last_epoch_to_delete = current_epoch - 2;
        let from_key = (0u64, TransactionDigest::ZERO);
        let to_key = (last_epoch_to_delete, TransactionDigest::new([0xff; 32]));
        info!(
            "Pruning executed_transaction_digests for epochs 0 to {} (current epoch: {})",
            last_epoch_to_delete, current_epoch
        );
        perpetual_db
            .executed_transaction_digests
            .drop_cells_in_range(&from_key, &to_key)?;
        Ok(())
    }
654
    /// Recomputes the shared pruner watermarks from the current pruning state.
    ///
    /// The new epoch watermark is `min(current_epoch - num_epochs_to_retain
    /// + 1, epoch of the highest objects-pruned checkpoint)` so it never
    /// outruns the objects pruner; the checkpoint watermark is capped the
    /// same way. Returns `Ok(true)` only if the epoch watermark changed.
    fn update_pruning_watermarks(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: &Arc<PrunerWatermarks>,
    ) -> anyhow::Result<bool> {
        use std::sync::atomic::Ordering;
        // Objects-pruner progress as a checkpoint id and its containing epoch.
        let objects_pruning_checkpoint_id = perpetual_db
            .get_highest_pruned_checkpoint()?
            .unwrap_or_default();
        let objects_pruning_epoch_id = checkpoint_store
            .get_checkpoint_by_sequence_number(objects_pruning_checkpoint_id)?
            .map(|chk| chk.epoch)
            .unwrap_or_default();

        let current_watermark = pruning_watermark.epoch_id.load(Ordering::Relaxed);
        let current_epoch_id = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch)
            .unwrap_or_default();
        // Not enough history accumulated yet to move any watermark.
        if current_epoch_id < num_epochs_to_retain {
            return Ok(false);
        }
        let target_epoch_id = current_epoch_id - num_epochs_to_retain;
        let checkpoint_id =
            checkpoint_store.get_epoch_last_checkpoint_seq_number(target_epoch_id)?;

        // Never advance past what the objects pruner has already covered.
        let new_watermark = min(target_epoch_id + 1, objects_pruning_epoch_id);
        if current_watermark == new_watermark {
            return Ok(false);
        }
        info!("relocation: setting epoch watermark to {}", new_watermark);
        pruning_watermark
            .epoch_id
            .store(new_watermark, Ordering::Relaxed);
        if let Some(checkpoint_id) = checkpoint_id {
            let watermark = min(checkpoint_id, objects_pruning_checkpoint_id);
            info!("relocation: setting checkpoint watermark to {}", watermark);
            pruning_watermark
                .checkpoint_id
                .store(watermark, Ordering::Relaxed);
        }
        Ok(true)
    }
699
    #[cfg(tidehunter)]
    /// Tidehunter checkpoint-pruning tick: refreshes the shared watermarks
    /// and, only if they moved, starts relocation on the objects and
    /// checkpoint-watermark key spaces (where the relocation filters from
    /// `apply_relocation_filter` do the actual deleting) and prunes old
    /// executed-transaction digests.
    fn prune_th(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        num_epochs_to_retain: u64,
        pruning_watermark: Arc<PrunerWatermarks>,
    ) -> anyhow::Result<()> {
        let watermark_updated = Self::update_pruning_watermarks(
            perpetual_db,
            checkpoint_store,
            num_epochs_to_retain,
            &pruning_watermark,
        )?;
        if !watermark_updated {
            // Nothing new to prune; relocation would be a no-op.
            info!("skip relocation. Watermark hasn't changed");
            return Ok(());
        }
        perpetual_db.objects.db.start_relocation()?;
        checkpoint_store.tables.watermarks.db.start_relocation()?;
        Self::prune_executed_tx_digests_th(perpetual_db, checkpoint_store)?;
        Ok(())
    }
722
    /// Picks at most one SST file for manual compaction and compacts it.
    ///
    /// Eligible files belong to the periodically-pruned column families, sit
    /// at level >= 1, have known key bounds, were last modified more than
    /// `delay_days` ago, and were not already processed within that window
    /// (tracked in `last_processed`). The largest eligible file is chosen.
    /// Returns the compacted file, or `None` if nothing was eligible.
    fn compact_next_sst_file(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        delay_days: usize,
        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
    ) -> anyhow::Result<Option<LiveFile>> {
        let db_path = perpetual_db.objects.db.path_for_pruning();
        let mut state = last_processed
            .lock()
            .expect("failed to obtain a lock for last processed SST files");
        let mut sst_file_for_compaction: Option<LiveFile> = None;
        let time_threshold =
            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
        for sst_file in perpetual_db.objects.db.live_files()? {
            // Strip surrounding '/' from the reported name before joining it
            // onto the db path.
            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
            let last_modified = std::fs::metadata(file_path)?.modified()?;
            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
                || sst_file.level < 1
                || sst_file.start_key.is_none()
                || sst_file.end_key.is_none()
                || last_modified > time_threshold
                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
            {
                continue;
            }
            // Keep only the largest eligible file as the candidate.
            if let Some(candidate) = &sst_file_for_compaction
                && candidate.size > sst_file.size
            {
                continue;
            }
            sst_file_for_compaction = Some(sst_file);
        }
        let Some(sst_file) = sst_file_for_compaction else {
            return Ok(None);
        };
        info!(
            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
            sst_file.name, sst_file.size, sst_file.level
        );
        perpetual_db.objects.compact_range_raw(
            &sst_file.column_family_name,
            sst_file.start_key.clone().unwrap(),
            sst_file.end_key.clone().unwrap(),
        )?;
        // Remember when this file was processed so it isn't picked again
        // until the threshold elapses.
        state.insert(sst_file.name.clone(), SystemTime::now());
        Ok(Some(sst_file))
    }
769
770 fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
771 min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
772 }
773
    /// Computes a smoothed pruning target so that, with ticks of
    /// `pruning_tick_duration_ms`, pruning advances evenly across an epoch
    /// instead of in one burst.
    ///
    /// Caps `max_eligible_checkpoint` at the last checkpoint of the newest
    /// epoch old enough to prune, then advances from `pruned_checkpoint` by
    /// one tick's share of the remaining distance.
    fn smoothed_max_eligible_checkpoint_number(
        checkpoint_store: &Arc<CheckpointStore>,
        mut max_eligible_checkpoint: CheckpointSequenceNumber,
        pruned_checkpoint: CheckpointSequenceNumber,
        epoch_id: EpochId,
        epoch_duration_ms: u64,
        num_epochs_to_retain: u64,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        // Not enough elapsed epochs to prune anything yet.
        if epoch_id < num_epochs_to_retain {
            return Ok(0);
        }
        let last_checkpoint_in_epoch = checkpoint_store
            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
            .map(|checkpoint| checkpoint.sequence_number)
            .unwrap_or_default();
        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
        if max_eligible_checkpoint == 0 {
            return Ok(max_eligible_checkpoint);
        }
        // Number of pruning ticks per epoch; advance the watermark by the
        // remaining distance divided by that count.
        let num_intervals = epoch_duration_ms
            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
            .unwrap_or(1);
        let delta = max_eligible_checkpoint
            .checked_sub(pruned_checkpoint)
            .unwrap_or_default()
            .checked_div(num_intervals)
            .unwrap_or(1);
        Ok(pruned_checkpoint + delta)
    }
803
    /// Spawns the background pruning loop(s) and returns a cancel sender;
    /// signaling (or dropping) the sender stops the spawned loop.
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rpc_index: Option<Arc<RpcIndexStore>>,
        jsonrpc_index: Option<Arc<IndexStore>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        pruner_watermarks: Arc<PrunerWatermarks>,
    ) -> Sender<()> {
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        // First tick is delayed (default 1h; 1ms under simtest) to avoid
        // pruning during startup; subsequent ticks run at a fixed interval.
        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
        let mut checkpoints_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        // Export the configured retention values.
        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        #[cfg(tidehunter)]
        {
            // NOTE(review): if num_epochs_to_retain_for_checkpoints is None,
            // no task is spawned here at all, so objects pruning is also
            // disabled under tidehunter — confirm this is intentional.
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints() {
                tokio::task::spawn(async move {
                    loop {
                        tokio::select! {
                            _ = objects_prune_interval.tick() => {
                                if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                    error!("Failed to prune objects: {:?}", err);
                                }
                            },
                            _ = checkpoints_prune_interval.tick() => {
                                if let Err(err) = Self::prune_th(&perpetual_db, &checkpoint_store, num_epochs_to_retain, pruner_watermarks.clone()) {
                                    error!("Failed to prune checkpoints: {:?}", err);
                                }
                            },
                            _ = &mut recv => break,
                        }
                    }
                });
            }
        }
        #[cfg(not(tidehunter))]
        {
            let mut indexes_prune_interval =
                tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

            // Optional periodic manual compaction of old SST files, executed
            // on a blocking thread since RocksDB compaction is synchronous.
            let perpetual_db_for_compaction = perpetual_db.clone();
            if let Some(delay_days) = config.periodic_compaction_threshold_days {
                spawn_monitored_task!(async move {
                    let last_processed = Arc::new(Mutex::new(HashMap::new()));
                    loop {
                        let db = perpetual_db_for_compaction.clone();
                        let state = Arc::clone(&last_processed);
                        let result = tokio::task::spawn_blocking(move || {
                            Self::compact_next_sst_file(db, delay_days, state)
                        })
                        .await;
                        // Back off for an hour when no file was eligible.
                        let mut sleep_interval_secs = 1;
                        match result {
                            Err(err) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                            Ok(Ok(None)) => {
                                sleep_interval_secs = 3600;
                            }
                            _ => {}
                        }
                        tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                    }
                });
            }
            // Main pruning loop: each select arm is guarded by its config so
            // disabled pruners never run.
            tokio::task::spawn(async move {
                loop {
                    tokio::select! {
                        _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                            if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms).await {
                                error!("Failed to prune objects: {:?}", err);
                            }
                            if let Err(err) = Self::prune_executed_tx_digests(&perpetual_db, &checkpoint_store).await {
                                error!("Failed to prune executed_tx_digests: {:?}", err);
                            }
                        },
                        _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                            if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), config.clone(), metrics.clone(), epoch_duration_ms, &pruner_watermarks).await {
                                error!("Failed to prune checkpoints: {:?}", err);
                            }
                        },
                        _ = indexes_prune_interval.tick(), if config.num_epochs_to_retain_for_indexes.is_some() => {
                            if let Err(err) = Self::prune_indexes(jsonrpc_index.as_deref(), &config, epoch_duration_ms, &metrics) {
                                error!("Failed to prune indexes: {:?}", err);
                            }
                        }
                        _ = &mut recv => break,
                    }
                }
            });
        }
        sender
    }
920
921 pub fn new(
922 perpetual_db: Arc<AuthorityPerpetualTables>,
923 checkpoint_store: Arc<CheckpointStore>,
924 rpc_index: Option<Arc<RpcIndexStore>>,
925 jsonrpc_index: Option<Arc<IndexStore>>,
926 mut pruning_config: AuthorityStorePruningConfig,
927 is_validator: bool,
928 epoch_duration_ms: u64,
929 registry: &Registry,
930 pruner_watermarks: Arc<PrunerWatermarks>, ) -> Self {
932 if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
933 {
934 warn!(
935 "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
936 pruning_config.num_epochs_to_retain
937 );
938 if is_validator {
939 warn!("Resetting to aggressive pruner.");
940 pruning_config.num_epochs_to_retain = 0;
941 } else {
942 warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
943 }
944 }
945 AuthorityStorePruner {
946 _objects_pruner_cancel_handle: Self::setup_pruning(
947 pruning_config,
948 epoch_duration_ms,
949 perpetual_db,
950 checkpoint_store,
951 rpc_index,
952 jsonrpc_index,
953 AuthorityStorePruningMetrics::new(registry),
954 pruner_watermarks,
955 ),
956 }
957 }
958
959 pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
960 perpetual_db.objects.compact_range(
961 &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
962 &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
963 )
964 }
965}
966
#[cfg(tidehunter)]
/// Attaches a relocation filter to a tidehunter key-space config.
///
/// During relocation each entry's watermark value (an epoch or checkpoint
/// number) is extracted — decoded from the key with big-endian fixint bincode
/// when `by_key` is set, otherwise BCS-decoded from the value — and entries
/// below `pruner_watermark` are removed. The first retained entry stops
/// relocation, so entries are presumably visited in increasing watermark
/// order — TODO confirm with the tidehunter relocation contract.
pub(crate) fn apply_relocation_filter<T: DeserializeOwned>(
    config: typed_store::tidehunter_util::KeySpaceConfig,
    pruner_watermark: Arc<AtomicU64>,
    extractor: impl Fn(T) -> u64 + Send + Sync + 'static,
    by_key: bool,
) -> typed_store::tidehunter_util::KeySpaceConfig {
    use bincode::Options;
    use std::sync::atomic::Ordering;
    use typed_store::tidehunter_util::Decision;
    config.with_relocation_filter(move |key, value| {
        let data = if by_key {
            bincode::DefaultOptions::new()
                .with_big_endian()
                .with_fixint_encoding()
                .deserialize(&key)
                .expect("relocation filter deserialization error")
        } else {
            bcs::from_bytes(&value).expect("relocation filter deserialization error")
        };
        if extractor(data) < pruner_watermark.load(Ordering::Relaxed) {
            Decision::Remove
        } else {
            Decision::StopRelocation
        }
    })
}
994
995#[cfg(test)]
996mod tests {
997 use more_asserts as ma;
998 use std::path::Path;
999 use std::time::Duration;
1000 use std::{collections::HashSet, sync::Arc};
1001 use tracing::log::info;
1002
1003 use crate::authority::authority_store_pruner::AuthorityStorePruningMetrics;
1004 use crate::authority::authority_store_tables::AuthorityPerpetualTables;
1005 use crate::authority::authority_store_types::{
1006 StoreObject, StoreObjectWrapper, get_store_object,
1007 };
1008 use prometheus::Registry;
1009 use sui_types::base_types::ObjectDigest;
1010 use sui_types::effects::TransactionEffects;
1011 use sui_types::effects::TransactionEffectsAPI;
1012 use sui_types::{
1013 base_types::{ObjectID, SequenceNumber},
1014 object::Object,
1015 storage::ObjectKey,
1016 };
1017 use typed_store::Map;
1018 use typed_store::rocks::{DBMap, MetricConf, ReadWriteOptions, default_db_options};
1019
1020 use super::AuthorityStorePruner;
1021
1022 fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
1023 let perpetual_db_path = path.join(Path::new("perpetual"));
1024 let cf_names = AuthorityPerpetualTables::describe_tables();
1025 let cfs: Vec<_> = cf_names
1026 .keys()
1027 .map(|x| (x.as_str(), default_db_options().options))
1028 .collect();
1029 let perpetual_db = typed_store::rocks::open_cf_opts(
1030 perpetual_db_path,
1031 None,
1032 MetricConf::new("perpetual_pruning"),
1033 &cfs,
1034 );
1035
1036 let mut after_pruning = HashSet::new();
1037 let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
1038 &perpetual_db?,
1039 Some("objects"),
1040 &ReadWriteOptions::default(),
1043 false,
1044 )?;
1045 let iter = objects.safe_iter();
1046 for item in iter {
1047 after_pruning.insert(item?.0);
1048 }
1049 Ok(after_pruning)
1050 }
1051
1052 type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);
1053
1054 fn generate_test_data(
1055 db: Arc<AuthorityPerpetualTables>,
1056 num_versions_per_object: u64,
1057 num_object_versions_to_retain: u64,
1058 total_unique_object_ids: u32,
1059 ) -> Result<GenerateTestDataResult, anyhow::Error> {
1060 assert!(num_versions_per_object >= num_object_versions_to_retain);
1061
1062 let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
1063 let mut batch = db.objects.batch();
1064
1065 let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
1066 for id in ids {
1067 for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
1068 let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
1069 if counter < num_object_versions_to_retain.try_into().unwrap() {
1070 to_keep.push(object_key);
1072 } else {
1073 to_delete.push(object_key);
1074 }
1075 let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1076 batch.insert_batch(
1077 &db.objects,
1078 [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
1079 )?;
1080 }
1081
1082 if num_object_versions_to_retain == 0 {
1084 let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
1085 println!("Adding tombstone object {:?}", tombstone_key);
1086 batch.insert_batch(
1087 &db.objects,
1088 [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
1089 )?;
1090 tombstones.push(tombstone_key);
1091 }
1092 }
1093 batch.write().unwrap();
1094 assert_eq!(
1095 to_keep.len() as u64,
1096 std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
1097 * total_unique_object_ids as u64
1098 );
1099 assert_eq!(
1100 tombstones.len() as u64,
1101 if num_object_versions_to_retain == 0 {
1102 total_unique_object_ids as u64
1103 } else {
1104 0
1105 }
1106 );
1107 Ok((to_keep, to_delete, tombstones))
1108 }
1109
1110 async fn run_pruner(
1111 path: &Path,
1112 num_versions_per_object: u64,
1113 num_object_versions_to_retain: u64,
1114 total_unique_object_ids: u32,
1115 ) -> Vec<ObjectKey> {
1116 let registry = Registry::default();
1117 let metrics = AuthorityStorePruningMetrics::new(®istry);
1118 let to_keep = {
1119 let db = Arc::new(AuthorityPerpetualTables::open(path, None, None));
1120 let (to_keep, to_delete, tombstones) = generate_test_data(
1121 db.clone(),
1122 num_versions_per_object,
1123 num_object_versions_to_retain,
1124 total_unique_object_ids,
1125 )
1126 .unwrap();
1127 let mut effects = TransactionEffects::default();
1128 for object in to_delete {
1129 effects.unsafe_add_deleted_live_object_for_testing((
1130 object.0,
1131 object.1,
1132 ObjectDigest::MIN,
1133 ));
1134 }
1135 for object in tombstones {
1136 effects.unsafe_add_object_tombstone_for_testing((
1137 object.0,
1138 object.1,
1139 ObjectDigest::MIN,
1140 ));
1141 }
1142 AuthorityStorePruner::prune_objects_and_indexes(
1143 vec![effects],
1144 &db,
1145 0,
1146 metrics,
1147 vec![],
1148 None,
1149 true,
1150 )
1151 .await
1152 .unwrap();
1153 to_keep
1154 };
1155 tokio::time::sleep(Duration::from_secs(3)).await;
1156 to_keep
1157 }
1158
1159 #[cfg(not(tidehunter))]
1161 #[tokio::test]
1162 async fn test_pruning_objects() {
1163 let path = tempfile::tempdir().unwrap().keep();
1164 let to_keep = run_pruner(&path, 3, 2, 1000).await;
1165 assert_eq!(
1166 HashSet::from_iter(to_keep),
1167 get_keys_after_pruning(&path).unwrap()
1168 );
1169 run_pruner(&tempfile::tempdir().unwrap().keep(), 3, 2, 1000).await;
1170 }
1171
1172 #[cfg(not(tidehunter))]
1174 #[tokio::test]
1175 async fn test_pruning_tombstones() {
1176 let path = tempfile::tempdir().unwrap().keep();
1177 let to_keep = run_pruner(&path, 0, 0, 1000).await;
1178 assert_eq!(to_keep.len(), 0);
1179 assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1180
1181 let path = tempfile::tempdir().unwrap().keep();
1182 let to_keep = run_pruner(&path, 3, 0, 1000).await;
1183 assert_eq!(to_keep.len(), 0);
1184 assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
1185 }
1186
1187 #[cfg(not(target_env = "msvc"))]
1188 #[tokio::test]
1189 async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
1190 let primary_path = tempfile::tempdir()?.keep();
1191 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None, None));
1192 let total_unique_object_ids = 10_000;
1193 let num_versions_per_object = 10;
1194 let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
1195 let mut to_delete = vec![];
1196 for id in ids {
1197 for i in (0..num_versions_per_object).rev() {
1198 if i < num_versions_per_object - 2 {
1199 to_delete.push((id, SequenceNumber::from(i)));
1200 }
1201 let obj = get_store_object(Object::immutable_with_id_for_testing(id));
1202 perpetual_db
1203 .objects
1204 .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
1205 }
1206 }
1207
1208 fn get_sst_size(path: &Path) -> u64 {
1209 let mut size = 0;
1210 for entry in std::fs::read_dir(path).unwrap() {
1211 let entry = entry.unwrap();
1212 let path = entry.path();
1213 if let Some(ext) = path.extension() {
1214 if ext != "sst" {
1215 continue;
1216 }
1217 size += std::fs::metadata(path).unwrap().len();
1218 }
1219 }
1220 size
1221 }
1222
1223 let db_path = primary_path.clone().join("perpetual");
1224 let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
1225 let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);
1226
1227 perpetual_db.objects.compact_range(&start, &end)?;
1228 let before_compaction_size = get_sst_size(&db_path);
1229
1230 let mut effects = TransactionEffects::default();
1231 for object in to_delete {
1232 effects.unsafe_add_deleted_live_object_for_testing((
1233 object.0,
1234 object.1,
1235 ObjectDigest::MIN,
1236 ));
1237 }
1238 let registry = Registry::default();
1239 let metrics = AuthorityStorePruningMetrics::new(®istry);
1240 let total_pruned = AuthorityStorePruner::prune_objects_and_indexes(
1241 vec![effects],
1242 &perpetual_db,
1243 0,
1244 metrics,
1245 vec![],
1246 None,
1247 true,
1248 )
1249 .await;
1250 info!("Total pruned keys = {:?}", total_pruned);
1251
1252 perpetual_db.objects.compact_range(&start, &end)?;
1253 let after_compaction_size = get_sst_size(&db_path);
1254
1255 info!(
1256 "Before compaction disk size = {:?}, after compaction disk size = {:?}",
1257 before_compaction_size, after_compaction_size
1258 );
1259 ma::assert_le!(after_compaction_size, before_compaction_size);
1260 Ok(())
1261 }
1262}