sui_core/
rpc_store_restore_source.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! [`RestoreSource`] backed by a validator's
5//! [`AuthorityPerpetualTables`].
6//!
7//! Streams every `LiveObject::Normal` in the perpetual store
8//! into the `sui-consistent-store` restore driver, sharded by
9//! `ObjectID` prefix so multiple shards can iterate in parallel.
10//!
11//! # Sharding
12//!
13//! The `ObjectID` space is split into 32 shards by the top
14//! `SHARD_BITS = 5` bits of the first byte (matching the split
15//! used by `par_index_live_object_set`).
16//! Each shard yields chunks of [`CHUNK_SIZE`] objects; the
17//! `RestoreChunk::cursor` is the 32-byte ObjectID of the last
18//! object in that chunk, so resuming with `Some(c)` starts the
19//! next iteration immediately after that id.
20//!
21//! # Snapshot consistency
22//!
23//! Each shard's stream opens exactly one RocksDB iterator and
24//! drives it to completion from a single `spawn_blocking` task,
25//! pushing chunks back over a tokio mpsc. RocksDB iterators
26//! created without an explicit snapshot implicitly pin one at
27//! construction time, so a shard sees a single point-in-time
28//! view for its full run — including the merge-based `balance`
29//! pipeline, which is safe against concurrent execution.
30//!
31//! Different shards take their snapshots at the moments their
32//! `spawn_blocking` tasks start, so cross-shard skew can still
33//! exist if the validator commits between shard launches. This
34//! does not affect any of the `sui-rpc-store` pipelines because
35//! every object lives in exactly one shard.
36//!
37//! A side-effect of holding open one iterator per shard for the
38//! full restore is that the SSTs it references stay pinned and
39//! cannot compact away for the duration. That is acceptable for
40//! a one-shot bootstrap.
41
42use std::sync::Arc;
43
44use async_trait::async_trait;
45use bytes::Bytes;
46use futures::StreamExt;
47use futures::stream;
48use futures::stream::BoxStream;
49use sui_consistent_store::ChainId;
50use sui_consistent_store::restore::RestoreChunk;
51use sui_consistent_store::restore::RestoreSource;
52use sui_types::base_types::ObjectID;
53use sui_types::object::Object;
54use tokio::sync::mpsc;
55use tokio_stream::wrappers::ReceiverStream;
56
57use crate::authority::authority_store_tables::AuthorityPerpetualTables;
58use crate::authority::authority_store_tables::LiveObject;
59
/// Number of leading bits of the first `ObjectID` byte that
/// select a shard. The source is split into `2^SHARD_BITS`
/// shards, mirroring the constant used by
/// `par_index_live_object_set`.
const SHARD_BITS: u32 = 5;

/// Shard count derived from [`SHARD_BITS`] (`2^SHARD_BITS`).
const SHARDS: u32 = 2u32.pow(SHARD_BITS);

/// Left shift that moves a shard id into the most significant
/// bits of the first `ObjectID` byte: the byte's width minus
/// the bits consumed by the shard id.
const SHARD_PREFIX_SHIFT: u32 = u8::BITS - SHARD_BITS;

