sui_network/state_sync/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer-to-peer data synchronization of checkpoints.
5//!
6//! This StateSync module is responsible for the synchronization and dissemination of checkpoints
7//! and the transactions, and their effects, contained within. This module is *not* responsible for
8//! the execution of the transactions included in a checkpoint, that process is left to another
9//! component in the system.
10//!
11//! # High-level Overview of StateSync
12//!
13//! StateSync discovers new checkpoints via a few different sources:
14//! 1. If this node is a Validator, checkpoints will be produced via consensus at which point
15//!    consensus can notify state-sync of the new checkpoint via [Handle::send_checkpoint].
16//! 2. A peer notifies us of the latest checkpoint which they have synchronized. State-Sync will
17//!    also periodically query its peers to discover what their latest checkpoint is.
18//!
19//! We keep track of two different watermarks:
20//! * highest_verified_checkpoint - This is the highest checkpoint header that we've locally
21//!   verified. This indicates that we have in our persistent store (and have verified) all
22//!   checkpoint headers up to and including this value.
23//! * highest_synced_checkpoint - This is the highest checkpoint that we've fully synchronized,
24//!   meaning we've downloaded and have in our persistent stores all of the transactions, and their
25//!   effects (but not the objects), for all checkpoints up to and including this point. This is
26//!   the watermark that is shared with other peers, either via notification or when they query for
27//!   our latest checkpoint, and is intended to be used as a guarantee of data availability.
28//!
29//! The `PeerHeights` struct is used to track the highest_synced_checkpoint watermark for all of
30//! our peers.
31//!
32//! When a new checkpoint is discovered, and we've determined that it is higher than our
33//! highest_verified_checkpoint, then StateSync will kick off a task to synchronize and verify all
34//! checkpoints between our highest_verified_checkpoint and the newly discovered checkpoint. This
35//! process is done by querying one of our peers for the checkpoints we're missing (using the
36//! `PeerHeights` struct as a way to intelligently select which peers have the data available for
37//! us to query) at which point we will locally verify the signatures on the checkpoint header with
38//! the appropriate committee (based on the epoch). As checkpoints are verified, the
39//! highest_verified_checkpoint watermark will be ratcheted up.
40//!
41//! Once we've ratcheted up our highest_verified_checkpoint, and if it is higher than
42//! highest_synced_checkpoint, StateSync will then kick off a task to synchronize the contents of
43//! all of the checkpoints from highest_synced_checkpoint..=highest_verified_checkpoint. After the
44//! contents of each checkpoint is fully downloaded, StateSync will update our
45//! highest_synced_checkpoint watermark and send out a notification on a broadcast channel
46//! indicating that a new checkpoint has been fully downloaded. Notifications on this broadcast
47//! channel will always be made in order. StateSync will also send out a notification to its peers
48//! of the newly synchronized checkpoint so that it can help other peers synchronize.
49
50use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54    collections::{HashMap, VecDeque},
55    sync::{Arc, RwLock},
56    time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60    committee::Committee,
61    digests::CheckpointDigest,
62    messages_checkpoint::{
63        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64        VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65    },
66    storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71    sync::{broadcast, mpsc, watch},
72    task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
/// RPC message types and client/server stubs generated at build time into `OUT_DIR`.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::{build_object_store, fetch_checkpoint, process_archive_checkpoint};
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90    state_sync_client::StateSyncClient,
91    state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_storage::verify_checkpoint;
97
/// A handle to the StateSync subsystem.
///
/// This handle can be cloned and shared. Once all copies of a StateSync system's Handle have been
/// dropped, the StateSync system will be gracefully shutdown.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Channel into the `StateSyncEventLoop`'s mailbox.
    sender: mpsc::Sender<StateSyncMessage>,
    /// Broadcasts checkpoints once they have been fully synchronized; see
    /// [`Handle::subscribe_to_synced_checkpoints`].
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Metrics shared with the rest of the state-sync subsystem.
    metrics: Metrics,
}
108
109impl Handle {
110    /// Send a newly minted checkpoint from Consensus to StateSync so that it can be disseminated
111    /// to other nodes on the network.
112    ///
113    /// # Invariant
114    ///
115    /// Consensus must only notify StateSync of new checkpoints that have been fully committed to
116    /// persistent storage. This includes CheckpointContents and all Transactions and
117    /// TransactionEffects included therein.
118    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119        self.sender
120            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121            .await
122            .unwrap()
123    }
124
125    /// Subscribe to the stream of checkpoints that have been fully synchronized and downloaded.
126    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127        self.checkpoint_event_sender.subscribe()
128    }
129
130    /// Returns the number of peers reported due to consistent state sync failures.
131    pub fn get_peers_reported_for_failure(&self) -> u64 {
132        self.metrics.get_peers_reported_for_failure()
133    }
134}
135
136pub(super) fn compute_adaptive_timeout(
137    tx_count: u64,
138    min_timeout: Duration,
139    max_timeout: Duration,
140) -> Duration {
141    const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
142    const JITTER_FRACTION: f64 = 0.1;
143
144    let max_timeout = max_timeout.max(min_timeout);
145    let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
146    let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
147    let base = min_timeout + extra;
148
149    let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
150    let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
151    Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
152}
153
#[cfg_attr(test, derive(Debug))]
pub(super) struct PeerScore {
    /// Successful requests: (completion time, response size in bytes, response latency).
    successes: VecDeque<(Instant, u64 /* response_size_bytes */, Duration)>,
    /// Timestamps of failed requests.
    failures: VecDeque<Instant>,
    /// Only samples newer than `window` count toward the failure rate and throughput.
    window: Duration,
    /// Failure fraction at or above which `is_failing` reports true.
    failure_rate: f64,
    /// When the peer first entered the failing state; `None` while healthy.
    failing_since: Option<Instant>,
}
162
163impl PeerScore {
164    const MAX_SAMPLES: usize = 20;
165    const MIN_SAMPLES_FOR_FAILURE: usize = 10;
166
167    pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
168        Self {
169            successes: VecDeque::new(),
170            failures: VecDeque::new(),
171            window,
172            failure_rate,
173            failing_since: None,
174        }
175    }
176
177    pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
178        let now = Instant::now();
179        self.successes.push_back((now, size, response_time));
180        while self.successes.len() > Self::MAX_SAMPLES {
181            self.successes.pop_front();
182        }
183        self.failing_since = None;
184    }
185
186    pub(super) fn record_failure(&mut self) {
187        let now = Instant::now();
188        self.failures.push_back(now);
189        while self.failures.len() > Self::MAX_SAMPLES {
190            self.failures.pop_front();
191        }
192    }
193
194    pub(super) fn is_failing(&self) -> bool {
195        let now = Instant::now();
196        let recent_failures = self
197            .failures
198            .iter()
199            .filter(|ts| now.duration_since(**ts) < self.window)
200            .count();
201        let recent_successes = self
202            .successes
203            .iter()
204            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
205            .count();
206
207        let total = recent_failures + recent_successes;
208        if total < Self::MIN_SAMPLES_FOR_FAILURE {
209            return false;
210        }
211
212        let rate = recent_failures as f64 / total as f64;
213        rate >= self.failure_rate
214    }
215
216    pub(super) fn update_failing_state(&mut self) {
217        if self.is_failing() && self.failing_since.is_none() {
218            self.failing_since = Some(Instant::now());
219        }
220    }
221
222    pub(super) fn reset_failing_since(&mut self) {
223        self.failing_since = None;
224    }
225
226    pub(super) fn consistently_failing(&self, threshold: Duration) -> bool {
227        match self.failing_since {
228            Some(since) => since.elapsed() >= threshold,
229            None => false,
230        }
231    }
232
233    pub(super) fn effective_throughput(&self) -> Option<f64> {
234        let now = Instant::now();
235        let (total_size, total_time) = self
236            .successes
237            .iter()
238            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
239            .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
240                (size + s, time + *d)
241            });
242
243        if total_size == 0 {
244            return None;
245        }
246
247        if total_time.is_zero() {
248            return None;
249        }
250
251        Some(total_size as f64 / total_time.as_secs_f64())
252    }
253}
254
struct PeerHeights {
    /// Table used to track the highest checkpoint for each of our peers.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    /// Checkpoint summaries we have heard about but not yet processed, keyed by digest.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    /// Secondary index into `unprocessed_checkpoints`; at most one digest per sequence
    /// number (see `insert_checkpoint`).
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
    /// Per-peer success/failure statistics used for peer selection and failure reporting.
    scores: HashMap<PeerId, PeerScore>,

