use crate::writer::StateSnapshotWriterV1;
use anyhow::Result;
use bytes::Bytes;
use futures::StreamExt;
use object_store::DynObjectStore;
use prometheus::{
    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
    register_int_gauge_with_registry,
};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use sui_core::authority::authority_store_tables::AuthorityPerpetualTables;
use sui_core::checkpoints::CheckpointStore;
use sui_core::db_checkpoint_handler::{STATE_SNAPSHOT_COMPLETED_MARKER, SUCCESS_MARKER};
use sui_storage::FileCompression;
use sui_storage::object_store::ObjectStoreListExt;
use sui_storage::object_store::util::{
    find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, get, path_to_filesystem, put,
    run_manifest_update_loop,
};
use sui_types::digests::ChainIdentifier;
use sui_types::messages_checkpoint::CheckpointCommitment::ECMHLiveObjectSetDigest;
use tracing::{debug, error, info};

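/// Prometheus metrics for the state snapshot uploader: the first epoch still
/// missing a snapshot in the remote store, and a counter of upload errors to
/// alert on.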
pub struct StateSnapshotUploaderMetrics {
    pub first_missing_state_snapshot_epoch: IntGauge,
    pub state_snapshot_upload_err: IntCounter,
}

impl StateSnapshotUploaderMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            first_missing_state_snapshot_epoch: register_int_gauge_with_registry!(
                "first_missing_state_snapshot_epoch",
                "First epoch for which we have no state snapshot in remote store",
                registry
            )
            .unwrap(),
            state_snapshot_upload_err: register_int_counter_with_registry!(
                "state_snapshot_upload_err",
                "Track upload errors we can alert on",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

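/// Drives periodic state snapshot creation and upload. It scans the local db
/// checkpoint directory, builds a snapshot for each epoch that is missing from
/// the remote store, and writes success markers once an upload completes.
///
/// A minimal usage sketch; the paths, store config, and registry shown here are
/// placeholders, not values prescribed by this module:
///
/// ```ignore
/// let uploader = StateSnapshotUploader::new(
///     &db_checkpoint_path,
///     &staging_path,
///     remote_snapshot_store_config,
///     300, // poll every 5 minutes
///     &registry,
///     checkpoint_store,
///     chain_identifier,
///     10, // archive every 10th epoch (0 disables archival)
/// )?;
/// let kill_sender = uploader.start();
/// // Send on `kill_sender` to stop the upload and manifest-update loops.
/// ```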
pub struct StateSnapshotUploader {
    /// Directory containing local per-epoch db checkpoints
    db_checkpoint_path: PathBuf,
    /// Object store rooted at the db checkpoint directory
    db_checkpoint_store: Arc<DynObjectStore>,
    /// Used to read end-of-epoch state commitments
    checkpoint_store: Arc<CheckpointStore>,
    /// Directory where snapshot files are staged before upload
    staging_path: PathBuf,
    /// Object store rooted at the staging directory
    staging_store: Arc<DynObjectStore>,
    /// Remote object store snapshots are uploaded to
    snapshot_store: Arc<DynObjectStore>,
    /// Time between upload loop iterations
    interval: Duration,
    metrics: Arc<StateSnapshotUploaderMetrics>,
    chain_identifier: ChainIdentifier,
    /// Archive snapshots whose epoch is a multiple of this value; 0 disables archival
    archive_interval_epochs: u64,
}

impl StateSnapshotUploader {
    pub fn new(
        db_checkpoint_path: &std::path::Path,
        staging_path: &std::path::Path,
        snapshot_store_config: ObjectStoreConfig,
        interval_s: u64,
        registry: &Registry,
        checkpoint_store: Arc<CheckpointStore>,
        chain_identifier: ChainIdentifier,
        archive_interval_epochs: u64,
    ) -> Result<Arc<Self>> {
        let db_checkpoint_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(db_checkpoint_path.to_path_buf()),
            ..Default::default()
        };
        let staging_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(staging_path.to_path_buf()),
            ..Default::default()
        };
        Ok(Arc::new(StateSnapshotUploader {
            db_checkpoint_path: db_checkpoint_path.to_path_buf(),
            db_checkpoint_store: db_checkpoint_store_config.make()?,
            checkpoint_store,
            staging_path: staging_path.to_path_buf(),
            staging_store: staging_store_config.make()?,
            snapshot_store: snapshot_store_config.make()?,
            interval: Duration::from_secs(interval_s),
            metrics: StateSnapshotUploaderMetrics::new(registry),
            chain_identifier,
            archive_interval_epochs,
        }))
    }

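    /// Spawns the snapshot upload loop and the manifest update loop, returning
    /// a broadcast sender that shuts both down when a message is sent on it.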
    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
        tokio::task::spawn(Self::run_upload_loop(self.clone(), kill_sender.subscribe()));
        tokio::task::spawn(run_manifest_update_loop(
            self.snapshot_store.clone(),
            kill_sender.subscribe(),
        ));
        kill_sender
    }

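    /// Creates and uploads a state snapshot for every local db checkpoint whose
    /// epoch is in `missing_epochs` or at least the last missing epoch; all
    /// other epochs are only marked as completed locally.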
    async fn upload_state_snapshot_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
        let local_checkpoints_by_epoch =
            find_all_dirs_with_epoch_prefix(&self.db_checkpoint_store, None).await?;
        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
        for (epoch, db_path) in dirs {
            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
                info!("Starting state snapshot creation for epoch: {}", *epoch);
                let state_snapshot_writer = StateSnapshotWriterV1::new_from_store(
                    &self.staging_path,
                    &self.staging_store,
                    &self.snapshot_store,
                    FileCompression::Zstd,
                    NonZeroUsize::new(20).unwrap(),
                )
                .await?;
                let db = Arc::new(AuthorityPerpetualTables::open(
                    &path_to_filesystem(self.db_checkpoint_path.clone(), &db_path.child("store"))?,
                    None,
                    None,
                ));
                let commitments = self
                    .checkpoint_store
                    .get_epoch_state_commitments(*epoch)
                    .expect("Expected last checkpoint of epoch to have end of epoch data")
                    .expect("Expected end of epoch data to be present");
                let state_hash_commitment = match commitments
                    .last()
                    .expect("Expected at least one commitment")
                    .clone()
                {
                    ECMHLiveObjectSetDigest(digest) => digest,
                    _ => return Err(anyhow::anyhow!("Expected ECMHLiveObjectSetDigest")),
                };
                state_snapshot_writer
                    .write(*epoch, db, state_hash_commitment, self.chain_identifier)
                    .await?;
                info!("State snapshot creation successful for epoch: {}", *epoch);
                // Write the success marker to the remote store so readers know the
                // snapshot for this epoch is complete, then mark the local db
                // checkpoint as snapshotted.
                let bytes = Bytes::from_static(b"success");
                let success_marker = db_path.child(SUCCESS_MARKER);
                put(&self.snapshot_store, &success_marker, bytes.clone()).await?;
                let state_snapshot_completed_marker =
                    db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
                put(
                    &self.db_checkpoint_store,
                    &state_snapshot_completed_marker,
                    bytes,
                )
                .await?;
                info!("State snapshot completed for epoch: {epoch}");

                // Archival failures are non-fatal: the snapshot itself has already
                // been uploaded and marked successful.
                if let Err(e) = self.archive_epoch_if_needed(*epoch).await {
                    error!(
                        "Failed to archive epoch {} (non-fatal, continuing): {:?}",
                        epoch, e
                    );
                }
            } else {
                // Snapshot already exists remotely; only mark the local db
                // checkpoint as completed.
                let bytes = Bytes::from_static(b"success");
                let state_snapshot_completed_marker =
                    db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
                put(
                    &self.db_checkpoint_store,
                    &state_snapshot_completed_marker,
                    bytes,
                )
                .await?;
                info!("State snapshot skipped for epoch: {epoch}");
            }
        }
        Ok(())
    }

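    /// Main loop: on every tick, finds the epochs missing from the remote store,
    /// updates metrics, and attempts to upload them; exits on a kill signal.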
    async fn run_upload_loop(
        self: Arc<Self>,
        mut recv: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(self.interval);
        info!("State snapshot uploader loop started");
        loop {
            tokio::select! {
                _now = interval.tick() => {
                    let missing_epochs = self.get_missing_epochs().await;
                    match missing_epochs {
                        Ok(epochs) => {
                            let first_missing_epoch = epochs.first().cloned().unwrap_or(0);
                            self.metrics.first_missing_state_snapshot_epoch.set(first_missing_epoch as i64);
                            if let Err(err) = self.upload_state_snapshot_to_object_store(epochs).await {
                                self.metrics.state_snapshot_upload_err.inc();
                                error!("Failed to upload state snapshot to remote store with err: {:?}", err);
                            } else {
                                debug!("Successfully completed snapshot upload loop");
                            }
                        }
                        Err(err) => {
                            error!("Failed to find missing state snapshots in remote store: {:?}", err);
                        }
                    }
                },
                _ = recv.recv() => break,
            }
        }
        Ok(())
    }

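    /// Lists epoch directories in the remote store and returns those without a
    /// success marker.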
    async fn get_missing_epochs(&self) -> Result<Vec<u64>> {
        let missing_epochs = find_missing_epochs_dirs(&self.snapshot_store, SUCCESS_MARKER).await?;
        Ok(missing_epochs.to_vec())
    }

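    /// Copies the uploaded snapshot for `epoch` into the `archive/` prefix of
    /// the remote store when the epoch is a multiple of
    /// `archive_interval_epochs`; an interval of 0 disables archival entirely.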
    pub(crate) async fn archive_epoch_if_needed(&self, epoch: u64) -> Result<()> {
        if self.archive_interval_epochs == 0 {
            return Ok(());
        }

        if !epoch.is_multiple_of(self.archive_interval_epochs) {
            debug!(
                "Epoch {} is not divisible by archive interval {}, skipping archival",
                epoch, self.archive_interval_epochs
            );
            return Ok(());
        }

        info!(
            "Epoch {} is divisible by {}, archiving to archive/ subdirectory",
            epoch, self.archive_interval_epochs
        );

        let source_prefix = object_store::path::Path::from(format!("epoch_{}", epoch));

        info!("Listing files in {} for archival", source_prefix);

        let mut paths = self.snapshot_store.list_objects(Some(&source_prefix)).await;
        let mut files_copied = 0;

        while let Some(res) = paths.next().await {
            match res {
                Ok(object_metadata) => {
                    let source_path = &object_metadata.location;
                    // Rebase the object path from `epoch_N/...` to `archive/epoch_N/...`.
                    let relative_path = source_path
                        .as_ref()
                        .strip_prefix(&format!("epoch_{}/", epoch))
                        .unwrap_or(source_path.as_ref());
                    let dest_path = object_store::path::Path::from(format!(
                        "archive/epoch_{}/{}",
                        epoch, relative_path
                    ));

                    debug!("Copying {} to {}", source_path, dest_path);

                    // Copy by downloading and re-uploading within the same store.
                    let bytes = get(&self.snapshot_store, source_path).await?;
                    put(&self.snapshot_store, &dest_path, bytes).await?;

                    files_copied += 1;
                }
                Err(e) => {
                    error!("Failed to list objects for archival: {:?}", e);
                    return Err(e.into());
                }
            }
        }

        info!(
            "Successfully archived epoch {} ({} files copied to archive/epoch_{})",
            epoch, files_copied, epoch
        );
        Ok(())
    }
}