/// Default number of objects carried by one [`RestoreChunk`].
/// Tuned to keep the per-pipeline batch comfortably under a
/// few MB of writes while still amortising the per-chunk
/// commit overhead.
pub const CHUNK_SIZE: usize = 50 * 1_000;
76
/// [`RestoreSource`] over an
/// [`AuthorityPerpetualTables`]. Construct via
/// [`PerpetualStoreRestoreSource::new`].
///
/// The source itself holds no iterator state: every call to
/// `stream` clones the `Arc` into a fresh blocking task.
pub struct PerpetualStoreRestoreSource {
    /// Perpetual store whose live object set is streamed.
    perpetual: Arc<AuthorityPerpetualTables>,
    /// Value reported by `RestoreSource::target_checkpoint`.
    target_checkpoint: u64,
    /// Value reported by `RestoreSource::target_chain_id`.
    chain_id: ChainId,
    /// Maximum number of objects per emitted chunk; defaults to
    /// [`CHUNK_SIZE`] and is always non-zero.
    chunk_size: usize,
}
86
87impl PerpetualStoreRestoreSource {
88    /// Build a source rooted at `perpetual`, anchored at
89    /// `target_checkpoint` and `chain_id`. Tip indexing will
90    /// resume at `target_checkpoint + 1` once restore finishes
91    /// — pick the highest executed checkpoint the perpetual
92    /// store has seen at restore time. `chain_id` is pinned
93    /// into `__chain_id` on finalise so subsequent tip
94    /// indexing refuses checkpoints from the wrong chain.
95    pub fn new(
96        perpetual: Arc<AuthorityPerpetualTables>,
97        target_checkpoint: u64,
98        chain_id: ChainId,
99    ) -> Self {
100        Self {
101            perpetual,
102            target_checkpoint,
103            chain_id,
104            chunk_size: CHUNK_SIZE,
105        }
106    }
107
108    /// Override the per-chunk object count. Useful for tests
109    /// that want to exercise multi-chunk shards without
110    /// materialising 50k objects.
111    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
112        assert!(chunk_size > 0, "chunk_size must be > 0");
113        self.chunk_size = chunk_size;
114        self
115    }
116}
117
118/// Inclusive `[start, end]` `ObjectID` range covered by `shard_id`.
119fn shard_range(shard_id: u32) -> (ObjectID, ObjectID) {
120    let prefix = (shard_id as u8) << SHARD_PREFIX_SHIFT;
121    let mut start = [0u8; ObjectID::LENGTH];
122    start[0] = prefix;
123    let mut end = [0xffu8; ObjectID::LENGTH];
124    end[0] = prefix | ((1 << SHARD_PREFIX_SHIFT) - 1);
125    (ObjectID::new(start), ObjectID::new(end))
126}
127
128/// Increment `id` as a 256-bit big-endian integer, returning
129/// `None` on overflow.
130fn next_id(id: ObjectID) -> Option<ObjectID> {
131    let mut bytes = id.into_bytes();
132    for byte in bytes.iter_mut().rev() {
133        if *byte == 0xff {
134            *byte = 0;
135        } else {
136            *byte += 1;
137            return Some(ObjectID::new(bytes));
138        }
139    }
140    None
141}
142
143#[async_trait]
144impl RestoreSource for PerpetualStoreRestoreSource {
145    fn target_checkpoint(&self) -> u64 {
146        self.target_checkpoint
147    }
148
149    fn target_chain_id(&self) -> ChainId {
150        self.chain_id
151    }
152
153    fn shards(&self) -> u32 {
154        SHARDS
155    }
156
157    fn stream(
158        &self,
159        shard_id: u32,
160        cursor: Option<Bytes>,
161    ) -> BoxStream<'_, anyhow::Result<RestoreChunk>> {
162        let (shard_start, shard_end) = shard_range(shard_id);
163
164        let start_id = match cursor {
165            None => Some(shard_start),
166            Some(bytes) => match ObjectID::from_bytes(&bytes[..]) {
167                Ok(id) => next_id(id).filter(|n| *n <= shard_end),
168                Err(e) => {
169                    return stream::once(async move {
170                        Err(anyhow::anyhow!("invalid perpetual-store cursor: {e}"))
171                    })
172                    .boxed();
173                }
174            },
175        };
176
177        let Some(start_id) = start_id else {
178            return stream::empty().boxed();
179        };
180
181        // Bounded mpsc applies backpressure on the iterator
182        // task so it pauses when the driver hasn't committed
183        // the previous chunk yet.
184        let (tx, rx) = mpsc::channel::<anyhow::Result<RestoreChunk>>(2);
185        let perpetual = self.perpetual.clone();
186        let chunk_size = self.chunk_size;
187
188        tokio::task::spawn_blocking(move || {
189            iterate_shard(perpetual, start_id, shard_end, chunk_size, tx);
190        });
191
192        ReceiverStream::new(rx).boxed()
193    }
194}
195
196/// Drive one shard's iteration end-to-end in a single
197/// `spawn_blocking` task.
198///
199/// Opens exactly one `range_iter_live_object_set` and pushes
200/// chunks of up to `chunk_size` `LiveObject::Normal` rows over
201/// `tx`. The iterator's implicit RocksDB snapshot is held for
202/// the lifetime of this function, so the whole shard observes
203/// a single point-in-time view of the perpetual store.
204///
205/// Returns early without sending anything if the receiver is
206/// dropped (e.g. the driver was cancelled).
207fn iterate_shard(
208    perpetual: Arc<AuthorityPerpetualTables>,
209    start_id: ObjectID,
210    shard_end: ObjectID,
211    chunk_size: usize,
212    tx: mpsc::Sender<anyhow::Result<RestoreChunk>>,
213) {
214    let iter = perpetual.range_iter_live_object_set(Some(start_id), Some(shard_end), false);
215    let mut buffer: Vec<Object> = Vec::with_capacity(chunk_size.min(1024));
216
217    for live in iter {
218        let LiveObject::Normal(obj) = live else {
219            continue;
220        };
221        buffer.push(obj);
222        if buffer.len() >= chunk_size {
223            let chunk = std::mem::replace(&mut buffer, Vec::with_capacity(chunk_size.min(1024)));
224            if send_chunk(&tx, chunk).is_err() {
225                return;
226            }
227        }
228    }
229
230    if !buffer.is_empty() {
231        let _ = send_chunk(&tx, buffer);
232    }
233}
234
235/// Wrap `objects` in a [`RestoreChunk`] (cursor = last object's
236/// id) and blocking-send it. Returns `Err(())` if the receiver
237/// is closed so the caller can stop iterating.
238fn send_chunk(
239    tx: &mpsc::Sender<anyhow::Result<RestoreChunk>>,
240    objects: Vec<Object>,
241) -> Result<(), ()> {
242    let last_id = objects.last().expect("non-empty chunk").id();
243    let chunk = RestoreChunk {
244        objects,
245        cursor: Bytes::copy_from_slice(&last_id.into_bytes()),
246    };
247    tx.blocking_send(Ok(chunk)).map_err(|_| ())
248}
249
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use tempfile::TempDir;

    use super::*;

    /// Open an empty perpetual store in a fresh temp directory.
    /// The `TempDir` is returned so the directory outlives the
    /// store handle.
    fn open_perpetual() -> (TempDir, Arc<AuthorityPerpetualTables>) {
        let dir = TempDir::new().unwrap();
        let perpetual = Arc::new(AuthorityPerpetualTables::open(dir.path(), None, None));
        (dir, perpetual)
    }

    /// Build a test object whose id is all zeroes except for the
    /// first byte (`first`, selects the shard) and the last byte
    /// (`last`, orders objects within the shard).
    fn obj_with_first_byte(first: u8, last: u8) -> Object {
        let mut bytes = [0u8; ObjectID::LENGTH];
        bytes[0] = first;
        bytes[ObjectID::LENGTH - 1] = last;
        Object::immutable_with_id_for_testing(ObjectID::new(bytes))
    }

    /// Hand-pick a representative shard and verify the shard
    /// range covers the right ObjectID prefixes.
    #[test]
    fn shard_range_covers_correct_prefixes() {
        let (s0, e0) = shard_range(0);
        assert_eq!(s0.into_bytes()[0], 0x00);
        assert_eq!(e0.into_bytes()[0], 0x07);

        let (s1, e1) = shard_range(1);
        assert_eq!(s1.into_bytes()[0], 0x08);
        assert_eq!(e1.into_bytes()[0], 0x0F);

        let (s31, e31) = shard_range(31);
        assert_eq!(s31.into_bytes()[0], 0xF8);
        assert_eq!(e31.into_bytes()[0], 0xFF);
        // Last byte of the upper bound is 0xFF.
        assert_eq!(e31.into_bytes()[ObjectID::LENGTH - 1], 0xFF);
    }

    #[test]
    fn next_id_increments_with_carry() {
        let mut bytes = [0u8; ObjectID::LENGTH];
        bytes[ObjectID::LENGTH - 1] = 0xff;
        bytes[ObjectID::LENGTH - 2] = 0x01;
        let inc = next_id(ObjectID::new(bytes)).unwrap().into_bytes();
        let mut expected = [0u8; ObjectID::LENGTH];
        expected[ObjectID::LENGTH - 1] = 0x00;
        expected[ObjectID::LENGTH - 2] = 0x02;
        assert_eq!(inc, expected);
    }

    #[test]
    fn next_id_overflow_returns_none() {
        let max = ObjectID::new([0xff; ObjectID::LENGTH]);
        assert_eq!(next_id(max), None);
    }

    /// End-to-end smoke: seed objects across two shards, drain
    /// every shard's stream, confirm every object lands exactly
    /// once and shard boundaries are respected.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn streams_objects_across_shards() {
        let (_dir, perpetual) = open_perpetual();

        // Insert four objects across shard 0 (first byte in
        // 0x00..=0x07) and shard 1 (0x08..=0x0F).
        let inserted: Vec<Object> = [(0x01, 0xaa), (0x05, 0xbb), (0x0a, 0xcc), (0x0f, 0xdd)]
            .into_iter()
            .map(|(first, last)| obj_with_first_byte(first, last))
            .collect();
        for o in &inserted {
            perpetual.insert_object_test_only(o.clone()).unwrap();
        }

        let source = PerpetualStoreRestoreSource::new(perpetual.clone(), 7, ChainId([9u8; 32]))
            .with_chunk_size(1);
        assert_eq!(source.target_checkpoint(), 7);
        assert_eq!(source.shards(), SHARDS);

        // Drain every shard. A `Vec` (not a set) records what was
        // yielded so duplicates stay visible.
        let mut got = Vec::new();
        for shard in 0..SHARDS {
            let mut stream = source.stream(shard, None);
            while let Some(chunk) = stream.next().await {
                let chunk = chunk.unwrap();
                // `with_chunk_size(1)` means exactly one object
                // per chunk, and the cursor is that object's id.
                assert_eq!(chunk.objects.len(), 1);
                let last = chunk.objects.last().unwrap().id();
                assert_eq!(&chunk.cursor[..], &last.into_bytes()[..]);

                for o in chunk.objects {
                    let id = o.id();
                    // Shard boundary check: the object's prefix
                    // must map back to the shard that yielded it.
                    let owner = u32::from(id.into_bytes()[0] >> SHARD_PREFIX_SHIFT);
                    assert_eq!(owner, shard, "object yielded by the wrong shard");
                    got.push(id);
                }
            }
        }

        let unique: BTreeSet<_> = got.iter().copied().collect();
        assert_eq!(unique.len(), got.len(), "an object was yielded more than once");
        let want: BTreeSet<_> = inserted.iter().map(|o| o.id()).collect();
        assert_eq!(unique, want);
    }

    /// Resume from a cursor that points at the first object in
    /// a shard and confirm the second object (and only the
    /// second) is yielded.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn resume_from_cursor_skips_already_yielded() {
        let (_dir, perpetual) = open_perpetual();

        let a = obj_with_first_byte(0x01, 0x10);
        let b = obj_with_first_byte(0x01, 0x20);
        perpetual.insert_object_test_only(a.clone()).unwrap();
        perpetual.insert_object_test_only(b.clone()).unwrap();

        // Shard 0 covers first byte 0x00..=0x07, so both
        // objects live there.
        let source = PerpetualStoreRestoreSource::new(perpetual.clone(), 0, ChainId([0u8; 32]));
        let cursor = Bytes::copy_from_slice(&a.id().into_bytes());
        let mut stream = source.stream(0, Some(cursor));
        let mut yielded = Vec::new();
        while let Some(chunk) = stream.next().await {
            for o in chunk.unwrap().objects {
                yielded.push(o.id());
            }
        }
        assert_eq!(yielded, vec![b.id()]);
    }

    /// A cursor equal to the shard's last possible id leaves
    /// nothing to iterate: the stream must be empty rather than
    /// spilling into the next shard.
    #[tokio::test]
    async fn resume_from_shard_end_is_empty() {
        let (_dir, perpetual) = open_perpetual();
        // Lives in shard 1; must not be reachable from shard 0.
        perpetual
            .insert_object_test_only(obj_with_first_byte(0x08, 0x01))
            .unwrap();

        let source = PerpetualStoreRestoreSource::new(perpetual.clone(), 0, ChainId([0u8; 32]));
        let (_, shard_end) = shard_range(0);
        let cursor = Bytes::copy_from_slice(&shard_end.into_bytes());
        let mut stream = source.stream(0, Some(cursor));
        assert!(stream.next().await.is_none());
    }

    /// A cursor that is not a 32-byte `ObjectID` produces one
    /// error item and then ends the stream.
    #[tokio::test]
    async fn malformed_cursor_yields_single_error() {
        let (_dir, perpetual) = open_perpetual();
        let source = PerpetualStoreRestoreSource::new(perpetual.clone(), 0, ChainId([0u8; 32]));

        let cursor = Bytes::from_static(&[0x01, 0x02, 0x03]);
        let mut stream = source.stream(0, Some(cursor));
        let first = stream.next().await.expect("one error item");
        assert!(first.is_err());
        assert!(stream.next().await.is_none());
    }
}