    /// How long to wait before retrying when no peer can serve the content we need.
    wait_interval_when_no_peer_to_sync_content: Duration,
    /// Sliding window over which `PeerScore` samples are considered.
    peer_scoring_window: Duration,
    /// Failure-rate threshold passed to each `PeerScore`.
    peer_failure_rate: f64,
    /// Minimum for the adaptive content-fetch timeout (see `compute_adaptive_timeout`).
    checkpoint_content_timeout_min: Duration,
    /// Maximum for the adaptive content-fetch timeout.
    checkpoint_content_timeout_max: Duration,
    /// Probability that `PeerBalancer` explores an unmeasured peer instead of a known one.
    exploration_probability: f64,
}
269
/// Per-peer state-sync metadata tracked in `PeerHeights`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// The digest of the Peer's genesis checkpoint.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Indicates if this Peer is on the same chain as us.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number we know of for this Peer.
    height: CheckpointSequenceNumber,
    /// Lowest available checkpoint watermark for this Peer.
    /// This defaults to 0 for now.
    lowest: CheckpointSequenceNumber,
}
282
283impl PeerHeights {
284    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
285        self.peers
286            .values()
287            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
288            .max()
289    }
290
291    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
292        self.peers
293            .iter()
294            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
295    }
296
297    // Returns a bool that indicates if the update was done successfully.
298    //
299    // This will return false if the given peer doesn't have an entry or is not on the same chain
300    // as us
301    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
302    pub fn update_peer_info(
303        &mut self,
304        peer_id: PeerId,
305        checkpoint: Checkpoint,
306        low_watermark: Option<CheckpointSequenceNumber>,
307    ) -> bool {
308        debug!("Update peer info");
309
310        let info = match self.peers.get_mut(&peer_id) {
311            Some(info) if info.on_same_chain_as_us => info,
312            _ => return false,
313        };
314
315        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
316        if let Some(low_watermark) = low_watermark {
317            info.lowest = low_watermark;
318        }
319        self.insert_checkpoint(checkpoint);
320
321        true
322    }
323
324    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
325    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
326        use std::collections::hash_map::Entry;
327        debug!("Insert peer info");
328
329        match self.peers.entry(peer_id) {
330            Entry::Occupied(mut entry) => {
331                // If there's already an entry and the genesis checkpoint digests match then update
332                // the maximum height. Otherwise we'll use the more recent one
333                let entry = entry.get_mut();
334                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
335                    entry.height = std::cmp::max(entry.height, info.height);
336                } else {
337                    *entry = info;
338                }
339            }
340            Entry::Vacant(entry) => {
341                entry.insert(info);
342            }
343        }
344    }
345
346    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
347        if let Some(info) = self.peers.get_mut(&peer_id) {
348            info.on_same_chain_as_us = false;
349        }
350        self.scores.remove(&peer_id);
351    }
352
353    /// Updates the peer's height without storing any checkpoint data.
354    /// Returns false if the peer doesn't exist or is not on the same chain.
355    pub fn update_peer_height(
356        &mut self,
357        peer_id: PeerId,
358        height: CheckpointSequenceNumber,
359        low_watermark: Option<CheckpointSequenceNumber>,
360    ) -> bool {
361        let info = match self.peers.get_mut(&peer_id) {
362            Some(info) if info.on_same_chain_as_us => info,
363            _ => return false,
364        };
365
366        info.height = std::cmp::max(height, info.height);
367        if let Some(low_watermark) = low_watermark {
368            info.lowest = low_watermark;
369        }
370
371        true
372    }
373
374    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
375        self.unprocessed_checkpoints
376            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
377        self.sequence_number_to_digest
378            .retain(|&s, _digest| s > sequence_number);
379    }
380
381    /// Inserts a checkpoint into the unprocessed checkpoints store.
382    /// Only one checkpoint per sequence number is stored. If a checkpoint with the same
383    /// sequence number but different digest already exists, the new one is dropped.
384    // TODO: also record who gives this checkpoint info for peer quality measurement?
385    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
386        let digest = *checkpoint.digest();
387        let sequence_number = *checkpoint.sequence_number();
388
389        // Check if we already have a checkpoint for this sequence number.
390        if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
391            if *existing_digest == digest {
392                // Same checkpoint, nothing to do.
393                return;
394            }
395            tracing::info!(
396                ?sequence_number,
397                ?existing_digest,
398                ?digest,
399                "received checkpoint with same sequence number but different digest, dropping new checkpoint"
400            );
401            return;
402        }
403
404        self.unprocessed_checkpoints.insert(digest, checkpoint);
405        self.sequence_number_to_digest
406            .insert(sequence_number, digest);
407    }
408
409    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
410        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
411            self.sequence_number_to_digest
412                .remove(checkpoint.sequence_number());
413        }
414    }
415
416    pub fn get_checkpoint_by_sequence_number(
417        &self,
418        sequence_number: CheckpointSequenceNumber,
419    ) -> Option<&Checkpoint> {
420        self.sequence_number_to_digest
421            .get(&sequence_number)
422            .and_then(|digest| self.get_checkpoint_by_digest(digest))
423    }
424
425    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
426        self.unprocessed_checkpoints.get(digest)
427    }
428
429    #[cfg(test)]
430    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
431        self.wait_interval_when_no_peer_to_sync_content = duration;
432    }
433
434    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
435        self.wait_interval_when_no_peer_to_sync_content
436    }
437
438    pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
439        if !self.peers.contains_key(&peer_id) {
440            return;
441        }
442        self.scores
443            .entry(peer_id)
444            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
445            .record_success(size, response_time);
446    }
447
448    pub fn record_failure(&mut self, peer_id: PeerId) {
449        if !self.peers.contains_key(&peer_id) {
450            return;
451        }
452        self.scores
453            .entry(peer_id)
454            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
455            .record_failure();
456    }
457
458    pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
459        self.scores
460            .get(peer_id)
461            .and_then(|s| s.effective_throughput())
462    }
463
464    pub fn is_failing(&self, peer_id: &PeerId) -> bool {
465        self.scores
466            .get(peer_id)
467            .map(|s| s.is_failing())
468            .unwrap_or(false)
469    }
470
471    pub fn update_failing_states(&mut self) {
472        for score in self.scores.values_mut() {
473            score.update_failing_state();
474        }
475    }
476
477    pub fn find_peer_to_report_for_failure(&self, threshold: Duration) -> Option<PeerId> {
478        self.scores
479            .iter()
480            .filter(|(peer_id, score)| {
481                score.consistently_failing(threshold)
482                    && self
483                        .peers
484                        .get(peer_id)
485                        .is_some_and(|info| info.on_same_chain_as_us)
486            })
487            .max_by_key(|(_, score)| {
488                score
489                    .failing_since
490                    .map(|since| since.elapsed())
491                    .unwrap_or(Duration::ZERO)
492            })
493            .map(|(peer_id, _)| *peer_id)
494    }
495}
496
// PeerBalancer selects peers using weighted random selection:
// - Most of the time: select from known peers, weighted by throughput
// - With configured probability: select from unknown peers (to explore)
// - Failing peers: only as last resort
#[derive(Clone)]
struct PeerBalancer {
    /// Peers with a measured throughput; the `f64` is the selection weight (bytes/sec).
    known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Peers without throughput samples yet; the `f64` is connection RTT in seconds.
    unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Peers currently classified as failing; tried only when nothing else is eligible.
    failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
    /// When set, only peers able to serve this checkpoint are considered (see `is_eligible`).
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    /// Whether we are selecting peers for summaries or for contents.
    request_type: PeerCheckpointRequestType,
    /// Probability of exploring an unknown peer even when known peers are available.
    exploration_probability: f64,
}
510
/// Which kind of checkpoint data a `PeerBalancer` is selecting peers for.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// Checkpoint summaries; any peer at sufficient height qualifies.
    Summary,
    /// Checkpoint contents; the peer's lowest-available watermark must also cover the request.
    Content,
}
516
impl PeerBalancer {
    /// Snapshots the current peer set from `peer_heights`, partitioning same-chain peers into:
    /// * `known_peers` - peers with a measured throughput,
    /// * `unknown_peers` - peers with no throughput samples yet, sorted by connection RTT,
    /// * `failing_peers` - peers currently classified as failing (last resort only).
    pub fn new(
        network: &anemo::Network,
        peer_heights: Arc<RwLock<PeerHeights>>,
        request_type: PeerCheckpointRequestType,
    ) -> Self {
        let peer_heights_guard = peer_heights.read().unwrap();

        let mut known_peers = Vec::new();
        let mut unknown_peers = Vec::new();
        let mut failing_peers = Vec::new();
        let exploration_probability = peer_heights_guard.exploration_probability;

        for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
            // Skip peers we no longer have a live connection to.
            let Some(peer) = network.peer(*peer_id) else {
                continue;
            };
            let rtt_secs = peer.connection_rtt().as_secs_f64();

            if peer_heights_guard.is_failing(peer_id) {
                failing_peers.push((peer, *info));
            } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
                known_peers.push((peer, *info, throughput));
            } else {
                unknown_peers.push((peer, *info, rtt_secs));
            }
        }
        // Release the lock before the (allocation-heavy) sort below.
        drop(peer_heights_guard);

        // Lowest RTT first, so exploration tries the closest unknown peer.
        unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
            rtt_a
                .partial_cmp(rtt_b)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        Self {
            known_peers,
            unknown_peers,
            failing_peers,
            requested_checkpoint: None,
            request_type,
            exploration_probability,
        }
    }

    /// Restricts selection to peers that can serve the given checkpoint sequence number.
    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
        self.requested_checkpoint = Some(checkpoint);
        self
    }

    /// True if this peer can serve the requested checkpoint. Summaries only require the peer's
    /// height; contents additionally require the checkpoint to be at or above the peer's
    /// lowest-available watermark (i.e. not pruned on that peer).
    fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
        let requested = self.requested_checkpoint.unwrap_or(0);
        match &self.request_type {
            PeerCheckpointRequestType::Summary => info.height >= requested,
            PeerCheckpointRequestType::Content => {
                info.height >= requested && info.lowest <= requested
            }
        }
    }

    /// Removes and returns an eligible known peer via weighted random (roulette-wheel)
    /// selection, where the weight is the peer's measured throughput.
    fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        // Collect (index, weight) for eligible peers; indices refer to `known_peers` and stay
        // valid because we remove at most one element per call.
        let eligible: Vec<_> = self
            .known_peers
            .iter()
            .enumerate()
            .filter(|(_, (_, info, _))| self.is_eligible(info))
            .map(|(i, (_, _, t))| (i, *t))
            .collect();

        if eligible.is_empty() {
            return None;
        }

        // Draw a point in [0, total) and walk the cumulative weights until we pass it.
        let total: f64 = eligible.iter().map(|(_, t)| t).sum();
        let mut pick = rand::thread_rng().gen_range(0.0..total);

        for (idx, throughput) in &eligible {
            pick -= throughput;
            if pick <= 0.0 {
                let (peer, info, _) = self.known_peers.remove(*idx);
                return Some((peer, info));
            }
        }

        // Floating-point rounding can leave `pick` marginally positive after the walk;
        // fall back to the last eligible peer.
        let (idx, _) = eligible.last().unwrap();
        let (peer, info, _) = self.known_peers.remove(*idx);
        Some((peer, info))
    }

    /// Removes and returns the first eligible unknown peer; since the list is RTT-sorted,
    /// this is the closest one.
    fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        let pos = self
            .unknown_peers
            .iter()
            .position(|(_, info, _)| self.is_eligible(info))?;
        let (peer, info, _) = self.unknown_peers.remove(pos);
        Some((peer, info))
    }

    /// Removes and returns the first eligible failing peer, if any.
    fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        let pos = self
            .failing_peers
            .iter()
            .position(|(_, info)| self.is_eligible(info))?;
        Some(self.failing_peers.remove(pos))
    }
}
623
624impl Iterator for PeerBalancer {
625    type Item = StateSyncClient<anemo::Peer>;
626
627    fn next(&mut self) -> Option<Self::Item> {
628        let has_eligible_known = self
629            .known_peers
630            .iter()
631            .any(|(_, info, _)| self.is_eligible(info));
632        let has_eligible_unknown = self
633            .unknown_peers
634            .iter()
635            .any(|(_, info, _)| self.is_eligible(info));
636
637        let explore = has_eligible_unknown
638            && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
639
640        if explore && let Some((peer, _)) = self.select_by_rtt() {
641            return Some(StateSyncClient::new(peer));
642        }
643
644        if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
645            return Some(StateSyncClient::new(peer));
646        }
647
648        if let Some((peer, _)) = self.select_failing() {
649            return Some(StateSyncClient::new(peer));
650        }
651
652        None
653    }
654}
655
#[derive(Clone, Debug)]
enum StateSyncMessage {
    // Ask the event loop to (re)start the checkpoint summary sync task if one is needed.
    StartSyncJob,
    // Validators will send this to the StateSyncEventLoop in order to kick off notifying our peers
    // of the new checkpoint.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    // Notification that the checkpoint content sync task will send to the event loop in the event
    // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
    // synced at the same time, only the highest checkpoint is sent.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
667
/// The long-running event loop at the heart of state sync; see the module docs for the
/// overall synchronization flow. `S` is the persistent checkpoint store.
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    /// Incoming messages from `Handle`s and from internal sync tasks.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak reference to our own mailbox
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// All long-running tasks spawned by this loop.
    tasks: JoinSet<()>,
    /// In-flight checkpoint summary sync task, if one is currently running.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// The always-running checkpoint contents sync task.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcasts fully-synced checkpoints to `Handle` subscribers.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    /// The always-running archive-fallback content sync task.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    archive_config: Option<ArchiveReaderConfig>,
    discovery_sender: Option<crate::discovery::Sender>,
}
690
691impl<S> StateSyncEventLoop<S>
692where
693    S: WriteStore + Clone + Send + Sync + 'static,
694{
695    // Note: A great deal of care is taken to ensure that all event handlers are non-asynchronous
696    // and that the only "await" points are from the select macro picking which event to handle.
697    // This ensures that the event loop is able to process events at a high speed and reduce the
698    // chance for building up a backlog of events to process.
    /// Runs the state-sync event loop until every handle to our mailbox has been dropped.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        // Sorted up-front (presumably so pinned checkpoints can be binary-searched during
        // verification — confirm against the verify path).
        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            // Query the peers we are already connected to for their latest checkpoint.
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // Spawn tokio task to update metrics periodically in the background.
        // NOTE(review): `_sender` stays alive for the duration of this function; the metrics
        // task presumably exits when it observes the channel closed — confirm in
        // `update_checkpoint_watermark_metrics`.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Start checkpoint contents sync loop.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.use_get_checkpoint_contents_v2(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Start archive based checkpoint content sync loop.
        // TODO: Consider switching to sync from archive only on startup.
        // Right now because the peer set is fixed at startup, a node may eventually
        // end up with peers who have all purged their local state. In such a scenario it will be
        // stuck until restart when it ends up with a different set of peers. Once the discovery
        // mechanism can dynamically identify and connect to other peers on the network, we will rely
        // on sync from archive as a fall back.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        // Start main loop. Event handlers are deliberately synchronous (see the note above
        // this function); the select below is the only await point.
        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // Once all handles to our mailbox have been dropped this
                    // will yield `None` and we can terminate the event loop
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The contents and archive sync loops are expected to run forever; if
                    // either has exited, something is irrecoverably wrong.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // The summary sync task runs to completion per sync target, so a finished
                    // task is normal: clear the handle so a new one can be started below.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            // After every event: (re)start summary sync if needed and push the latest
            // contents-sync target to the watch channel.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }
812
813    fn handle_message(&mut self, message: StateSyncMessage) {
814        debug!("Received message: {:?}", message);
815        match message {
816            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
817            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
818                self.handle_checkpoint_from_consensus(checkpoint)
819            }
820            // After we've successfully synced a checkpoint we can notify our peers
821            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
822                self.spawn_notify_peers_of_checkpoint(*checkpoint)
823            }
824        }
825    }
826
827    // Handle a checkpoint that we received from consensus
828    #[instrument(level = "debug", skip_all)]
829    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
830        // Always check previous_digest matches in case there is a gap between
831        // state sync and consensus.
832        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
833            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
834            .digest();
835        if checkpoint.previous_digest != Some(prev_digest) {
836            panic!(
837                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
838                checkpoint.sequence_number(),
839                Some(prev_digest),
840                checkpoint.previous_digest
841            );
842        }
843
844        let latest_checkpoint = self
845            .store
846            .get_highest_verified_checkpoint()
847            .expect("store operation should not fail");
848
849        // If this is an older checkpoint, just ignore it
850        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
851            return;
852        }
853
854        let checkpoint = *checkpoint;
855        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
856        if *checkpoint.sequence_number() > next_sequence_number {
857            debug!(
858                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
859                next_sequence_number,
860                checkpoint.sequence_number()
861            );
862        }
863
864        // Because checkpoint from consensus sends in order, when we have checkpoint n,
865        // we must have all of the checkpoints before n from either state sync or consensus.
866        #[cfg(debug_assertions)]
867        {
868            let _ = (next_sequence_number..=*checkpoint.sequence_number())
869                .map(|n| {
870                    let checkpoint = self
871                        .store
872                        .get_checkpoint_by_sequence_number(n)
873                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
874                    self.store
875                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
876                        .unwrap_or_else(|| {
877                            panic!(
878                                "store should contain checkpoint contents for {:?}",
879                                checkpoint.content_digest
880                            )
881                        });
882                })
883                .collect::<Vec<_>>();
884        }
885
886        // If this is the last checkpoint of a epoch, we need to make sure
887        // new committee is in store before we verify newer checkpoints in next epoch.
888        // This could happen before this validator's reconfiguration finishes, because
889        // state sync does not reconfig.
890        // TODO: make CheckpointAggregator use WriteStore so we don't need to do this
891        // committee insertion in two places (only in `insert_checkpoint`).
892        if let Some(EndOfEpochData {
893            next_epoch_committee,
894            ..
895        }) = checkpoint.end_of_epoch_data.as_ref()
896        {
897            let next_committee = next_epoch_committee.iter().cloned().collect();
898            let committee =
899                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
900            self.store
901                .insert_committee(committee)
902                .expect("store operation should not fail");
903        }
904
905        self.store
906            .update_highest_verified_checkpoint(&checkpoint)
907            .expect("store operation should not fail");
908        self.store
909            .update_highest_synced_checkpoint(&checkpoint)
910            .expect("store operation should not fail");
911
912        // We don't care if no one is listening as this is a broadcast channel
913        let _ = self.checkpoint_event_sender.send(checkpoint.clone());
914
915        self.spawn_notify_peers_of_checkpoint(checkpoint);
916    }
917
918    fn handle_peer_event(
919        &mut self,
920        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
921    ) {
922        use tokio::sync::broadcast::error::RecvError;
923
924        match peer_event {
925            Ok(PeerEvent::NewPeer(peer_id)) => {
926                self.spawn_get_latest_from_peer(peer_id);
927            }
928            Ok(PeerEvent::LostPeer(peer_id, _)) => {
929                let mut heights = self.peer_heights.write().unwrap();
930                heights.peers.remove(&peer_id);
931                heights.scores.remove(&peer_id);
932            }
933
934            Err(RecvError::Closed) => {
935                panic!("PeerEvent channel shouldn't be able to be closed");
936            }
937
938            Err(RecvError::Lagged(_)) => {
939                trace!("State-Sync fell behind processing PeerEvents");
940            }
941        }
942    }
943
944    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
945        if let Some(peer) = self.network.peer(peer_id) {
946            let genesis_checkpoint_digest = *self
947                .store
948                .get_checkpoint_by_sequence_number(0)
949                .expect("store should contain genesis checkpoint")
950                .digest();
951            let task = get_latest_from_peer(
952                genesis_checkpoint_digest,
953                peer,
954                self.peer_heights.clone(),
955                self.config.timeout(),
956            );
957            self.tasks.spawn(task);
958        }
959    }
960
961    fn handle_tick(&mut self, _now: std::time::Instant) {
962        let task = query_peers_for_their_latest_checkpoint(
963            self.network.clone(),
964            self.peer_heights.clone(),
965            self.weak_sender.clone(),
966            self.config.timeout(),
967        );
968        self.tasks.spawn(task);
969
970        if let Some(layer) = self.download_limit_layer.as_ref() {
971            layer.maybe_prune_map();
972        }
973
974        self.maybe_report_failing_peer();
975    }
976
977    fn maybe_report_failing_peer(&mut self) {
978        let threshold = self.config.peer_disconnect_threshold();
979        if threshold.is_zero() {
980            return;
981        }
982
983        let mut peer_heights = self.peer_heights.write().unwrap();
984        peer_heights.update_failing_states();
985
986        let Some(peer_id) = peer_heights.find_peer_to_report_for_failure(threshold) else {
987            return;
988        };
989        // Reset the failing clock so this peer isn't re-reported on the next tick.
990        // If the peer is still failing, update_failing_states will restart the clock
991        // and the peer must fail for another full threshold before being reported again.
992        // Discovery may decline to disconnect (e.g. too few peers), so this also
993        // prevents spamming reports every tick in that case.
994        if let Some(score) = peer_heights.scores.get_mut(&peer_id) {
995            score.reset_failing_since();
996        }
997        drop(peer_heights);
998
999        info!("reporting peer {peer_id} for consistent state sync failures");
1000        if let Some(sender) = &self.discovery_sender {
1001            sender.report_peer_failure(peer_id);
1002        } else {
1003            let _ = self.network.disconnect(peer_id);
1004        }
1005        self.metrics.inc_peers_reported_for_failure();
1006    }
1007
1008    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
1009        // Only run one sync task at a time
1010        if self.sync_checkpoint_summaries_task.is_some() {
1011            return;
1012        }
1013
1014        let highest_processed_checkpoint = self
1015            .store
1016            .get_highest_verified_checkpoint()
1017            .expect("store operation should not fail");
1018
1019        let highest_known_sequence_number = self
1020            .peer_heights
1021            .read()
1022            .unwrap()
1023            .highest_known_checkpoint_sequence_number();
1024
1025        if let Some(target_seq) = highest_known_sequence_number
1026            && *highest_processed_checkpoint.sequence_number() < target_seq
1027        {
1028            // Limit the per-sync batch size according to config.
1029            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
1030            let limited_target = std::cmp::min(
1031                target_seq,
1032                highest_processed_checkpoint
1033                    .sequence_number()
1034                    .saturating_add(max_batch_size),
1035            );
1036            let was_limited = limited_target < target_seq;
1037
1038            // Start sync job.
1039            let weak_sender = self.weak_sender.clone();
1040            let task = sync_to_checkpoint(
1041                self.network.clone(),
1042                self.store.clone(),
1043                self.peer_heights.clone(),
1044                self.metrics.clone(),
1045                self.config.pinned_checkpoints.clone(),
1046                self.config.checkpoint_header_download_concurrency(),
1047                self.config.timeout(),
1048                limited_target,
1049            )
1050            .map(move |result| match result {
1051                Ok(()) => {
1052                    // If we limited the sync range, immediately trigger
1053                    // another sync to continue catching up.
1054                    if was_limited && let Some(sender) = weak_sender.upgrade() {
1055                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
1056                    }
1057                }
1058                Err(e) => {
1059                    debug!("error syncing checkpoint {e}");
1060                }
1061            });
1062            let task_handle = self.tasks.spawn(task);
1063            self.sync_checkpoint_summaries_task = Some(task_handle);
1064        }
1065    }
1066
1067    fn maybe_trigger_checkpoint_contents_sync_task(
1068        &mut self,
1069        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
1070    ) {
1071        let highest_verified_checkpoint = self
1072            .store
1073            .get_highest_verified_checkpoint()
1074            .expect("store operation should not fail");
1075        let highest_synced_checkpoint = self
1076            .store
1077            .get_highest_synced_checkpoint()
1078            .expect("store operation should not fail");
1079
1080        if highest_verified_checkpoint.sequence_number()
1081            > highest_synced_checkpoint.sequence_number()
1082            // skip if we aren't connected to any peers that can help
1083            && self
1084                .peer_heights
1085                .read()
1086                .unwrap()
1087                .highest_known_checkpoint_sequence_number()
1088                > Some(*highest_synced_checkpoint.sequence_number())
1089        {
1090            let _ = target_sequence_channel.send_if_modified(|num| {
1091                let new_num = *highest_verified_checkpoint.sequence_number();
1092                if *num == new_num {
1093                    return false;
1094                }
1095                *num = new_num;
1096                true
1097            });
1098        }
1099    }
1100
1101    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
1102        let task = notify_peers_of_checkpoint(
1103            self.network.clone(),
1104            self.peer_heights.clone(),
1105            checkpoint,
1106            self.config.timeout(),
1107        );
1108        self.tasks.spawn(task);
1109    }
1110}
1111
1112async fn notify_peers_of_checkpoint(
1113    network: anemo::Network,
1114    peer_heights: Arc<RwLock<PeerHeights>>,
1115    checkpoint: VerifiedCheckpoint,
1116    timeout: Duration,
1117) {
1118    let futs = peer_heights
1119        .read()
1120        .unwrap()
1121        .peers_on_same_chain()
1122        // Filter out any peers who we know already have a checkpoint higher than this one
1123        .filter_map(|(peer_id, info)| {
1124            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1125        })
1126        // Filter out any peers who we aren't connected with
1127        .flat_map(|peer_id| network.peer(*peer_id))
1128        .map(StateSyncClient::new)
1129        .map(|mut client| {
1130            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1131            async move { client.push_checkpoint_summary(request).await }
1132        })
1133        .collect::<Vec<_>>();
1134    futures::future::join_all(futs).await;
1135}
1136
1137async fn get_latest_from_peer(
1138    our_genesis_checkpoint_digest: CheckpointDigest,
1139    peer: anemo::Peer,
1140    peer_heights: Arc<RwLock<PeerHeights>>,
1141    timeout: Duration,
1142) {
1143    let peer_id = peer.peer_id();
1144    let mut client = StateSyncClient::new(peer);
1145
1146    let info = {
1147        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
1148
1149        if let Some(info) = maybe_info {
1150            info
1151        } else {
1152            // TODO do we want to create a new API just for querying a node's chainid?
1153            //
1154            // We need to query this node's genesis checkpoint to see if they're on the same chain
1155            // as us
1156            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
1157                .with_timeout(timeout);
1158            let response = client
1159                .get_checkpoint_summary(request)
1160                .await
1161                .map(Response::into_inner);
1162
1163            let info = match response {
1164                Ok(Some(checkpoint)) => {
1165                    let digest = *checkpoint.digest();
1166                    PeerStateSyncInfo {
1167                        genesis_checkpoint_digest: digest,
1168                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
1169                        height: *checkpoint.sequence_number(),
1170                        lowest: CheckpointSequenceNumber::default(),
1171                    }
1172                }
1173                Ok(None) => PeerStateSyncInfo {
1174                    genesis_checkpoint_digest: CheckpointDigest::default(),
1175                    on_same_chain_as_us: false,
1176                    height: CheckpointSequenceNumber::default(),
1177                    lowest: CheckpointSequenceNumber::default(),
1178                },
1179                Err(status) => {
1180                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
1181                    return;
1182                }
1183            };
1184            peer_heights
1185                .write()
1186                .unwrap()
1187                .insert_peer_info(peer_id, info);
1188            info
1189        }
1190    };
1191
1192    // Bail early if this node isn't on the same chain as us
1193    if !info.on_same_chain_as_us {
1194        trace!(?info, "Peer {peer_id} not on same chain as us");
1195        return;
1196    }
1197    let Some((highest_checkpoint, low_watermark)) =
1198        query_peer_for_latest_info(&mut client, timeout).await
1199    else {
1200        return;
1201    };
1202    peer_heights
1203        .write()
1204        .unwrap()
1205        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
1206}
1207
1208/// Queries a peer for their highest_synced_checkpoint and low checkpoint watermark
1209async fn query_peer_for_latest_info(
1210    client: &mut StateSyncClient<anemo::Peer>,
1211    timeout: Duration,
1212) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
1213    let request = Request::new(()).with_timeout(timeout);
1214    let response = client
1215        .get_checkpoint_availability(request)
1216        .await
1217        .map(Response::into_inner);
1218    match response {
1219        Ok(GetCheckpointAvailabilityResponse {
1220            highest_synced_checkpoint,
1221            lowest_available_checkpoint,
1222        }) => {
1223            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
1224        }
1225        Err(status) => {
1226            // If peer hasn't upgraded they would return 404 NotFound error
1227            if status.status() != anemo::types::response::StatusCode::NotFound {
1228                trace!("get_checkpoint_availability request failed: {status:?}");
1229                return None;
1230            }
1231        }
1232    };
1233
1234    // Then we try the old query
1235    // TODO: remove this once the new feature stabilizes
1236    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1237    let response = client
1238        .get_checkpoint_summary(request)
1239        .await
1240        .map(Response::into_inner);
1241    match response {
1242        Ok(Some(checkpoint)) => Some((checkpoint, None)),
1243        Ok(None) => None,
1244        Err(status) => {
1245            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1246            None
1247        }
1248    }
1249}
1250
1251#[instrument(level = "debug", skip_all)]
1252async fn query_peers_for_their_latest_checkpoint(
1253    network: anemo::Network,
1254    peer_heights: Arc<RwLock<PeerHeights>>,
1255    sender: mpsc::WeakSender<StateSyncMessage>,
1256    timeout: Duration,
1257) {
1258    let peer_heights = &peer_heights;
1259    let futs = peer_heights
1260        .read()
1261        .unwrap()
1262        .peers_on_same_chain()
1263        // Filter out any peers who we aren't connected with
1264        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1265        .map(|peer| {
1266            let peer_id = peer.peer_id();
1267            let mut client = StateSyncClient::new(peer);
1268
1269            async move {
1270                let response = query_peer_for_latest_info(&mut client, timeout).await;
1271                match response {
1272                    Some((highest_checkpoint, low_watermark)) => peer_heights
1273                        .write()
1274                        .unwrap()
1275                        .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1276                        .then_some(highest_checkpoint),
1277                    None => None,
1278                }
1279            }
1280        })
1281        .collect::<Vec<_>>();
1282
1283    debug!("Query {} peers for latest checkpoint", futs.len());
1284
1285    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1286
1287    let highest_checkpoint_seq = checkpoints
1288        .map(|checkpoint| *checkpoint.sequence_number())
1289        .max();
1290
1291    let our_highest_seq = peer_heights
1292        .read()
1293        .unwrap()
1294        .highest_known_checkpoint_sequence_number();
1295
1296    debug!(
1297        "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1298    );
1299
1300    let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1301        (Some(theirs), None) => theirs,
1302        (Some(theirs), Some(ours)) if theirs > ours => theirs,
1303        _ => return,
1304    };
1305
1306    if let Some(sender) = sender.upgrade() {
1307        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1308    }
1309}
1310
/// Sync checkpoint summaries from our highest verified checkpoint + 1 through
/// `target_sequence_number`, verifying each and inserting it into `store`
/// (which advances the highest-verified watermark).
///
/// Headers are fetched from peers concurrently, bounded by
/// `checkpoint_header_download_concurrency`, but verified and committed
/// strictly in sequence order. Checkpoints listed in `pinned_checkpoints`
/// are digest-checked against the pin at fetch time and skip the usual
/// verification at commit time.
///
/// # Errors
/// Fails if the target is not newer than our highest verified checkpoint,
/// if no peer can supply some required checkpoint, or if a fetched
/// checkpoint cannot be verified.
async fn sync_to_checkpoint<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
    checkpoint_header_download_concurrency: usize,
    timeout: Duration,
    target_sequence_number: CheckpointSequenceNumber,
) -> Result<()>
where
    S: WriteStore,
{
    metrics.set_highest_known_checkpoint(target_sequence_number);

    let mut current = store
        .get_highest_verified_checkpoint()
        .expect("store operation should not fail");
    if *current.sequence_number() >= target_sequence_number {
        return Err(anyhow::anyhow!(
            "target checkpoint {} is older than highest verified checkpoint {}",
            target_sequence_number,
            current.sequence_number(),
        ));
    }

    // Rotates requests across peers that advertise the checkpoints we need.
    let peer_balancer = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Summary,
    );
    // range of the next sequence_numbers to fetch
    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
        ..=target_sequence_number)
        .map(|next| {
            let peers = peer_balancer.clone().with_checkpoint(next);
            let peer_heights = peer_heights.clone();
            let pinned_checkpoints = &pinned_checkpoints;
            async move {
                // This header may already be cached in PeerHeights (inserted
                // on a prior attempt, see below); reuse it rather than re-fetch.
                if let Some(checkpoint) = peer_heights
                    .read()
                    .unwrap()
                    .get_checkpoint_by_sequence_number(next)
                {
                    return (Some(checkpoint.to_owned()), next, None);
                }

                // Iterate through peers trying each one in turn until we're able to
                // successfully get the target checkpoint.
                for mut peer in peers {
                    let peer_id = peer.inner().peer_id();
                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
                        .with_timeout(timeout);
                    let start = Instant::now();
                    let result = peer.get_checkpoint_summary(request).await;
                    let elapsed = start.elapsed();

                    // Any failure or empty response counts against the peer's score.
                    let checkpoint = match result {
                        Ok(response) => match response.into_inner() {
                            Some(cp) => Some(cp),
                            None => {
                                trace!("peer unable to help sync");
                                peer_heights.write().unwrap().record_failure(peer_id);
                                None
                            }
                        },
                        Err(e) => {
                            trace!("{e:?}");
                            peer_heights.write().unwrap().record_failure(peer_id);
                            None
                        }
                    };

                    let Some(checkpoint) = checkpoint else {
                        continue;
                    };

                    // Record the successful fetch (payload size and latency) against this peer.
                    let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
                    peer_heights.write().unwrap().record_success(peer_id, size, elapsed);

                    // peer didn't give us a checkpoint with the height that we requested
                    if *checkpoint.sequence_number() != next {
                        tracing::debug!(
                            "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
                            checkpoint.sequence_number()
                        );
                        peer_heights
                            .write()
                            .unwrap()
                            .mark_peer_as_not_on_same_chain(peer_id);
                        continue;
                    }

                    // peer gave us a checkpoint whose digest does not match pinned digest
                    let checkpoint_digest = checkpoint.digest();
                    if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
                        checkpoint.sequence_number(),
                        |(seq_num, _digest)| *seq_num
                    )
                        && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
                    {
                        tracing::debug!(
                            "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
                            pinned_checkpoints[pinned_digest_index].1,
                            checkpoint_digest
                        );
                        peer_heights
                            .write()
                            .unwrap()
                            .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
                        continue;
                    }

                    // Insert in our store in the event that things fail and we need to retry
                    peer_heights
                        .write()
                        .unwrap()
                        .insert_checkpoint(checkpoint.clone());
                    return (Some(checkpoint), next, Some(peer_id));
                }
                // No peer could provide this checkpoint.
                (None, next, None)
            }
        })
        .pipe(futures::stream::iter)
        .buffered(checkpoint_header_download_concurrency);

    // `buffered` preserves ordering, so results arrive strictly in sequence
    // even though the fetches themselves run concurrently.
    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
        assert_eq!(
            current
                .sequence_number()
                .checked_add(1)
                .expect("exhausted u64"),
            next
        );

        // Verify the checkpoint
        let checkpoint = 'cp: {
            let checkpoint = maybe_checkpoint.ok_or_else(|| {
                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
            })?;
            // Skip verification for manually pinned checkpoints.
            if pinned_checkpoints
                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
                .is_ok()
            {
                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
            }
            match verify_checkpoint(&current, &store, checkpoint) {
                Ok(verified_checkpoint) => verified_checkpoint,
                Err(checkpoint) => {
                    let mut peer_heights = peer_heights.write().unwrap();
                    // Remove the checkpoint from our temporary store so that we can try querying
                    // another peer for a different one
                    peer_heights.remove_checkpoint(checkpoint.digest());

                    // Mark peer as not on the same chain as us
                    if let Some(peer_id) = maybe_peer_id {
                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
                    }

                    return Err(anyhow::anyhow!(
                        "unable to verify checkpoint {checkpoint:?}"
                    ));
                }
            }
        };

        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
            metrics.checkpoint_summary_age_metrics()
        {
            checkpoint.report_checkpoint_age(
                checkpoint_summary_age_metric,
                checkpoint_summary_age_metric_deprecated,
            );
        }

        // Insert the newly verified checkpoint into our store, which will bump our highest
        // verified checkpoint watermark as well.
        store
            .insert_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        current = checkpoint;
    }

    // Drop cached headers at or below what we've now verified.
    peer_heights
        .write()
        .unwrap()
        .cleanup_old_checkpoints(*current.sequence_number());

    Ok(())
}
1504
1505async fn sync_checkpoint_contents_from_archive<S>(
1506    network: anemo::Network,
1507    archive_config: Option<ArchiveReaderConfig>,
1508    store: S,
1509    peer_heights: Arc<RwLock<PeerHeights>>,
1510    metrics: Metrics,
1511) where
1512    S: WriteStore + Clone + Send + Sync + 'static,
1513{
1514    loop {
1515        sync_checkpoint_contents_from_archive_iteration(
1516            &network,
1517            &archive_config,
1518            store.clone(),
1519            peer_heights.clone(),
1520            metrics.clone(),
1521        )
1522        .await;
1523        tokio::time::sleep(Duration::from_secs(5)).await;
1524    }
1525}
1526
1527async fn sync_checkpoint_contents_from_archive_iteration<S>(
1528    network: &anemo::Network,
1529    archive_config: &Option<ArchiveReaderConfig>,
1530    store: S,
1531    peer_heights: Arc<RwLock<PeerHeights>>,
1532    metrics: Metrics,
1533) where
1534    S: WriteStore + Clone + Send + Sync + 'static,
1535{
1536    let peers: Vec<_> = peer_heights
1537        .read()
1538        .unwrap()
1539        .peers_on_same_chain()
1540        // Filter out any peers who we aren't connected with.
1541        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1542        .collect();
1543    let lowest_checkpoint_on_peers = peers
1544        .iter()
1545        .map(|(_p, state_sync_info)| state_sync_info.lowest)
1546        .min();
1547    let highest_synced = store
1548        .get_highest_synced_checkpoint()
1549        .expect("store operation should not fail")
1550        .sequence_number;
1551    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1552        highest_synced < lowest_checkpoint_on_peers
1553    } else {
1554        false
1555    };
1556    debug!(
1557        "Syncing checkpoint contents from archive: {sync_from_archive},  highest_synced: {highest_synced},  lowest_checkpoint_on_peers: {}",
1558        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1559    );
1560    if sync_from_archive {
1561        let start = highest_synced
1562            .checked_add(1)
1563            .expect("Checkpoint seq num overflow");
1564        let end = lowest_checkpoint_on_peers.unwrap();
1565
1566        let Some(archive_config) = archive_config else {
1567            warn!("Failed to find an archive reader to complete the state sync request");
1568            return;
1569        };
1570        let Some(ingestion_url) = &archive_config.ingestion_url else {
1571            warn!("Archival ingestion url for state sync is not configured");
1572            return;
1573        };
1574        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1575            warn!("{} can't be used as an archival fallback", ingestion_url);
1576            return;
1577        }
1578        let obj_store =
1579            build_object_store(ingestion_url, archive_config.remote_store_options.clone());
1580        let mut checkpoint_stream = futures::stream::iter(start..=end)
1581            .map(|seq| {
1582                let obj_store = obj_store.clone();
1583                async move { (seq, fetch_checkpoint(&obj_store, seq).await) }
1584            })
1585            .buffered(archive_config.download_concurrency.get());
1586
1587        while let Some((seq, result)) = checkpoint_stream.next().await {
1588            let checkpoint = match result {
1589                Ok(checkpoint) => checkpoint,
1590                Err(err) => {
1591                    warn!("State sync from archive failed fetching checkpoint {seq}: {err}");
1592                    return;
1593                }
1594            };
1595            if let Err(err) = process_archive_checkpoint(&store, &checkpoint, &metrics) {
1596                warn!("State sync from archive failed processing checkpoint {seq}: {err}");
1597                return;
1598            }
1599        }
1600    }
1601}
1602
/// Task that downloads and persists checkpoint *contents* (transactions and
/// effects), advancing the highest_synced_checkpoint watermark as each
/// checkpoint's contents land in the store.
///
/// `target_sequence_channel` carries the latest checkpoint sequence we are
/// allowed to sync up to (the verified watermark). The loop keeps up to
/// `checkpoint_content_download_concurrency` download tasks in flight, and
/// additionally bounds the total number of transactions being downloaded at
/// once by `checkpoint_content_download_tx_concurrency`. Tasks are held in a
/// `FuturesOrdered`, so completions are observed in sequence order.
async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    use_get_checkpoint_contents_v2: bool,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");

    // Next sequence number whose contents we have not yet started fetching.
    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    // Exclusive upper bound on sequences we may fetch; updated from the watch
    // channel (stored as target + 1 so the `<` comparison below works).
    let mut target_sequence_cursor = 0;
    // network_total_transactions of the last checkpoint we STARTED fetching;
    // used to compute the per-checkpoint transaction delta for budgeting.
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    let mut checkpoint_contents_tasks = FuturesOrdered::new();

    // Remaining transaction budget shared by all in-flight download tasks.
    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;

    loop {
        tokio::select! {
            // New verified-checkpoint target from the event loop.
            result = target_sequence_channel.changed() => {
                match result {
                    Ok(()) => {
                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
                    }
                    Err(_) => {
                        // Watch channel is closed, exit loop.
                        return
                    }
                }
            },
            // Oldest in-flight download finished (FuturesOrdered yields in
            // push order, i.e. ascending sequence number here).
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint

                        store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        // We don't care if no one is listening as this is a broadcast channel
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        // Return this checkpoint's transactions to the budget.
                        // Because completions are in order, highest_synced is
                        // this checkpoint's immediate predecessor, so the
                        // delta is exactly its transaction count.
                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;

                    }
                    Err(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
                        // Log whether any peer could plausibly still serve
                        // this checkpoint (i.e. it has not been pruned
                        // everywhere).
                        if let Some(lowest_peer_checkpoint) =
                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
                            }
                        } else {
                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());

                        }
                        // Calculate tx_count for retry by getting previous checkpoint
                        let retry_tx_count = if *checkpoint.sequence_number() == 0 {
                            checkpoint.network_total_transactions
                        } else {
                            let prev = store
                                .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
                                .expect("previous checkpoint must exist")
                                .network_total_transactions;
                            checkpoint.network_total_transactions - prev
                        };
                        // Retry contents sync on failure.
                        // push_front keeps the retried checkpoint at the head
                        // of the ordered queue, preserving in-order completion.
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            use_get_checkpoint_contents_v2,
                            retry_tx_count,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        // Start new tasks up to configured concurrency limits.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            // The header for every sequence below the verified watermark must
            // already be in the store.
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .unwrap_or_else(|| panic!(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
                    current_sequence
                ));

            // Enforce transaction count concurrency limit.
            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                use_get_checkpoint_contents_v2,
                tx_count,
                next_checkpoint,
            ));
        }

        if highest_synced
            .sequence_number()
            .is_multiple_of(checkpoint_content_download_concurrency as u64)
            || checkpoint_contents_tasks.is_empty()
        {
            // Periodically notify event loop to notify our peers that we've synced to a new checkpoint height
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}
1733
1734#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1735async fn sync_one_checkpoint_contents<S>(
1736    network: anemo::Network,
1737    store: S,
1738    peer_heights: Arc<RwLock<PeerHeights>>,
1739    use_get_checkpoint_contents_v2: bool,
1740    tx_count: u64,
1741    checkpoint: VerifiedCheckpoint,
1742) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1743where
1744    S: WriteStore + Clone,
1745{
1746    let (timeout_min, timeout_max) = {
1747        let ph = peer_heights.read().unwrap();
1748        (
1749            ph.checkpoint_content_timeout_min,
1750            ph.checkpoint_content_timeout_max,
1751        )
1752    };
1753    let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1754    debug!(
1755        "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1756        timeout, tx_count
1757    );
1758
1759    // Check if we already have produced this checkpoint locally. If so, we don't need
1760    // to get it from peers anymore.
1761    if store
1762        .get_highest_synced_checkpoint()
1763        .expect("store operation should not fail")
1764        .sequence_number()
1765        >= checkpoint.sequence_number()
1766    {
1767        debug!("checkpoint was already created via consensus output");
1768        return Ok(checkpoint);
1769    }
1770
1771    // Request checkpoint contents from peers.
1772    let peers = PeerBalancer::new(
1773        &network,
1774        peer_heights.clone(),
1775        PeerCheckpointRequestType::Content,
1776    )
1777    .with_checkpoint(*checkpoint.sequence_number());
1778    let now = tokio::time::Instant::now();
1779    let Some(_contents) = get_full_checkpoint_contents(
1780        peers,
1781        &store,
1782        peer_heights.clone(),
1783        &checkpoint,
1784        use_get_checkpoint_contents_v2,
1785        timeout,
1786    )
1787    .await
1788    else {
1789        // Delay completion in case of error so we don't hammer the network with retries.
1790        let duration = peer_heights
1791            .read()
1792            .unwrap()
1793            .wait_interval_when_no_peer_to_sync_content();
1794        if now.elapsed() < duration {
1795            let duration = duration - now.elapsed();
1796            info!("retrying checkpoint sync after {:?}", duration);
1797            tokio::time::sleep(duration).await;
1798        }
1799        return Err(checkpoint);
1800    };
1801    debug!("completed checkpoint contents sync");
1802    Ok(checkpoint)
1803}
1804
1805#[instrument(level = "debug", skip_all)]
1806async fn get_full_checkpoint_contents<S>(
1807    peers: PeerBalancer,
1808    store: S,
1809    peer_heights: Arc<RwLock<PeerHeights>>,
1810    checkpoint: &VerifiedCheckpoint,
1811    use_get_checkpoint_contents_v2: bool,
1812    timeout: Duration,
1813) -> Option<VersionedFullCheckpointContents>
1814where
1815    S: WriteStore,
1816{
1817    let sequence_number = checkpoint.sequence_number;
1818    let digest = checkpoint.content_digest;
1819    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1820        debug!("store already contains checkpoint contents");
1821        return Some(contents);
1822    }
1823
1824    // Iterate through our selected peers trying each one in turn until we're able to
1825    // successfully get the target checkpoint
1826    for mut peer in peers {
1827        let peer_id = peer.inner().peer_id();
1828        debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
1829        let request = Request::new(digest).with_timeout(timeout);
1830        let start = Instant::now();
1831        let result = if use_get_checkpoint_contents_v2 {
1832            peer.get_checkpoint_contents_v2(request).await
1833        } else {
1834            peer.get_checkpoint_contents(request)
1835                .await
1836                .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
1837        };
1838        let elapsed = start.elapsed();
1839
1840        let contents = match result {
1841            Ok(response) => match response.into_inner() {
1842                Some(c) => Some(c),
1843                None => {
1844                    trace!("peer unable to help sync");
1845                    peer_heights.write().unwrap().record_failure(peer_id);
1846                    None
1847                }
1848            },
1849            Err(e) => {
1850                trace!("{e:?}");
1851                peer_heights.write().unwrap().record_failure(peer_id);
1852                None
1853            }
1854        };
1855
1856        let Some(contents) = contents else {
1857            continue;
1858        };
1859
1860        if contents.verify_digests(digest).is_ok() {
1861            let size =
1862                bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
1863            peer_heights
1864                .write()
1865                .unwrap()
1866                .record_success(peer_id, size, elapsed);
1867            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1868            store
1869                .insert_checkpoint_contents(checkpoint, verified_contents)
1870                .expect("store operation should not fail");
1871            return Some(contents);
1872        }
1873    }
1874    debug!("no peers had checkpoint contents");
1875    None
1876}
1877
1878async fn update_checkpoint_watermark_metrics<S>(
1879    mut recv: oneshot::Receiver<()>,
1880    store: S,
1881    metrics: Metrics,
1882) -> Result<()>
1883where
1884    S: WriteStore + Clone + Send + Sync,
1885{
1886    let mut interval = tokio::time::interval(Duration::from_secs(5));
1887    loop {
1888        tokio::select! {
1889             _now = interval.tick() => {
1890                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1891                    .expect("store operation should not fail");
1892                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1893                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1894                    .expect("store operation should not fail");
1895                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1896             },
1897            _ = &mut recv => break,
1898        }
1899    }
1900    Ok(())
1901}