Skip to main content

sui_rpc/light_client/
ratchet.rs

1//! Committee ratchet driver.
2//!
3//! [`ratchet_to_checkpoint`] is the entry point. Given a [`Client`] and an
4//! [`EpochCache`], it advances the cache forward until the cache covers the
5//! requested target checkpoint sequence number — in three phases:
6//!
7//! 1. **Discover** the set of epochs the cache needs to advance through by
8//!    walking `LedgerService.GetEpoch` calls sequentially. The discovery
9//!    phase reads only `["last_checkpoint"]` (no signature verification)
10//!    and terminates as soon as it finds an epoch that either is still
11//!    ongoing on the server side or whose last checkpoint covers the
12//!    target.
13//! 2. **Fetch** the end-of-epoch checkpoint summaries for the discovered
14//!    epochs in parallel via `LedgerService.GetCheckpoint`. The
15//!    concurrency is capped by [`RatchetConfig::concurrency`]. Each
16//!    summary comes back with its aggregate signature.
17//! 3. **Apply** each fetched summary sequentially: BLS-verify the
18//!    signature against the cache's current committee, extract the next
19//!    epoch's committee from the summary's `end_of_epoch_data`, and apply
20//!    the ratchet update. Sequencing here is load-bearing — each summary
21//!    is verified against the committee that the *previous* summary just
22//!    installed.
23//!
24//! Splitting discovery from fetch lets a fresh ratchet across N epochs
25//! incur roughly `N + N/concurrency` round trips of latency rather than
26//! `2N` serial round trips. Discovery stays cheap; the heavy
27//! summary-fetch round trips are pipelined.
28
29use sui_crypto::bls12381::ValidatorCommitteeSignatureVerifier;
30use sui_sdk_types::CheckpointSummary;
31use sui_sdk_types::ValidatorAggregatedSignature;
32use sui_sdk_types::ValidatorCommittee;
33
34use crate::Client;
35use crate::field::FieldMask;
36use crate::field::FieldMaskUtil;
37use crate::proto::sui::rpc::v2::GetCheckpointRequest;
38use crate::proto::sui::rpc::v2::GetEpochRequest;
39
40use super::EpochCache;
41use super::RatchetConfig;
42use super::error::LightClientError;
43use super::retry;
44
45/// Advance `cache` forward so that it covers `target_seq`, with the
46/// default [`RatchetConfig`] and no archive endpoint.
47pub async fn ratchet_to_checkpoint(
48    client: &mut Client,
49    cache: &mut EpochCache,
50    target_seq: u64,
51) -> Result<(), LightClientError> {
52    ratchet_to_checkpoint_with_config(client, None, cache, target_seq, &RatchetConfig::default())
53        .await
54}
55
56/// Advance `cache` forward so that it covers `target_seq`.
57///
58/// If `archive` is provided the ratchet uses it for historical reads
59/// and falls back to `fullnode` on archive misses or errors. On
60/// return, the cache holds the committee for the epoch that contains
61/// `target_seq` (subject to the server returning truthful epoch
62/// boundaries; the cache independently BLS-verifies each end-of-epoch
63/// summary it accepts).
64pub async fn ratchet_to_checkpoint_with_config(
65    fullnode: &mut Client,
66    archive: Option<&mut Client>,
67    cache: &mut EpochCache,
68    target_seq: u64,
69    config: &RatchetConfig,
70) -> Result<(), LightClientError> {
71    // The discovery and parallel-fetch helpers both need to consult
72    // the archive on a per-call basis. Take it as `&mut` reborrowed
73    // through `Option::as_deref_mut` so we can split it across phases.
74    let mut archive = archive;
75    let to_advance = discover_epochs_to_advance(
76        fullnode,
77        archive.as_deref_mut(),
78        cache.current_epoch(),
79        target_seq,
80        config,
81    )
82    .await?;
83    if to_advance.is_empty() {
84        return Ok(());
85    }
86
87    let fetched = fetch_end_of_epoch_summaries(fullnode, archive, &to_advance, config).await?;
88
89    for (end_seq, summary, signature) in fetched {
90        apply_verified_end_of_epoch(cache, end_seq, &summary, &signature)?;
91    }
92    Ok(())
93}
94
/// An epoch the ratchet has to cross on its way to the target checkpoint.
///
/// Crossing it requires the summary of its final checkpoint, because that
/// summary carries the next epoch's committee.
#[derive(Clone, Copy, Debug)]
struct EpochToAdvance {
    /// Index of the epoch being crossed.
    epoch: u64,
    /// Sequence number of this epoch's last checkpoint, i.e. the summary
    /// to fetch in order to install the following epoch's committee.
    end_of_epoch_seq: u64,
}
103
104/// Walk `GetEpoch` calls forward from `start_epoch` to determine which
105/// epochs need ratcheting through to cover `target_seq`.
106///
107/// Returns the discovered epochs in ascending order. An empty result
108/// means the cache's current epoch already covers `target_seq` (either
109/// because it is still ongoing on the server side, or because it has
110/// already ended at a checkpoint at or after `target_seq`).
111///
112/// The walk is capped at `max_gap` discovered epochs. Hitting the cap
113/// is a strong signal that the server is misbehaving — either it
114/// genuinely thinks the chain is that far ahead of the cache (in
115/// which case the client should bootstrap from a more recent trust
116/// anchor instead of ratcheting forever) or it is malicious. Either
117/// way, returning [`LightClientError::RatchetGapTooLarge`] is more
118/// useful than continuing to issue `GetEpoch` calls.
119async fn discover_epochs_to_advance(
120    fullnode: &mut Client,
121    mut archive: Option<&mut Client>,
122    start_epoch: u64,
123    target_seq: u64,
124    config: &RatchetConfig,
125) -> Result<Vec<EpochToAdvance>, LightClientError> {
126    let mut to_advance = Vec::new();
127    let mut epoch_number = start_epoch;
128    loop {
129        if to_advance.len() as u64 >= config.max_ratchet_gap {
130            return Err(LightClientError::RatchetGapTooLarge {
131                current: start_epoch,
132                target: epoch_number,
133                max: config.max_ratchet_gap,
134            });
135        }
136
137        let epoch = fetch_one_epoch(fullnode, archive.as_deref_mut(), epoch_number, config).await?;
138
139        let Some(last_of_epoch) = epoch.last_checkpoint else {
140            // Epoch is still ongoing on the server side; everything
141            // beyond it is unreachable.
142            break;
143        };
144
145        if last_of_epoch >= target_seq {
146            // This epoch covers the target; no further advance needed.
147            break;
148        }
149
150        to_advance.push(EpochToAdvance {
151            epoch: epoch_number,
152            end_of_epoch_seq: last_of_epoch,
153        });
154        epoch_number += 1;
155    }
156    Ok(to_advance)
157}
158
159/// Fetch a single `Epoch` proto for `epoch_number` with the
160/// archive-first / fullnode-fallback policy.
161///
162/// The archive is tried with a single attempt — no retry, no
163/// signature checks. Any failure (NotFound, RPC error, or a response
164/// missing `last_checkpoint`) is treated as an archive miss and the
165/// fullnode is consulted. Fullnode failures retry per
166/// [`RatchetConfig::max_retries`]; a sustained `NotFound` from the
167/// fullnode surfaces as [`LightClientError::EpochNotFound`].
168async fn fetch_one_epoch(
169    fullnode: &mut Client,
170    archive: Option<&mut Client>,
171    epoch_number: u64,
172    config: &RatchetConfig,
173) -> Result<crate::proto::sui::rpc::v2::Epoch, LightClientError> {
174    if let Some(archive) = archive {
175        let request = GetEpochRequest::new(epoch_number)
176            .with_read_mask(FieldMask::from_paths(["last_checkpoint"]));
177        if let Ok(resp) = archive.ledger_client().get_epoch(request).await
178            && let Some(epoch) = resp.into_inner().epoch
179            && epoch.last_checkpoint.is_some()
180        {
181            return Ok(epoch);
182        }
183        // Fall through to the fullnode on any archive miss or
184        // missing-data case.
185    }
186
187    let mut attempt: u32 = 0;
188    loop {
189        let request = GetEpochRequest::new(epoch_number)
190            .with_read_mask(FieldMask::from_paths(["last_checkpoint"]));
191        match fullnode.ledger_client().get_epoch(request).await {
192            Ok(resp) => {
193                return resp.into_inner().epoch.ok_or_else(|| {
194                    LightClientError::Proto(crate::proto::TryFromProtoError::missing("epoch"))
195                });
196            }
197            Err(status) if status.code() == tonic::Code::NotFound => {
198                return Err(LightClientError::EpochNotFound {
199                    epoch: epoch_number,
200                });
201            }
202            Err(status) => {
203                retry::step(config, LightClientError::Rpc(status), &mut attempt).await?;
204            }
205        }
206    }
207}
208
209/// Fetch the end-of-epoch checkpoint summaries for `to_advance` in
210/// parallel, returning them in ascending epoch order.
211///
212/// Each future fetches `["summary.bcs", "signature"]` and BCS-decodes
213/// the summary directly, skipping the proto-to-SDK conversion layer.
214/// When `archive` is provided each future tries it first (single
215/// attempt) before falling back to the fullnode (with retry).
216async fn fetch_end_of_epoch_summaries(
217    fullnode: &mut Client,
218    archive: Option<&mut Client>,
219    to_advance: &[EpochToAdvance],
220    config: &RatchetConfig,
221) -> Result<Vec<(u64, CheckpointSummary, ValidatorAggregatedSignature)>, LightClientError> {
222    use futures::stream::StreamExt;
223    use futures::stream::TryStreamExt;
224
225    // Build per-task `LedgerServiceClient`s up front so the futures
226    // can run concurrently without re-borrowing `&mut Client`. Each
227    // future owns its own fullnode ledger and, when an archive is
228    // configured, its own archive ledger.
229    let fullnode_clients: Vec<_> = (0..to_advance.len())
230        .map(|_| fullnode.ledger_client())
231        .collect();
232    let archive_clients: Vec<_> = match archive {
233        Some(archive) => (0..to_advance.len())
234            .map(|_| Some(archive.ledger_client()))
235            .collect(),
236        None => (0..to_advance.len()).map(|_| None).collect(),
237    };
238
239    let concurrency = config.concurrency.max(1);
240    let futures = to_advance
241        .iter()
242        .copied()
243        .zip(fullnode_clients.into_iter().zip(archive_clients))
244        .map(|(item, (mut fullnode_ledger, archive_ledger))| async move {
245            // Archive first: a single attempt, no retry. Any failure
246            // — NotFound, RPC error, or anything else — falls through
247            // to the fullnode.
248            let mut response = None;
249            if let Some(mut archive_ledger) = archive_ledger {
250                let request = GetCheckpointRequest::by_sequence_number(item.end_of_epoch_seq)
251                    .with_read_mask(FieldMask::from_paths(["summary.bcs", "signature"]));
252                if let Ok(resp) = archive_ledger.get_checkpoint(request).await {
253                    response = Some(resp.into_inner());
254                }
255            }
256
257            let response = if let Some(resp) = response {
258                resp
259            } else {
260                // Fullnode with retry. `LedgerServiceClient<BoxedChannel>`
261                // isn't `Clone` (its `BoxService` backend isn't), so
262                // we can't drop the ledger into a `FnMut`-callable
263                // closure; inline the loop.
264                let mut attempt: u32 = 0;
265                loop {
266                    let request = GetCheckpointRequest::by_sequence_number(item.end_of_epoch_seq)
267                        .with_read_mask(FieldMask::from_paths(["summary.bcs", "signature"]));
268                    match fullnode_ledger.get_checkpoint(request).await {
269                        Ok(resp) => break resp.into_inner(),
270                        Err(status) if status.code() == tonic::Code::NotFound => {
271                            return Err(LightClientError::EpochNotFound { epoch: item.epoch });
272                        }
273                        Err(status) => {
274                            retry::step(config, LightClientError::Rpc(status), &mut attempt)
275                                .await?;
276                        }
277                    }
278                }
279            };
280            let checkpoint = response.checkpoint.ok_or_else(|| {
281                LightClientError::Proto(crate::proto::TryFromProtoError::missing("checkpoint"))
282            })?;
283            let summary_bcs = checkpoint
284                .summary
285                .as_ref()
286                .and_then(|s| s.bcs.as_ref())
287                .ok_or_else(|| {
288                    LightClientError::Proto(crate::proto::TryFromProtoError::missing("summary.bcs"))
289                })?;
290            let signature_proto = checkpoint.signature.as_ref().ok_or_else(|| {
291                LightClientError::Proto(crate::proto::TryFromProtoError::missing("signature"))
292            })?;
293            let summary: CheckpointSummary = summary_bcs.deserialize()?;
294            let signature: ValidatorAggregatedSignature = signature_proto.try_into()?;
295            Ok::<_, LightClientError>((item.epoch, item.end_of_epoch_seq, summary, signature))
296        });
297
298    let mut fetched: Vec<_> = futures::stream::iter(futures)
299        .buffer_unordered(concurrency)
300        .try_collect()
301        .await?;
302
303    // `buffer_unordered` may return results out of order; the apply
304    // phase requires ascending epoch order to feed the state machine
305    // correctly.
306    fetched.sort_by_key(|(epoch, _, _, _)| *epoch);
307    Ok(fetched
308        .into_iter()
309        .map(|(_, end_seq, summary, signature)| (end_seq, summary, signature))
310        .collect())
311}
312
313/// Verify `summary`'s BLS aggregate signature against `cache`'s current
314/// committee, then extract the next committee from
315/// `summary.end_of_epoch_data` and apply it as a ratchet update.
316///
317/// Exposed for testing the pure (non-RPC) half of the ratchet step.
318pub(crate) fn apply_verified_end_of_epoch(
319    cache: &mut EpochCache,
320    end_of_epoch_seq: u64,
321    summary: &CheckpointSummary,
322    signature: &ValidatorAggregatedSignature,
323) -> Result<(), LightClientError> {
324    let verifier = ValidatorCommitteeSignatureVerifier::new(cache.current_committee().clone())?;
325    verifier.verify_checkpoint_summary(summary, signature)?;
326
327    let next_committee = extract_next_epoch_committee(cache, summary, end_of_epoch_seq)?;
328    cache.apply_ratchet_update(next_committee)?;
329    Ok(())
330}
331
332/// Build the next epoch's committee from `summary.end_of_epoch_data`,
333/// pairing it with the cache's next epoch number.
334///
335/// Returns [`LightClientError::MissingEndOfEpochData`] if `summary` has no
336/// `end_of_epoch_data` (i.e. it was not in fact the last checkpoint of an
337/// epoch, despite the server having advertised it as such).
338fn extract_next_epoch_committee(
339    cache: &EpochCache,
340    summary: &CheckpointSummary,
341    checkpoint: u64,
342) -> Result<ValidatorCommittee, LightClientError> {
343    let end_of_epoch_data = summary
344        .end_of_epoch_data
345        .as_ref()
346        .ok_or(LightClientError::MissingEndOfEpochData { checkpoint })?;
347
348    Ok(ValidatorCommittee {
349        epoch: cache.current_epoch() + 1,
350        members: end_of_epoch_data.next_epoch_committee.clone(),
351    })
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    use sui_sdk_types::CheckpointCommitment;
    use sui_sdk_types::Digest;
    use sui_sdk_types::EndOfEpochData;
    use sui_sdk_types::GasCostSummary;
    use sui_sdk_types::ValidatorCommitteeMember;

    /// Memberless committee for `epoch`; sufficient for tests that only
    /// exercise epoch bookkeeping.
    fn committee(epoch: u64) -> ValidatorCommittee {
        ValidatorCommittee {
            epoch,
            members: vec![],
        }
    }

    /// Summary at sequence number 99 in `epoch` whose
    /// `end_of_epoch_data` names `members` as the next committee.
    fn make_end_of_epoch_summary(
        epoch: u64,
        members: Vec<ValidatorCommitteeMember>,
    ) -> CheckpointSummary {
        let end_of_epoch_data = EndOfEpochData {
            next_epoch_committee: members,
            next_epoch_protocol_version: 1,
            epoch_commitments: vec![],
        };
        CheckpointSummary {
            epoch,
            sequence_number: 99,
            network_total_transactions: 0,
            content_digest: Digest::ZERO,
            previous_digest: None,
            epoch_rolling_gas_cost_summary: GasCostSummary::default(),
            timestamp_ms: 0,
            checkpoint_commitments: Vec::<CheckpointCommitment>::new(),
            end_of_epoch_data: Some(end_of_epoch_data),
            version_specific_data: vec![],
        }
    }

    /// The extracted committee takes its epoch from the cache
    /// (`current_epoch + 1`) and its members from the summary.
    #[test]
    fn extract_next_epoch_committee_uses_summary_members_and_advances_epoch() {
        let cache = EpochCache::new(committee(7));
        // Member contents are irrelevant here; only epoch and count matter.
        let summary = make_end_of_epoch_summary(7, Vec::new());

        let next = extract_next_epoch_committee(&cache, &summary, 99).unwrap();
        assert_eq!(next.epoch, 8);
        assert!(next.members.is_empty());
    }

    /// Without `end_of_epoch_data` the summary is rejected, and the error
    /// names the checkpoint sequence number it was given.
    #[test]
    fn extract_next_epoch_committee_requires_end_of_epoch_data() {
        let cache = EpochCache::new(committee(0));
        let mut summary = make_end_of_epoch_summary(0, Vec::new());
        summary.sequence_number = 42;
        summary.end_of_epoch_data = None;

        let err = extract_next_epoch_committee(&cache, &summary, 42).unwrap_err();
        assert!(
            matches!(
                err,
                LightClientError::MissingEndOfEpochData { checkpoint: 42 }
            ),
            "got {err:?}"
        );
    }
}