sui_core/
rpc_store_restore_source.rs1use std::sync::Arc;
43
44use async_trait::async_trait;
45use bytes::Bytes;
46use futures::StreamExt;
47use futures::stream;
48use futures::stream::BoxStream;
49use sui_consistent_store::ChainId;
50use sui_consistent_store::restore::RestoreChunk;
51use sui_consistent_store::restore::RestoreSource;
52use sui_types::base_types::ObjectID;
53use sui_types::object::Object;
54use tokio::sync::mpsc;
55use tokio_stream::wrappers::ReceiverStream;
56
57use crate::authority::authority_store_tables::AuthorityPerpetualTables;
58use crate::authority::authority_store_tables::LiveObject;
59
/// Number of leading bits of an object ID's first byte used to select its shard.
const SHARD_BITS: u32 = 5;

/// Total number of shards the object-ID space is partitioned into (`2^SHARD_BITS`).
const SHARDS: u32 = 1 << SHARD_BITS;

/// Left-shift that places a shard index in the top `SHARD_BITS` bits of the
/// first object-ID byte (the remaining low bits belong to the shard's range).
const SHARD_PREFIX_SHIFT: u32 = 8 - SHARD_BITS;

// Sharding only inspects the first byte of an object ID, and `shard_range`
// computes `1u8 << SHARD_PREFIX_SHIFT`, which overflows when SHARD_BITS == 0.
// Enforce the valid range at compile time rather than failing at runtime.
const _: () = {
    assert!(SHARD_BITS > 0, "SHARD_BITS must be at least 1");
    assert!(SHARD_BITS <= 8, "SHARD_BITS must fit within one byte");
};

