sui_snapshot/uploader.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::writer::StateSnapshotWriterV1;
use anyhow::Result;
use bytes::Bytes;
use futures::StreamExt;
use object_store::DynObjectStore;
use prometheus::{
    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
    register_int_gauge_with_registry,
};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::checkpoints::CheckpointStore;
use sui_core::db_checkpoint_handler::{STATE_SNAPSHOT_COMPLETED_MARKER, SUCCESS_MARKER};
use sui_storage::FileCompression;
use sui_storage::object_store::ObjectStoreListExt;
use sui_storage::object_store::util::{
    find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, get, path_to_filesystem, put,
    run_manifest_update_loop,
};
use sui_types::digests::ChainIdentifier;
use sui_types::messages_checkpoint::CheckpointCommitment::ECMHLiveObjectSetDigest;
use tracing::{debug, error, info};

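/// Prometheus metrics for snapshot uploads: a gauge tracking the first epoch still missing
/// from the remote store and a counter tracking upload failures.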
pub struct StateSnapshotUploaderMetrics {
    pub first_missing_state_snapshot_epoch: IntGauge,
    pub state_snapshot_upload_err: IntCounter,
}

impl StateSnapshotUploaderMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_state_snapshot_epoch: register_int_gauge_with_registry!(
                "first_missing_state_snapshot_epoch",
                "First epoch for which we have no state snapshot in the remote store",
                registry
            )
            .unwrap(),
            state_snapshot_upload_err: register_int_counter_with_registry!(
                "state_snapshot_upload_err",
                "State snapshot upload errors; used for alerting",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

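/// Watches the local db checkpoint directory and uploads a state snapshot for every epoch
/// that is not yet present in the remote store.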
pub struct StateSnapshotUploader {
    /// Directory path on local disk where db checkpoints are stored
    db_checkpoint_path: PathBuf,
    /// Store on local disk where db checkpoints are written to
    db_checkpoint_store: Arc<DynObjectStore>,
    /// Checkpoint store; needed to fetch epoch state commitments for verification
    checkpoint_store: Arc<CheckpointStore>,
    /// Directory path on local disk where state snapshots are staged for upload
    staging_path: PathBuf,
    /// Store on local disk where state snapshots are staged for upload
    staging_store: Arc<DynObjectStore>,
    /// Remote store (e.g. S3, GCS) where state snapshots are uploaded
    snapshot_store: Arc<DynObjectStore>,
    /// Time interval at which to check for new db checkpoints
    interval: Duration,
    metrics: Arc<StateSnapshotUploaderMetrics>,
    /// The chain identifier is derived from the genesis checkpoint and used to identify the
    /// network.
    chain_identifier: ChainIdentifier,
    /// Archive snapshots every N epochs (0 = disabled)
    archive_interval_epochs: u64,
}

impl StateSnapshotUploader {
    pub fn new(
        db_checkpoint_path: &std::path::Path,
        staging_path: &std::path::Path,
        snapshot_store_config: ObjectStoreConfig,
        interval_s: u64,
        registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
        archive_interval_epochs: u64,
    ) -> Result<Arc<Self>> {
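        // Wrap the local db checkpoint and staging directories in file-backed object stores
        // so the same object_store API is used for local and remote access.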
        let db_checkpoint_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(db_checkpoint_path.to_path_buf()),
            ..Default::default()
        };
        let staging_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(staging_path.to_path_buf()),
            ..Default::default()
        };
        Ok(Arc::new(StateSnapshotUploader {
            db_checkpoint_path: db_checkpoint_path.to_path_buf(),
            db_checkpoint_store: db_checkpoint_store_config.make()?,
            checkpoint_store,
            staging_path: staging_path.to_path_buf(),
            staging_store: staging_store_config.make()?,
            snapshot_store: snapshot_store_config.make()?,
            interval: Duration::from_secs(interval_s),
            metrics: StateSnapshotUploaderMetrics::new(registry),
            chain_identifier,
            archive_interval_epochs,
        }))
    }

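    /// Spawns the upload loop and the manifest update loop. Sending on (or dropping) the
    /// returned channel shuts both tasks down.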
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        tokio::task::spawn(Self::run_upload_loop(self.clone(), kill_sender.subscribe()));
        tokio::task::spawn(run_manifest_update_loop(
            self.snapshot_store.clone(),
            kill_sender.subscribe(),
        ));
        kill_sender
    }

    async fn upload_state_snapshot_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.db_checkpoint_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
        for (epoch, db_path) in dirs {
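            // Upload when the epoch is known to be missing remotely, or when it is at least
            // as recent as the newest missing epoch and therefore not uploaded yet.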
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                info!("Starting state snapshot creation for epoch: {}", *epoch);
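                // Stage snapshot files on local disk, then upload them to the remote store
                // with bounded concurrency (20 parallel uploads).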
                let state_snapshot_writer = StateSnapshotWriterV1::new_from_store(
                    &self.staging_path,
                    &self.staging_store,
                    &self.snapshot_store,
                    FileCompression::Zstd,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;
                let db = Arc::new(AuthorityPerpetualTables::open(
                    &path_to_filesystem(self.db_checkpoint_path.clone(), &db_path.child("store"))?,
                    None,
                    None,
                ));
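                // Fetch the end-of-epoch state commitment (the ECMH live object set digest)
                // that the snapshot will be verified against.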
                let commitments = self
                    .checkpoint_store
                    .get_epoch_state_commitments(*epoch)
                    .expect("Expected last checkpoint of epoch to have end of epoch data")
                    .expect("Expected end of epoch data to be present");
                let state_hash_commitment = match commitments
                    .last()
                    .expect("Expected at least one commitment")
                    .clone()
                {
                    ECMHLiveObjectSetDigest(digest) => digest,
                    _ => return Err(anyhow::anyhow!("Expected ECMHLiveObjectSetDigest")),
                };
                state_snapshot_writer
                    .write(*epoch, db, state_hash_commitment, self.chain_identifier)
                    .await?;
                info!("State snapshot creation successful for epoch: {}", *epoch);
                // Drop a marker in the remote epoch directory indicating that the upload
                // completed successfully
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&self.snapshot_store, &success_marker, bytes.clone()).await?;
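                // Also mark the local db checkpoint as snapshotted so the db checkpoint
                // handler can clean it up.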
                let state_snapshot_completed_marker =
                    db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
                put(
                    &self.db_checkpoint_store,
                    &state_snapshot_completed_marker,
                    bytes,
                )
                .await?;
                info!("State snapshot completed for epoch: {epoch}");

                // Archive the snapshot if the epoch meets the archival criteria
                if let Err(e) = self.archive_epoch_if_needed(*epoch).await {
                    error!(
                        "Failed to archive epoch {} (non-fatal, continuing): {:?}",
                        epoch, e
                    );
                }
            } else {
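                // No upload needed; still write the completed marker so the local db
                // checkpoint can be cleaned up.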
                let bytes = Bytes::from_static(b"success");
                let state_snapshot_completed_marker =
                    db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
                put(
                    &self.db_checkpoint_store,
                    &state_snapshot_completed_marker,
                    bytes,
                )
                .await?;
                info!("State snapshot skipped for epoch: {epoch}");
            }
        }
        Ok(())
    }

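    /// Periodically looks for epochs that are missing a snapshot in the remote store and
    /// uploads them, until a shutdown signal is received.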
    async fn run_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("State snapshot uploader loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let missing_epochs = self.get_missing_epochs().await;
                    match missing_epochs {
                        Ok(epochs) => {
                            let first_missing_epoch = epochs.first().cloned().unwrap_or(0);
                            self.metrics.first_missing_state_snapshot_epoch.set(first_missing_epoch as i64);
                            if let Err(err) = self.upload_state_snapshot_to_object_store(epochs).await {
                                self.metrics.state_snapshot_upload_err.inc();
                                error!("Failed to upload state snapshot to remote store with err: {:?}", err);
                            } else {
                                debug!("Successfully completed snapshot upload loop");
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing state snapshots in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

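    /// Returns the epochs whose remote directory lacks a success marker, in ascending order.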
    async fn get_missing_epochs(&self) -> Result<Vec<u64>> {
        let missing_epochs = find_missing_epochs_dirs(&self.snapshot_store, SUCCESS_MARKER).await?;
        Ok(missing_epochs.to_vec())
    }

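    /// Copies the uploaded snapshot for `epoch` into the `archive/` prefix of the remote
    /// store when `epoch` is a multiple of `archive_interval_epochs` (0 disables archival).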
    pub(crate) async fn archive_epoch_if_needed(&self, epoch: u64) -> Result<()> {
        if self.archive_interval_epochs == 0 {
            return Ok(());
        }

        if !epoch.is_multiple_of(self.archive_interval_epochs) {
            debug!(
                "Epoch {} is not divisible by archive interval {}, skipping archival",
                epoch, self.archive_interval_epochs
            );
            return Ok(());
        }

        info!(
            "Epoch {} is divisible by {}, archiving to archive/ subdirectory",
            epoch, self.archive_interval_epochs
        );

        let source_prefix = object_store::path::Path::from(format!("epoch_{}", epoch));

        info!("Listing files in {} for archival", source_prefix);

        let mut paths = self.snapshot_store.list_objects(Some(&source_prefix)).await;
        let mut files_copied = 0;

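        // Copy each object by downloading and re-uploading it; `get`/`put` buffer the whole
        // object in memory.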
        while let Some(res) = paths.next().await {
            match res {
                Ok(object_metadata) => {
                    let source_path = &object_metadata.location;
                    let relative_path = source_path
                        .as_ref()
                        .strip_prefix(&format!("epoch_{}/", epoch))
                        .unwrap_or(source_path.as_ref());
                    let dest_path = object_store::path::Path::from(format!(
                        "archive/epoch_{}/{}",
                        epoch, relative_path
                    ));

                    debug!("Copying {} to {}", source_path, dest_path);

                    let bytes = get(&self.snapshot_store, source_path).await?;
                    put(&self.snapshot_store, &dest_path, bytes).await?;

                    files_copied += 1;
                }
                Err(e) => {
                    error!("Failed to list objects for archival: {:?}", e);
                    return Err(e.into());
                }
            }
        }

        info!(
            "Successfully archived epoch {} ({} files copied to archive/epoch_{})",
            epoch, files_copied, epoch
        );
        Ok(())
    }
}