/// Default maximum number of objects carried by a single emitted restore chunk.
pub const CHUNK_SIZE: usize = 50_000;
76
77pub struct PerpetualStoreRestoreSource {
81 perpetual: Arc<AuthorityPerpetualTables>,
82 target_checkpoint: u64,
83 chain_id: ChainId,
84 chunk_size: usize,
85}
86
87impl PerpetualStoreRestoreSource {
88 pub fn new(
96 perpetual: Arc<AuthorityPerpetualTables>,
97 target_checkpoint: u64,
98 chain_id: ChainId,
99 ) -> Self {
100 Self {
101 perpetual,
102 target_checkpoint,
103 chain_id,
104 chunk_size: CHUNK_SIZE,
105 }
106 }
107
108 pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
112 assert!(chunk_size > 0, "chunk_size must be > 0");
113 self.chunk_size = chunk_size;
114 self
115 }
116}
117
118fn shard_range(shard_id: u32) -> (ObjectID, ObjectID) {
120 let prefix = (shard_id as u8) << SHARD_PREFIX_SHIFT;
121 let mut start = [0u8; ObjectID::LENGTH];
122 start[0] = prefix;
123 let mut end = [0xffu8; ObjectID::LENGTH];
124 end[0] = prefix | ((1 << SHARD_PREFIX_SHIFT) - 1);
125 (ObjectID::new(start), ObjectID::new(end))
126}
127
128fn next_id(id: ObjectID) -> Option<ObjectID> {
131 let mut bytes = id.into_bytes();
132 for byte in bytes.iter_mut().rev() {
133 if *byte == 0xff {
134 *byte = 0;
135 } else {
136 *byte += 1;
137 return Some(ObjectID::new(bytes));
138 }
139 }
140 None
141}
142
143#[async_trait]
144impl RestoreSource for PerpetualStoreRestoreSource {
145 fn target_checkpoint(&self) -> u64 {
146 self.target_checkpoint
147 }
148
149 fn target_chain_id(&self) -> ChainId {
150 self.chain_id
151 }
152
153 fn shards(&self) -> u32 {
154 SHARDS
155 }
156
157 fn stream(
158 &self,
159 shard_id: u32,
160 cursor: Option<Bytes>,
161 ) -> BoxStream<'_, anyhow::Result<RestoreChunk>> {
162 let (shard_start, shard_end) = shard_range(shard_id);
163
164 let start_id = match cursor {
165 None => Some(shard_start),
166 Some(bytes) => match ObjectID::from_bytes(&bytes[..]) {
167 Ok(id) => next_id(id).filter(|n| *n <= shard_end),
168 Err(e) => {
169 return stream::once(async move {
170 Err(anyhow::anyhow!("invalid perpetual-store cursor: {e}"))
171 })
172 .boxed();
173 }
174 },
175 };
176
177 let Some(start_id) = start_id else {
178 return stream::empty().boxed();
179 };
180
181 let (tx, rx) = mpsc::channel::<anyhow::Result<RestoreChunk>>(2);
185 let perpetual = self.perpetual.clone();
186 let chunk_size = self.chunk_size;
187
188 tokio::task::spawn_blocking(move || {
189 iterate_shard(perpetual, start_id, shard_end, chunk_size, tx);
190 });
191
192 ReceiverStream::new(rx).boxed()
193 }
194}
195
196fn iterate_shard(
208 perpetual: Arc<AuthorityPerpetualTables>,
209 start_id: ObjectID,
210 shard_end: ObjectID,
211 chunk_size: usize,
212 tx: mpsc::Sender<anyhow::Result<RestoreChunk>>,
213) {
214 let iter = perpetual.range_iter_live_object_set(Some(start_id), Some(shard_end), false);
215 let mut buffer: Vec<Object> = Vec::with_capacity(chunk_size.min(1024));
216
217 for live in iter {
218 let LiveObject::Normal(obj) = live else {
219 continue;
220 };
221 buffer.push(obj);
222 if buffer.len() >= chunk_size {
223 let chunk = std::mem::replace(&mut buffer, Vec::with_capacity(chunk_size.min(1024)));
224 if send_chunk(&tx, chunk).is_err() {
225 return;
226 }
227 }
228 }
229
230 if !buffer.is_empty() {
231 let _ = send_chunk(&tx, buffer);
232 }
233}
234
235fn send_chunk(
239 tx: &mpsc::Sender<anyhow::Result<RestoreChunk>>,
240 objects: Vec<Object>,
241) -> Result<(), ()> {
242 let last_id = objects.last().expect("non-empty chunk").id();
243 let chunk = RestoreChunk {
244 objects,
245 cursor: Bytes::copy_from_slice(&last_id.into_bytes()),
246 };
247 tx.blocking_send(Ok(chunk)).map_err(|_| ())
248}
249
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use tempfile::TempDir;

    use super::*;

    /// Opens fresh perpetual tables in a temporary directory. The returned
    /// `TempDir` must be kept alive while the tables are in use, since the
    /// directory is removed when it is dropped.
    fn open_perpetual() -> (TempDir, Arc<AuthorityPerpetualTables>) {
        let dir = TempDir::new().unwrap();
        let perpetual = Arc::new(AuthorityPerpetualTables::open(dir.path(), None, None));
        (dir, perpetual)
    }

    /// Builds an immutable test object whose ID has the given first and last
    /// bytes (all others zero). The first byte controls which shard it lands in.
    fn obj_with_first_byte(first: u8, last: u8) -> Object {
        let mut bytes = [0u8; ObjectID::LENGTH];
        bytes[0] = first;
        bytes[ObjectID::LENGTH - 1] = last;
        Object::immutable_with_id_for_testing(ObjectID::new(bytes))
    }

    #[test]
    fn shard_range_covers_correct_prefixes() {
        let (s0, e0) = shard_range(0);
        assert_eq!(s0.into_bytes()[0], 0x00);
        assert_eq!(e0.into_bytes()[0], 0x07);

        let (s1, e1) = shard_range(1);
        assert_eq!(s1.into_bytes()[0], 0x08);
        assert_eq!(e1.into_bytes()[0], 0x0F);

        let (s31, e31) = shard_range(31);
        assert_eq!(s31.into_bytes()[0], 0xF8);
        assert_eq!(e31.into_bytes()[0], 0xFF);
        assert_eq!(e31.into_bytes()[ObjectID::LENGTH - 1], 0xFF);
    }

    /// The shards must partition the ID space exactly. The first one starts
    /// at the minimum ID, the last one ends at the maximum, and each shard
    /// starts right after the previous one ends (no gaps, no overlap).
    #[test]
    fn shards_tile_the_id_space() {
        let (first_start, _) = shard_range(0);
        assert_eq!(first_start, ObjectID::new([0u8; ObjectID::LENGTH]));

        for shard in 0..SHARDS - 1 {
            let (_, end) = shard_range(shard);
            let (next_start, _) = shard_range(shard + 1);
            assert_eq!(next_id(end), Some(next_start), "gap/overlap after shard {shard}");
        }

        let (_, last_end) = shard_range(SHARDS - 1);
        assert_eq!(last_end, ObjectID::new([0xff; ObjectID::LENGTH]));
    }

    #[test]
    fn next_id_increments_with_carry() {
        let mut bytes = [0u8; ObjectID::LENGTH];
        bytes[ObjectID::LENGTH - 1] = 0xff;
        bytes[ObjectID::LENGTH - 2] = 0x01;
        let inc = next_id(ObjectID::new(bytes)).unwrap().into_bytes();
        let mut expected = [0u8; ObjectID::LENGTH];
        expected[ObjectID::LENGTH - 1] = 0x00;
        expected[ObjectID::LENGTH - 2] = 0x02;
        assert_eq!(inc, expected);
    }

    #[test]
    fn next_id_overflow_returns_none() {
        let max = ObjectID::new([0xff; ObjectID::LENGTH]);
        assert_eq!(next_id(max), None);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn streams_objects_across_shards() {
        let (_dir, perpetual) = open_perpetual();

        // Two objects in shard 0 (first byte 0x00..=0x07), two in shard 1.
        let inserted: Vec<Object> = [(0x01, 0xaa), (0x05, 0xbb), (0x0a, 0xcc), (0x0f, 0xdd)]
            .into_iter()
            .map(|(first, last)| obj_with_first_byte(first, last))
            .collect();
        for o in &inserted {
            perpetual.insert_object_test_only(o.clone()).unwrap();
        }

        // Chunk size 1 forces a chunk boundary after every object.
        let source = PerpetualStoreRestoreSource::new(perpetual.clone(), 7, ChainId([9u8; 32]))
            .with_chunk_size(1);
        assert_eq!(source.target_checkpoint(), 7);
        assert_eq!(source.shards(), SHARDS);

        let mut got = BTreeSet::new();
        for shard in 0..SHARDS {
            let mut stream = source.stream(shard, None);
            while let Some(chunk) = stream.next().await {
                let chunk = chunk.unwrap();
                for o in chunk.objects {
                    got.insert(o.id());
                }
            }
        }
        let want: BTreeSet<_> = inserted.iter().map(|o| o.id()).collect();
        assert_eq!(got, want);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn resume_from_cursor_skips_already_yielded() {
        let (_dir, perpetual) = open_perpetual();

        let a = obj_with_first_byte(0x01, 0x10);
        let b = obj_with_first_byte(0x01, 0x20);
        perpetual.insert_object_test_only(a.clone()).unwrap();
        perpetual.insert_object_test_only(b.clone()).unwrap();

        let source = PerpetualStoreRestoreSource::new(perpetual.clone(), 0, ChainId([0u8; 32]));
        // Pretend `a` was the last object of a previously delivered chunk.
        let cursor = Bytes::copy_from_slice(&a.id().into_bytes());
        let mut stream = source.stream(0, Some(cursor));
        let mut yielded = Vec::new();
        while let Some(chunk) = stream.next().await {
            for o in chunk.unwrap().objects {
                yielded.push(o.id());
            }
        }
        assert_eq!(yielded, vec![b.id()]);
    }
}