sui_network/state_sync/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer-to-peer data synchronization of checkpoints.
5//!
6//! This StateSync module is responsible for the synchronization and dissemination of checkpoints
7//! and the transactions, and their effects, contained within. This module is *not* responsible for
8//! the execution of the transactions included in a checkpoint, that process is left to another
9//! component in the system.
10//!
11//! # High-level Overview of StateSync
12//!
13//! StateSync discovers new checkpoints via a few different sources:
14//! 1. If this node is a Validator, checkpoints will be produced via consensus at which point
15//!    consensus can notify state-sync of the new checkpoint via [Handle::send_checkpoint].
16//! 2. A peer notifies us of the latest checkpoint which they have synchronized. State-Sync will
17//!    also periodically query its peers to discover what their latest checkpoint is.
18//!
19//! We keep track of two different watermarks:
20//! * highest_verified_checkpoint - This is the highest checkpoint header that we've locally
//!   verified. This indicates that we have in our persistent store (and have verified) all
22//!   checkpoint headers up to and including this value.
23//! * highest_synced_checkpoint - This is the highest checkpoint that we've fully synchronized,
24//!   meaning we've downloaded and have in our persistent stores all of the transactions, and their
25//!   effects (but not the objects), for all checkpoints up to and including this point. This is
26//!   the watermark that is shared with other peers, either via notification or when they query for
27//!   our latest checkpoint, and is intended to be used as a guarantee of data availability.
28//!
29//! The `PeerHeights` struct is used to track the highest_synced_checkpoint watermark for all of
30//! our peers.
31//!
32//! When a new checkpoint is discovered, and we've determined that it is higher than our
33//! highest_verified_checkpoint, then StateSync will kick off a task to synchronize and verify all
34//! checkpoints between our highest_synced_checkpoint and the newly discovered checkpoint. This
35//! process is done by querying one of our peers for the checkpoints we're missing (using the
36//! `PeerHeights` struct as a way to intelligently select which peers have the data available for
37//! us to query) at which point we will locally verify the signatures on the checkpoint header with
38//! the appropriate committee (based on the epoch). As checkpoints are verified, the
//! highest_verified_checkpoint watermark will be ratcheted up.
40//!
41//! Once we've ratcheted up our highest_verified_checkpoint, and if it is higher than
42//! highest_synced_checkpoint, StateSync will then kick off a task to synchronize the contents of
43//! all of the checkpoints from highest_synced_checkpoint..=highest_verified_checkpoint. After the
44//! contents of each checkpoint is fully downloaded, StateSync will update our
45//! highest_synced_checkpoint watermark and send out a notification on a broadcast channel
46//! indicating that a new checkpoint has been fully downloaded. Notifications on this broadcast
47//! channel will always be made in order. StateSync will also send out a notification to its peers
48//! of the newly synchronized checkpoint so that it can help other peers synchronize.
49
50use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54    collections::{HashMap, VecDeque},
55    sync::{Arc, RwLock},
56    time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60    committee::Committee,
61    digests::CheckpointDigest,
62    messages_checkpoint::{
63        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64        VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65    },
66    storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71    sync::{broadcast, mpsc, watch},
72    task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::process_archive_checkpoint;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90    state_sync_client::StateSyncClient,
91    state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_storage::object_store::util::{build_object_store, fetch_checkpoint};
97use sui_storage::verify_checkpoint;
98
/// A handle to the StateSync subsystem.
///
/// This handle can be cloned and shared. Once all copies of a StateSync system's Handle have been
/// dropped, the StateSync system will be gracefully shutdown.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Sending half of the channel used to deliver `StateSyncMessage`s (see
    /// `Handle::send_checkpoint`).
    sender: mpsc::Sender<StateSyncMessage>,
    /// Broadcast sender from which new receivers are created in
    /// `Handle::subscribe_to_synced_checkpoints`.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Metrics handle; queried by `Handle::get_peers_reported_for_failure`.
    metrics: Metrics,
}
109
110impl Handle {
111    /// Send a newly minted checkpoint from Consensus to StateSync so that it can be disseminated
112    /// to other nodes on the network.
113    ///
114    /// # Invariant
115    ///
116    /// Consensus must only notify StateSync of new checkpoints that have been fully committed to
117    /// persistent storage. This includes CheckpointContents and all Transactions and
118    /// TransactionEffects included therein.
119    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
120        self.sender
121            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
122            .await
123            .unwrap()
124    }
125
126    /// Subscribe to the stream of checkpoints that have been fully synchronized and downloaded.
127    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
128        self.checkpoint_event_sender.subscribe()
129    }
130
131    /// Returns the number of peers reported due to consistent state sync failures.
132    pub fn get_peers_reported_for_failure(&self) -> u64 {
133        self.metrics.get_peers_reported_for_failure()
134    }
135}
136
137pub(super) fn compute_adaptive_timeout(
138    tx_count: u64,
139    min_timeout: Duration,
140    max_timeout: Duration,
141) -> Duration {
142    const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
143    const JITTER_FRACTION: f64 = 0.1;
144
145    let max_timeout = max_timeout.max(min_timeout);
146    let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
147    let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
148    let base = min_timeout + extra;
149
150    let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
151    let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
152    Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
153}
154
/// Rolling record of recent request outcomes for a single peer.
///
/// The recorded samples are used to derive a windowed failure rate and an effective throughput.
#[cfg_attr(test, derive(Debug))]
pub(super) struct PeerScore {
    /// Recent successful requests: (time recorded, response size in bytes, response time).
    successes: VecDeque<(Instant, u64 /* response_size_bytes */, Duration)>,
    /// Times at which recent failures were recorded.
    failures: VecDeque<Instant>,
    /// Only samples younger than this are considered when computing the failure rate and
    /// throughput.
    window: Duration,
    /// Fraction of in-window samples that must be failures (inclusive) for the peer to be
    /// considered failing.
    failure_rate: f64,
    /// When the peer was first observed to be failing by `update_failing_state`; `None` while
    /// the peer is not considered failing.
    failing_since: Option<Instant>,
}
163
164impl PeerScore {
165    const MAX_SAMPLES: usize = 20;
166    const MIN_SAMPLES_FOR_FAILURE: usize = 10;
167
168    pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
169        Self {
170            successes: VecDeque::new(),
171            failures: VecDeque::new(),
172            window,
173            failure_rate,
174            failing_since: None,
175        }
176    }
177
178    pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
179        let now = Instant::now();
180        self.successes.push_back((now, size, response_time));
181        while self.successes.len() > Self::MAX_SAMPLES {
182            self.successes.pop_front();
183        }
184        // Note: a single success does NOT clear `failing_since`. Whether the peer is
185        // "consistently failing" is driven by the windowed failure RATE (see
186        // `update_failing_state`), so an occasional success while the rate stays high (e.g.
187        // a brief reconnect during connection churn) must not keep resetting the clock and
188        // prevent the peer from ever being reported.
189    }
190
191    pub(super) fn record_failure(&mut self) {
192        let now = Instant::now();
193        self.failures.push_back(now);
194        while self.failures.len() > Self::MAX_SAMPLES {
195            self.failures.pop_front();
196        }
197    }
198
199    pub(super) fn is_failing(&self) -> bool {
200        let now = Instant::now();
201        let recent_failures = self
202            .failures
203            .iter()
204            .filter(|ts| now.duration_since(**ts) < self.window)
205            .count();
206        let recent_successes = self
207            .successes
208            .iter()
209            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
210            .count();
211
212        let total = recent_failures + recent_successes;
213        if total < Self::MIN_SAMPLES_FOR_FAILURE {
214            return false;
215        }
216
217        let rate = recent_failures as f64 / total as f64;
218        rate >= self.failure_rate
219    }
220
221    pub(super) fn update_failing_state(&mut self) {
222        if self.is_failing() {
223            // Start the consistently-failing clock when the windowed failure rate first
224            // crosses the threshold, and let it accumulate while the rate stays high.
225            if self.failing_since.is_none() {
226                self.failing_since = Some(Instant::now());
227            }
228        } else {
229            // The windowed failure rate dropped below the threshold: the peer is no longer
230            // failing, so clear the clock. Only an actual drop in the rate resets it.
231            self.failing_since = None;
232        }
233    }
234
235    pub(super) fn reset_failing_since(&mut self) {
236        self.failing_since = None;
237    }
238
239    pub(super) fn consistently_failing(&self, threshold: Duration) -> bool {
240        match self.failing_since {
241            Some(since) => since.elapsed() >= threshold,
242            None => false,
243        }
244    }
245
246    pub(super) fn effective_throughput(&self) -> Option<f64> {
247        let now = Instant::now();
248        let (total_size, total_time) = self
249            .successes
250            .iter()
251            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
252            .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
253                (size + s, time + *d)
254            });
255
256        if total_size == 0 {
257            return None;
258        }
259
260        if total_time.is_zero() {
261            return None;
262        }
263
264        Some(total_size as f64 / total_time.as_secs_f64())
265    }
266}
267
/// Bookkeeping about our peers: their reported checkpoint heights, checkpoints we have heard
/// about but not yet processed, and per-peer request scores.
struct PeerHeights {
    /// Table used to track the highest checkpoint for each of our peers.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    /// Checkpoints stored via `insert_checkpoint`, keyed by digest.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    /// Index from sequence number to the digest of the single checkpoint stored in
    /// `unprocessed_checkpoints` for that sequence number.
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
    /// Request success/failure history per peer, created lazily on the first recorded outcome.
    scores: HashMap<PeerId, PeerScore>,

    /// Value returned by `wait_interval_when_no_peer_to_sync_content()`.
    wait_interval_when_no_peer_to_sync_content: Duration,
    /// Window passed to `PeerScore::new` for newly created scores.
    peer_scoring_window: Duration,
    /// Failure-rate threshold passed to `PeerScore::new` for newly created scores.
    peer_failure_rate: f64,
    // NOTE(review): the two timeout bounds below are not read in this part of the file;
    // presumably they feed `compute_adaptive_timeout` - confirm against the callers.
    checkpoint_content_timeout_min: Duration,
    checkpoint_content_timeout_max: Duration,
    /// Probability copied into `PeerBalancer`, where it decides how often a peer without
    /// throughput data is tried instead of a known one.
    exploration_probability: f64,
}
282
/// What we know about a single peer's chain identity and checkpoint availability.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// The digest of the Peer's genesis checkpoint.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Indicates if this Peer is on the same chain as us.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number we know of for this Peer.
    height: CheckpointSequenceNumber,
    /// Lowest available checkpoint watermark for this Peer.
    /// This defaults to 0 for now.
    ///
    /// Content requests are only sent to peers whose `lowest` is at or below the requested
    /// checkpoint (see `PeerBalancer::is_eligible`).
    lowest: CheckpointSequenceNumber,
}
295
296impl PeerHeights {
297    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
298        self.peers
299            .values()
300            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
301            .max()
302    }
303
304    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
305        self.peers
306            .iter()
307            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
308    }
309
310    // Returns a bool that indicates if the update was done successfully.
311    //
312    // This will return false if the given peer doesn't have an entry or is not on the same chain
313    // as us
314    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
315    pub fn update_peer_info(
316        &mut self,
317        peer_id: PeerId,
318        checkpoint: Checkpoint,
319        low_watermark: Option<CheckpointSequenceNumber>,
320    ) -> bool {
321        debug!("Update peer info");
322
323        let info = match self.peers.get_mut(&peer_id) {
324            Some(info) if info.on_same_chain_as_us => info,
325            _ => return false,
326        };
327
328        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
329        if let Some(low_watermark) = low_watermark {
330            info.lowest = low_watermark;
331        }
332        self.insert_checkpoint(checkpoint);
333
334        true
335    }
336
337    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
338    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
339        use std::collections::hash_map::Entry;
340        debug!("Insert peer info");
341
342        match self.peers.entry(peer_id) {
343            Entry::Occupied(mut entry) => {
344                // If there's already an entry and the genesis checkpoint digests match then update
345                // the maximum height. Otherwise we'll use the more recent one
346                let entry = entry.get_mut();
347                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
348                    entry.height = std::cmp::max(entry.height, info.height);
349                } else {
350                    *entry = info;
351                }
352            }
353            Entry::Vacant(entry) => {
354                entry.insert(info);
355            }
356        }
357    }
358
359    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
360        if let Some(info) = self.peers.get_mut(&peer_id) {
361            info.on_same_chain_as_us = false;
362        }
363        self.scores.remove(&peer_id);
364    }
365
366    /// Updates the peer's height without storing any checkpoint data.
367    /// Returns false if the peer doesn't exist or is not on the same chain.
368    pub fn update_peer_height(
369        &mut self,
370        peer_id: PeerId,
371        height: CheckpointSequenceNumber,
372        low_watermark: Option<CheckpointSequenceNumber>,
373    ) -> bool {
374        let info = match self.peers.get_mut(&peer_id) {
375            Some(info) if info.on_same_chain_as_us => info,
376            _ => return false,
377        };
378
379        info.height = std::cmp::max(height, info.height);
380        if let Some(low_watermark) = low_watermark {
381            info.lowest = low_watermark;
382        }
383
384        true
385    }
386
387    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
388        self.unprocessed_checkpoints
389            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
390        self.sequence_number_to_digest
391            .retain(|&s, _digest| s > sequence_number);
392    }
393
394    /// Inserts a checkpoint into the unprocessed checkpoints store.
395    /// Only one checkpoint per sequence number is stored. If a checkpoint with the same
396    /// sequence number but different digest already exists, the new one is dropped.
397    // TODO: also record who gives this checkpoint info for peer quality measurement?
398    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
399        let digest = *checkpoint.digest();
400        let sequence_number = *checkpoint.sequence_number();
401
402        // Check if we already have a checkpoint for this sequence number.
403        if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
404            if *existing_digest == digest {
405                // Same checkpoint, nothing to do.
406                return;
407            }
408            tracing::info!(
409                ?sequence_number,
410                ?existing_digest,
411                ?digest,
412                "received checkpoint with same sequence number but different digest, dropping new checkpoint"
413            );
414            return;
415        }
416
417        self.unprocessed_checkpoints.insert(digest, checkpoint);
418        self.sequence_number_to_digest
419            .insert(sequence_number, digest);
420    }
421
422    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
423        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
424            self.sequence_number_to_digest
425                .remove(checkpoint.sequence_number());
426        }
427    }
428
429    pub fn get_checkpoint_by_sequence_number(
430        &self,
431        sequence_number: CheckpointSequenceNumber,
432    ) -> Option<&Checkpoint> {
433        self.sequence_number_to_digest
434            .get(&sequence_number)
435            .and_then(|digest| self.get_checkpoint_by_digest(digest))
436    }
437
438    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
439        self.unprocessed_checkpoints.get(digest)
440    }
441
442    #[cfg(test)]
443    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
444        self.wait_interval_when_no_peer_to_sync_content = duration;
445    }
446
447    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
448        self.wait_interval_when_no_peer_to_sync_content
449    }
450
451    pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
452        if !self.peers.contains_key(&peer_id) {
453            return;
454        }
455        self.scores
456            .entry(peer_id)
457            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
458            .record_success(size, response_time);
459    }
460
461    pub fn record_failure(&mut self, peer_id: PeerId) {
462        if !self.peers.contains_key(&peer_id) {
463            return;
464        }
465        self.scores
466            .entry(peer_id)
467            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
468            .record_failure();
469    }
470
471    pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
472        self.scores
473            .get(peer_id)
474            .and_then(|s| s.effective_throughput())
475    }
476
477    pub fn is_failing(&self, peer_id: &PeerId) -> bool {
478        self.scores
479            .get(peer_id)
480            .map(|s| s.is_failing())
481            .unwrap_or(false)
482    }
483
484    pub fn update_failing_states(&mut self) {
485        for score in self.scores.values_mut() {
486            score.update_failing_state();
487        }
488    }
489
490    pub fn find_peer_to_report_for_failure(&self, threshold: Duration) -> Option<PeerId> {
491        self.scores
492            .iter()
493            .filter(|(peer_id, score)| {
494                score.consistently_failing(threshold)
495                    && self
496                        .peers
497                        .get(peer_id)
498                        .is_some_and(|info| info.on_same_chain_as_us)
499            })
500            .max_by_key(|(_, score)| {
501                score
502                    .failing_since
503                    .map(|since| since.elapsed())
504                    .unwrap_or(Duration::ZERO)
505            })
506            .map(|(peer_id, _)| *peer_id)
507    }
508}
509
/// PeerBalancer selects peers using weighted random selection:
/// - Most of the time: select from known peers, weighted by throughput
/// - With configured probability: select from unknown peers (to explore)
/// - Failing peers: only as last resort
///
/// Each peer is handed out at most once: selecting a peer removes it from its list.
#[derive(Clone)]
struct PeerBalancer {
    /// Non-failing peers with a measured throughput; the `f64` is that throughput.
    known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Non-failing peers without throughput data, sorted by ascending connection RTT; the `f64`
    /// is the RTT in seconds.
    unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Peers whose score currently classifies them as failing.
    failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
    /// Checkpoint the caller intends to request; `None` is treated as sequence number 0 when
    /// checking eligibility.
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    /// Kind of request the selected peer will serve; determines the eligibility rule.
    request_type: PeerCheckpointRequestType,
    /// Probability of trying an unknown peer when eligible known peers also exist.
    exploration_probability: f64,
}
523
/// The kind of checkpoint data a `PeerBalancer` is selecting peers for.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// A peer is eligible when its height is at or above the requested checkpoint.
    Summary,
    /// A peer is eligible when the requested checkpoint lies between its lowest available
    /// checkpoint and its height (both inclusive).
    Content,
}
529
530impl PeerBalancer {
531    pub fn new(
532        network: &anemo::Network,
533        peer_heights: Arc<RwLock<PeerHeights>>,
534        request_type: PeerCheckpointRequestType,
535    ) -> Self {
536        let peer_heights_guard = peer_heights.read().unwrap();
537
538        let mut known_peers = Vec::new();
539        let mut unknown_peers = Vec::new();
540        let mut failing_peers = Vec::new();
541        let exploration_probability = peer_heights_guard.exploration_probability;
542
543        for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
544            let Some(peer) = network.peer(*peer_id) else {
545                continue;
546            };
547            let rtt_secs = peer.connection_rtt().as_secs_f64();
548
549            if peer_heights_guard.is_failing(peer_id) {
550                failing_peers.push((peer, *info));
551            } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
552                known_peers.push((peer, *info, throughput));
553            } else {
554                unknown_peers.push((peer, *info, rtt_secs));
555            }
556        }
557        drop(peer_heights_guard);
558
559        unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
560            rtt_a
561                .partial_cmp(rtt_b)
562                .unwrap_or(std::cmp::Ordering::Equal)
563        });
564
565        Self {
566            known_peers,
567            unknown_peers,
568            failing_peers,
569            requested_checkpoint: None,
570            request_type,
571            exploration_probability,
572        }
573    }
574
575    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
576        self.requested_checkpoint = Some(checkpoint);
577        self
578    }
579
580    fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
581        let requested = self.requested_checkpoint.unwrap_or(0);
582        match &self.request_type {
583            PeerCheckpointRequestType::Summary => info.height >= requested,
584            PeerCheckpointRequestType::Content => {
585                info.height >= requested && info.lowest <= requested
586            }
587        }
588    }
589
590    fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
591        let eligible: Vec<_> = self
592            .known_peers
593            .iter()
594            .enumerate()
595            .filter(|(_, (_, info, _))| self.is_eligible(info))
596            .map(|(i, (_, _, t))| (i, *t))
597            .collect();
598
599        if eligible.is_empty() {
600            return None;
601        }
602
603        let total: f64 = eligible.iter().map(|(_, t)| t).sum();
604        let mut pick = rand::thread_rng().gen_range(0.0..total);
605
606        for (idx, throughput) in &eligible {
607            pick -= throughput;
608            if pick <= 0.0 {
609                let (peer, info, _) = self.known_peers.remove(*idx);
610                return Some((peer, info));
611            }
612        }
613
614        let (idx, _) = eligible.last().unwrap();
615        let (peer, info, _) = self.known_peers.remove(*idx);
616        Some((peer, info))
617    }
618
619    fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
620        let pos = self
621            .unknown_peers
622            .iter()
623            .position(|(_, info, _)| self.is_eligible(info))?;
624        let (peer, info, _) = self.unknown_peers.remove(pos);
625        Some((peer, info))
626    }
627
628    fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
629        let pos = self
630            .failing_peers
631            .iter()
632            .position(|(_, info)| self.is_eligible(info))?;
633        Some(self.failing_peers.remove(pos))
634    }
635}
636
637impl Iterator for PeerBalancer {
638    type Item = StateSyncClient<anemo::Peer>;
639
640    fn next(&mut self) -> Option<Self::Item> {
641        let has_eligible_known = self
642            .known_peers
643            .iter()
644            .any(|(_, info, _)| self.is_eligible(info));
645        let has_eligible_unknown = self
646            .unknown_peers
647            .iter()
648            .any(|(_, info, _)| self.is_eligible(info));
649
650        let explore = has_eligible_unknown
651            && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
652
653        if explore && let Some((peer, _)) = self.select_by_rtt() {
654            return Some(StateSyncClient::new(peer));
655        }
656
657        if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
658            return Some(StateSyncClient::new(peer));
659        }
660
661        if let Some((peer, _)) = self.select_failing() {
662            return Some(StateSyncClient::new(peer));
663        }
664
665        None
666    }
667}
668
/// Messages delivered to the `StateSyncEventLoop` through its mailbox.
#[derive(Clone, Debug)]
enum StateSyncMessage {
    // Asks the event loop to start a checkpoint summary sync task if one is warranted
    // (handled via `maybe_start_checkpoint_summary_sync_task`).
    StartSyncJob,
    // Validators will send this to the StateSyncEventLoop in order to kick off notifying our peers
    // of the new checkpoint.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    // Notification that the checkpoint content sync task will send to the event loop in the event
    // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
    // synced at the same time, only the highest checkpoint is sent.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
680
/// State owned by the StateSync event loop (see `StateSyncEventLoop::start`).
struct StateSyncEventLoop<S> {
    /// StateSync configuration (tick interval, download concurrency, pinned checkpoints, ...).
    config: StateSyncConfig,

    /// Receiving half of the mailbox; the loop terminates once it yields `None`.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak reference to our own mailbox
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// Background tasks spawned by the event loop; their completion is observed in the main
    /// loop.
    tasks: JoinSet<()>,
    /// Handle of the checkpoint summary sync task, if any; cleared once the task has finished.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Handle of the checkpoint contents sync task; the loop panics if this task ever finishes.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    // NOTE(review): not used in the visible part of the file; presumably the server-side limit
    // on concurrent checkpoint contents downloads - confirm.
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    /// Store handle; clones are handed to the background sync tasks.
    store: S,
    /// Shared peer bookkeeping, also handed to the background sync tasks.
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Sender for the broadcast channel of synced checkpoints; a clone is handed to the
    /// contents sync task.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Handle to the p2p network, used to subscribe to peer events.
    network: anemo::Network,
    /// Metrics handle; clones are handed to background tasks.
    metrics: Metrics,

    /// Handle of the archive-based sync task; the loop panics if this task ever finishes.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    /// Optional archive reader configuration passed to the archive-based sync task.
    archive_config: Option<ArchiveReaderConfig>,
    // NOTE(review): not used in the visible part of the file; presumably used to report
    // failing peers to the discovery subsystem - confirm.
    discovery_sender: Option<crate::discovery::Sender>,
}
703
704impl<S> StateSyncEventLoop<S>
705where
706    S: WriteStore + Clone + Send + Sync + 'static,
707{
    // Note: A great deal of care is taken to ensure that all event handlers are non-asynchronous
    // and that the only "await" points are from the select macro picking which event to handle.
    // This ensures that the event loop is able to process events at a high speed and reduce the
    // chance for building up a backlog of events to process.
    /// Runs the StateSync event loop until every sender for the mailbox has been dropped.
    ///
    /// Before entering the loop this sorts the pinned checkpoints, subscribes to network peer
    /// events, and spawns the long-running background tasks (watermark metrics updater,
    /// checkpoint contents sync, and archive-based contents sync). Each loop iteration handles
    /// exactly one event (interval tick, mailbox message, peer event, or completed task) and
    /// then re-evaluates whether the summary and contents sync tasks need to be (re)triggered.
    ///
    /// # Panics
    ///
    /// Panics if subscribing to the network fails, if a spawned task panicked (the panic is
    /// propagated), or if the contents sync or archive sync task terminates.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            // Query the peers returned alongside the subscription right away
            // (presumably the peers that are already connected - confirm).
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        // Watch channel, starting at 0, through which the contents sync task is handed its
        // target sequence number.
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // Spawn tokio task to update metrics periodically in the background
        // `_sender` is intentionally kept alive until this function returns
        // (presumably the task stops once the sender is dropped - confirm).
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Start checkpoint contents sync loop.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.use_get_checkpoint_contents_v2(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Start archive based checkpoint content sync loop.
        // TODO: Consider switching to sync from archive only on startup.
        // Right now because the peer set is fixed at startup, a node may eventually
        // end up with peers who have all purged their local state. In such a scenario it will be
        // stuck until restart when it ends up with a different set of peers. Once the discovery
        // mechanism can dynamically identify and connect to other peers on the network, we will rely
        // on sync from archive as a fall back.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        // Start main loop.
        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // Once all handles to our mailbox have been dropped this
                    // will yield `None` and we can terminate the event loop
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The contents sync task is expected to run for the lifetime of the loop.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // A finished summary sync task is normal; clear the handle so that a new
                    // one can be started.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    // The archive sync task is likewise expected to never terminate.
                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            // After every handled event, check whether new sync work should be kicked off.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }
825
826    fn handle_message(&mut self, message: StateSyncMessage) {
827        debug!("Received message: {:?}", message);
828        match message {
829            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
830            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
831                self.handle_checkpoint_from_consensus(checkpoint)
832            }
833            // After we've successfully synced a checkpoint we can notify our peers
834            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
835                self.spawn_notify_peers_of_checkpoint(*checkpoint)
836            }
837        }
838    }
839
840    // Handle a checkpoint that we received from consensus
841    #[instrument(level = "debug", skip_all)]
842    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
843        // Always check previous_digest matches in case there is a gap between
844        // state sync and consensus.
845        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
846            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
847            .digest();
848        if checkpoint.previous_digest != Some(prev_digest) {
849            panic!(
850                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
851                checkpoint.sequence_number(),
852                Some(prev_digest),
853                checkpoint.previous_digest
854            );
855        }
856
857        let latest_checkpoint = self
858            .store
859            .get_highest_verified_checkpoint()
860            .expect("store operation should not fail");
861
862        // If this is an older checkpoint, just ignore it
863        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
864            return;
865        }
866
867        let checkpoint = *checkpoint;
868        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
869        if *checkpoint.sequence_number() > next_sequence_number {
870            debug!(
871                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
872                next_sequence_number,
873                checkpoint.sequence_number()
874            );
875        }
876
877        // Because checkpoint from consensus sends in order, when we have checkpoint n,
878        // we must have all of the checkpoints before n from either state sync or consensus.
879        #[cfg(debug_assertions)]
880        {
881            let _ = (next_sequence_number..=*checkpoint.sequence_number())
882                .map(|n| {
883                    let checkpoint = self
884                        .store
885                        .get_checkpoint_by_sequence_number(n)
886                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
887                    self.store
888                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
889                        .unwrap_or_else(|| {
890                            panic!(
891                                "store should contain checkpoint contents for {:?}",
892                                checkpoint.content_digest
893                            )
894                        });
895                })
896                .collect::<Vec<_>>();
897        }
898
899        // If this is the last checkpoint of a epoch, we need to make sure
900        // new committee is in store before we verify newer checkpoints in next epoch.
901        // This could happen before this validator's reconfiguration finishes, because
902        // state sync does not reconfig.
903        // TODO: make CheckpointAggregator use WriteStore so we don't need to do this
904        // committee insertion in two places (only in `insert_checkpoint`).
905        if let Some(EndOfEpochData {
906            next_epoch_committee,
907            ..
908        }) = checkpoint.end_of_epoch_data.as_ref()
909        {
910            let next_committee = next_epoch_committee.iter().cloned().collect();
911            let committee =
912                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
913            self.store
914                .insert_committee(committee)
915                .expect("store operation should not fail");
916        }
917
918        self.store
919            .update_highest_verified_checkpoint(&checkpoint)
920            .expect("store operation should not fail");
921        self.store
922            .update_highest_synced_checkpoint(&checkpoint)
923            .expect("store operation should not fail");
924
925        // We don't care if no one is listening as this is a broadcast channel
926        let _ = self.checkpoint_event_sender.send(checkpoint.clone());
927
928        self.spawn_notify_peers_of_checkpoint(checkpoint);
929    }
930
931    fn handle_peer_event(
932        &mut self,
933        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
934    ) {
935        use tokio::sync::broadcast::error::RecvError;
936
937        match peer_event {
938            Ok(PeerEvent::NewPeer(peer_id)) => {
939                self.spawn_get_latest_from_peer(peer_id);
940            }
941            Ok(PeerEvent::LostPeer(peer_id, _)) => {
942                let mut heights = self.peer_heights.write().unwrap();
943                heights.peers.remove(&peer_id);
944                heights.scores.remove(&peer_id);
945            }
946
947            Err(RecvError::Closed) => {
948                panic!("PeerEvent channel shouldn't be able to be closed");
949            }
950
951            Err(RecvError::Lagged(_)) => {
952                trace!("State-Sync fell behind processing PeerEvents");
953            }
954        }
955    }
956
957    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
958        if let Some(peer) = self.network.peer(peer_id) {
959            let genesis_checkpoint_digest = *self
960                .store
961                .get_checkpoint_by_sequence_number(0)
962                .expect("store should contain genesis checkpoint")
963                .digest();
964            let task = get_latest_from_peer(
965                genesis_checkpoint_digest,
966                peer,
967                self.peer_heights.clone(),
968                self.config.timeout(),
969            );
970            self.tasks.spawn(task);
971        }
972    }
973
974    fn handle_tick(&mut self, _now: std::time::Instant) {
975        let task = query_peers_for_their_latest_checkpoint(
976            self.network.clone(),
977            self.peer_heights.clone(),
978            self.weak_sender.clone(),
979            self.config.timeout(),
980        );
981        self.tasks.spawn(task);
982
983        if let Some(layer) = self.download_limit_layer.as_ref() {
984            layer.maybe_prune_map();
985        }
986
987        self.maybe_report_failing_peer();
988    }
989
990    fn maybe_report_failing_peer(&mut self) {
991        let threshold = self.config.peer_disconnect_threshold();
992        if threshold.is_zero() {
993            return;
994        }
995
996        let mut peer_heights = self.peer_heights.write().unwrap();
997        peer_heights.update_failing_states();
998
999        let Some(peer_id) = peer_heights.find_peer_to_report_for_failure(threshold) else {
1000            return;
1001        };
1002        // Reset the failing clock so this peer isn't re-reported on the next tick.
1003        // If the peer is still failing, update_failing_states will restart the clock
1004        // and the peer must fail for another full threshold before being reported again.
1005        // Discovery may decline to disconnect (e.g. too few peers), so this also
1006        // prevents spamming reports every tick in that case.
1007        if let Some(score) = peer_heights.scores.get_mut(&peer_id) {
1008            score.reset_failing_since();
1009        }
1010        drop(peer_heights);
1011
1012        info!("reporting peer {peer_id} for consistent state sync failures");
1013        if let Some(sender) = &self.discovery_sender {
1014            sender.report_peer_failure(peer_id);
1015        } else {
1016            let _ = self.network.disconnect(peer_id);
1017        }
1018        self.metrics.inc_peers_reported_for_failure();
1019    }
1020
1021    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
1022        // Only run one sync task at a time
1023        if self.sync_checkpoint_summaries_task.is_some() {
1024            return;
1025        }
1026
1027        let highest_processed_checkpoint = self
1028            .store
1029            .get_highest_verified_checkpoint()
1030            .expect("store operation should not fail");
1031
1032        let highest_known_sequence_number = self
1033            .peer_heights
1034            .read()
1035            .unwrap()
1036            .highest_known_checkpoint_sequence_number();
1037
1038        if let Some(target_seq) = highest_known_sequence_number
1039            && *highest_processed_checkpoint.sequence_number() < target_seq
1040        {
1041            // Limit the per-sync batch size according to config.
1042            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
1043            let limited_target = std::cmp::min(
1044                target_seq,
1045                highest_processed_checkpoint
1046                    .sequence_number()
1047                    .saturating_add(max_batch_size),
1048            );
1049            let was_limited = limited_target < target_seq;
1050
1051            // Start sync job.
1052            let weak_sender = self.weak_sender.clone();
1053            let task = sync_to_checkpoint(
1054                self.network.clone(),
1055                self.store.clone(),
1056                self.peer_heights.clone(),
1057                self.metrics.clone(),
1058                self.config.pinned_checkpoints.clone(),
1059                self.config.checkpoint_header_download_concurrency(),
1060                self.config.timeout(),
1061                limited_target,
1062            )
1063            .map(move |result| match result {
1064                Ok(()) => {
1065                    // If we limited the sync range, immediately trigger
1066                    // another sync to continue catching up.
1067                    if was_limited && let Some(sender) = weak_sender.upgrade() {
1068                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
1069                    }
1070                }
1071                Err(e) => {
1072                    debug!("error syncing checkpoint {e}");
1073                }
1074            });
1075            let task_handle = self.tasks.spawn(task);
1076            self.sync_checkpoint_summaries_task = Some(task_handle);
1077        }
1078    }
1079
1080    fn maybe_trigger_checkpoint_contents_sync_task(
1081        &mut self,
1082        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
1083    ) {
1084        let highest_verified_checkpoint = self
1085            .store
1086            .get_highest_verified_checkpoint()
1087            .expect("store operation should not fail");
1088        let highest_synced_checkpoint = self
1089            .store
1090            .get_highest_synced_checkpoint()
1091            .expect("store operation should not fail");
1092
1093        if highest_verified_checkpoint.sequence_number()
1094            > highest_synced_checkpoint.sequence_number()
1095            // skip if we aren't connected to any peers that can help
1096            && self
1097                .peer_heights
1098                .read()
1099                .unwrap()
1100                .highest_known_checkpoint_sequence_number()
1101                > Some(*highest_synced_checkpoint.sequence_number())
1102        {
1103            let _ = target_sequence_channel.send_if_modified(|num| {
1104                let new_num = *highest_verified_checkpoint.sequence_number();
1105                if *num == new_num {
1106                    return false;
1107                }
1108                *num = new_num;
1109                true
1110            });
1111        }
1112    }
1113
1114    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
1115        let task = notify_peers_of_checkpoint(
1116            self.network.clone(),
1117            self.peer_heights.clone(),
1118            checkpoint,
1119            self.config.timeout(),
1120        );
1121        self.tasks.spawn(task);
1122    }
1123}
1124
1125async fn notify_peers_of_checkpoint(
1126    network: anemo::Network,
1127    peer_heights: Arc<RwLock<PeerHeights>>,
1128    checkpoint: VerifiedCheckpoint,
1129    timeout: Duration,
1130) {
1131    let futs = peer_heights
1132        .read()
1133        .unwrap()
1134        .peers_on_same_chain()
1135        // Filter out any peers who we know already have a checkpoint higher than this one
1136        .filter_map(|(peer_id, info)| {
1137            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1138        })
1139        // Filter out any peers who we aren't connected with
1140        .flat_map(|peer_id| network.peer(*peer_id))
1141        .map(StateSyncClient::new)
1142        .map(|mut client| {
1143            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1144            async move { client.push_checkpoint_summary(request).await }
1145        })
1146        .collect::<Vec<_>>();
1147    futures::future::join_all(futs).await;
1148}
1149
1150async fn get_latest_from_peer(
1151    our_genesis_checkpoint_digest: CheckpointDigest,
1152    peer: anemo::Peer,
1153    peer_heights: Arc<RwLock<PeerHeights>>,
1154    timeout: Duration,
1155) {
1156    let peer_id = peer.peer_id();
1157    let mut client = StateSyncClient::new(peer);
1158
1159    let info = {
1160        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
1161
1162        if let Some(info) = maybe_info {
1163            info
1164        } else {
1165            // TODO do we want to create a new API just for querying a node's chainid?
1166            //
1167            // We need to query this node's genesis checkpoint to see if they're on the same chain
1168            // as us
1169            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
1170                .with_timeout(timeout);
1171            let response = client
1172                .get_checkpoint_summary(request)
1173                .await
1174                .map(Response::into_inner);
1175
1176            let info = match response {
1177                Ok(Some(checkpoint)) => {
1178                    let digest = *checkpoint.digest();
1179                    PeerStateSyncInfo {
1180                        genesis_checkpoint_digest: digest,
1181                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
1182                        height: *checkpoint.sequence_number(),
1183                        lowest: CheckpointSequenceNumber::default(),
1184                    }
1185                }
1186                Ok(None) => PeerStateSyncInfo {
1187                    genesis_checkpoint_digest: CheckpointDigest::default(),
1188                    on_same_chain_as_us: false,
1189                    height: CheckpointSequenceNumber::default(),
1190                    lowest: CheckpointSequenceNumber::default(),
1191                },
1192                Err(status) => {
1193                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
1194                    return;
1195                }
1196            };
1197            peer_heights
1198                .write()
1199                .unwrap()
1200                .insert_peer_info(peer_id, info);
1201            info
1202        }
1203    };
1204
1205    // Bail early if this node isn't on the same chain as us
1206    if !info.on_same_chain_as_us {
1207        trace!(?info, "Peer {peer_id} not on same chain as us");
1208        return;
1209    }
1210    let Ok(Some((highest_checkpoint, low_watermark))) =
1211        query_peer_for_latest_info(&mut client, timeout).await
1212    else {
1213        return;
1214    };
1215    peer_heights
1216        .write()
1217        .unwrap()
1218        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
1219}
1220
1221/// Queries a peer for their highest_synced_checkpoint and low checkpoint watermark.
1222///
1223/// Distinguishes three outcomes so callers can score the peer:
1224/// - `Ok(Some(..))`: the peer answered with usable latest-checkpoint info.
1225/// - `Ok(None)`: the peer answered but had nothing useful to report (e.g. a pre-upgrade peer
1226///   that returns no latest summary). This is not a failure.
1227/// - `Err(())`: the request itself failed (timeout / transport error). This is a failure: a
1228///   peer that consistently fails these liveness queries is consistently unreachable.
1229async fn query_peer_for_latest_info(
1230    client: &mut StateSyncClient<anemo::Peer>,
1231    timeout: Duration,
1232) -> Result<Option<(Checkpoint, Option<CheckpointSequenceNumber>)>, ()> {
1233    let request = Request::new(()).with_timeout(timeout);
1234    let response = client
1235        .get_checkpoint_availability(request)
1236        .await
1237        .map(Response::into_inner);
1238    match response {
1239        Ok(GetCheckpointAvailabilityResponse {
1240            highest_synced_checkpoint,
1241            lowest_available_checkpoint,
1242        }) => {
1243            return Ok(Some((
1244                highest_synced_checkpoint,
1245                Some(lowest_available_checkpoint),
1246            )));
1247        }
1248        Err(status) => {
1249            // If peer hasn't upgraded they would return 404 NotFound error
1250            if status.status() != anemo::types::response::StatusCode::NotFound {
1251                trace!("get_checkpoint_availability request failed: {status:?}");
1252                return Err(());
1253            }
1254        }
1255    };
1256
1257    // Then we try the old query
1258    // TODO: remove this once the new feature stabilizes
1259    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1260    let response = client
1261        .get_checkpoint_summary(request)
1262        .await
1263        .map(Response::into_inner);
1264    match response {
1265        Ok(Some(checkpoint)) => Ok(Some((checkpoint, None))),
1266        Ok(None) => Ok(None),
1267        Err(status) => {
1268            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1269            Err(())
1270        }
1271    }
1272}
1273
1274#[instrument(level = "debug", skip_all)]
1275async fn query_peers_for_their_latest_checkpoint(
1276    network: anemo::Network,
1277    peer_heights: Arc<RwLock<PeerHeights>>,
1278    sender: mpsc::WeakSender<StateSyncMessage>,
1279    timeout: Duration,
1280) {
1281    let peer_heights = &peer_heights;
1282    let futs = peer_heights
1283        .read()
1284        .unwrap()
1285        .peers_on_same_chain()
1286        // Filter out any peers who we aren't connected with
1287        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1288        .map(|peer| {
1289            let peer_id = peer.peer_id();
1290            let mut client = StateSyncClient::new(peer);
1291
1292            async move {
1293                let start = Instant::now();
1294                let response = query_peer_for_latest_info(&mut client, timeout).await;
1295                let elapsed = start.elapsed();
1296                match response {
1297                    Ok(Some((highest_checkpoint, low_watermark))) => {
1298                        // A peer that answers our periodic liveness/discovery query is healthy;
1299                        // record it so its success rate offsets occasional failures.
1300                        let size = bcs::serialized_size(&highest_checkpoint).unwrap_or(0) as u64;
1301                        let mut peer_heights = peer_heights.write().unwrap();
1302                        peer_heights.record_success(peer_id, size, elapsed);
1303                        peer_heights
1304                            .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1305                            .then_some(highest_checkpoint)
1306                    }
1307                    // Peer responded but had nothing useful; neutral, do not score.
1308                    Ok(None) => None,
1309                    // Request failed (timeout / transport). A peer that consistently fails
1310                    // these liveness queries is consistently unreachable, so record the
1311                    // failure to feed the scorer even when we are not actively syncing from
1312                    // it. Without this, an unreachable peer is only scored while there is
1313                    // active sync traffic, so it can evade the consistently-failing-peer
1314                    // disconnect logic indefinitely.
1315                    Err(()) => {
1316                        peer_heights.write().unwrap().record_failure(peer_id);
1317                        None
1318                    }
1319                }
1320            }
1321        })
1322        .collect::<Vec<_>>();
1323
1324    debug!("Query {} peers for latest checkpoint", futs.len());
1325
1326    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1327
1328    let highest_checkpoint_seq = checkpoints
1329        .map(|checkpoint| *checkpoint.sequence_number())
1330        .max();
1331
1332    let our_highest_seq = peer_heights
1333        .read()
1334        .unwrap()
1335        .highest_known_checkpoint_sequence_number();
1336
1337    debug!(
1338        "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1339    );
1340
1341    let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1342        (Some(theirs), None) => theirs,
1343        (Some(theirs), Some(ours)) if theirs > ours => theirs,
1344        _ => return,
1345    };
1346
1347    if let Some(sender) = sender.upgrade() {
1348        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1349    }
1350}
1351
1352async fn sync_to_checkpoint<S>(
1353    network: anemo::Network,
1354    store: S,
1355    peer_heights: Arc<RwLock<PeerHeights>>,
1356    metrics: Metrics,
1357    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1358    checkpoint_header_download_concurrency: usize,
1359    timeout: Duration,
1360    target_sequence_number: CheckpointSequenceNumber,
1361) -> Result<()>
1362where
1363    S: WriteStore,
1364{
1365    metrics.set_highest_known_checkpoint(target_sequence_number);
1366
1367    let mut current = store
1368        .get_highest_verified_checkpoint()
1369        .expect("store operation should not fail");
1370    if *current.sequence_number() >= target_sequence_number {
1371        return Err(anyhow::anyhow!(
1372            "target checkpoint {} is older than highest verified checkpoint {}",
1373            target_sequence_number,
1374            current.sequence_number(),
1375        ));
1376    }
1377
1378    let peer_balancer = PeerBalancer::new(
1379        &network,
1380        peer_heights.clone(),
1381        PeerCheckpointRequestType::Summary,
1382    );
1383    // range of the next sequence_numbers to fetch
1384    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1385        ..=target_sequence_number)
1386        .map(|next| {
1387            let peers = peer_balancer.clone().with_checkpoint(next);
1388            let peer_heights = peer_heights.clone();
1389            let pinned_checkpoints = &pinned_checkpoints;
1390            async move {
1391                if let Some(checkpoint) = peer_heights
1392                    .read()
1393                    .unwrap()
1394                    .get_checkpoint_by_sequence_number(next)
1395                {
1396                    return (Some(checkpoint.to_owned()), next, None);
1397                }
1398
1399                // Iterate through peers trying each one in turn until we're able to
1400                // successfully get the target checkpoint.
1401                for mut peer in peers {
1402                    let peer_id = peer.inner().peer_id();
1403                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1404                        .with_timeout(timeout);
1405                    let start = Instant::now();
1406                    let result = peer.get_checkpoint_summary(request).await;
1407                    let elapsed = start.elapsed();
1408
1409                    let checkpoint = match result {
1410                        Ok(response) => match response.into_inner() {
1411                            Some(cp) => Some(cp),
1412                            None => {
1413                                trace!("peer unable to help sync");
1414                                peer_heights.write().unwrap().record_failure(peer_id);
1415                                None
1416                            }
1417                        },
1418                        Err(e) => {
1419                            trace!("{e:?}");
1420                            peer_heights.write().unwrap().record_failure(peer_id);
1421                            None
1422                        }
1423                    };
1424
1425                    let Some(checkpoint) = checkpoint else {
1426                        continue;
1427                    };
1428
1429                    let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
1430                    peer_heights.write().unwrap().record_success(peer_id, size, elapsed);
1431
1432                    // peer didn't give us a checkpoint with the height that we requested
1433                    if *checkpoint.sequence_number() != next {
1434                        tracing::debug!(
1435                            "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1436                            checkpoint.sequence_number()
1437                        );
1438                        peer_heights
1439                            .write()
1440                            .unwrap()
1441                            .mark_peer_as_not_on_same_chain(peer_id);
1442                        continue;
1443                    }
1444
1445                    // peer gave us a checkpoint whose digest does not match pinned digest
1446                    let checkpoint_digest = checkpoint.digest();
1447                    if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1448                        checkpoint.sequence_number(),
1449                        |(seq_num, _digest)| *seq_num
1450                    )
1451                        && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
1452                    {
1453                        tracing::debug!(
1454                            "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1455                            pinned_checkpoints[pinned_digest_index].1,
1456                            checkpoint_digest
1457                        );
1458                        peer_heights
1459                            .write()
1460                            .unwrap()
1461                            .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
1462                        continue;
1463                    }
1464
1465                    // Insert in our store in the event that things fail and we need to retry
1466                    peer_heights
1467                        .write()
1468                        .unwrap()
1469                        .insert_checkpoint(checkpoint.clone());
1470                    return (Some(checkpoint), next, Some(peer_id));
1471                }
1472                (None, next, None)
1473            }
1474        })
1475        .pipe(futures::stream::iter)
1476        .buffered(checkpoint_header_download_concurrency);
1477
1478    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1479        assert_eq!(
1480            current
1481                .sequence_number()
1482                .checked_add(1)
1483                .expect("exhausted u64"),
1484            next
1485        );
1486
1487        // Verify the checkpoint
1488        let checkpoint = 'cp: {
1489            let checkpoint = maybe_checkpoint.ok_or_else(|| {
1490                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1491            })?;
1492            // Skip verification for manually pinned checkpoints.
1493            if pinned_checkpoints
1494                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1495                .is_ok()
1496            {
1497                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1498            }
1499            match verify_checkpoint(&current, &store, checkpoint) {
1500                Ok(verified_checkpoint) => verified_checkpoint,
1501                Err(checkpoint) => {
1502                    let mut peer_heights = peer_heights.write().unwrap();
1503                    // Remove the checkpoint from our temporary store so that we can try querying
1504                    // another peer for a different one
1505                    peer_heights.remove_checkpoint(checkpoint.digest());
1506
1507                    // Mark peer as not on the same chain as us
1508                    if let Some(peer_id) = maybe_peer_id {
1509                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1510                    }
1511
1512                    return Err(anyhow::anyhow!(
1513                        "unable to verify checkpoint {checkpoint:?}"
1514                    ));
1515                }
1516            }
1517        };
1518
1519        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1520        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1521            metrics.checkpoint_summary_age_metrics()
1522        {
1523            checkpoint.report_checkpoint_age(
1524                checkpoint_summary_age_metric,
1525                checkpoint_summary_age_metric_deprecated,
1526            );
1527        }
1528
1529        // Insert the newly verified checkpoint into our store, which will bump our highest
1530        // verified checkpoint watermark as well.
1531        store
1532            .insert_checkpoint(&checkpoint)
1533            .expect("store operation should not fail");
1534
1535        current = checkpoint;
1536    }
1537
1538    peer_heights
1539        .write()
1540        .unwrap()
1541        .cleanup_old_checkpoints(*current.sequence_number());
1542
1543    Ok(())
1544}
1545
1546async fn sync_checkpoint_contents_from_archive<S>(
1547    network: anemo::Network,
1548    archive_config: Option<ArchiveReaderConfig>,
1549    store: S,
1550    peer_heights: Arc<RwLock<PeerHeights>>,
1551    metrics: Metrics,
1552) where
1553    S: WriteStore + Clone + Send + Sync + 'static,
1554{
1555    loop {
1556        sync_checkpoint_contents_from_archive_iteration(
1557            &network,
1558            &archive_config,
1559            store.clone(),
1560            peer_heights.clone(),
1561            metrics.clone(),
1562        )
1563        .await;
1564        tokio::time::sleep(Duration::from_secs(5)).await;
1565    }
1566}
1567
1568async fn sync_checkpoint_contents_from_archive_iteration<S>(
1569    network: &anemo::Network,
1570    archive_config: &Option<ArchiveReaderConfig>,
1571    store: S,
1572    peer_heights: Arc<RwLock<PeerHeights>>,
1573    metrics: Metrics,
1574) where
1575    S: WriteStore + Clone + Send + Sync + 'static,
1576{
1577    let peers: Vec<_> = peer_heights
1578        .read()
1579        .unwrap()
1580        .peers_on_same_chain()
1581        // Filter out any peers who we aren't connected with.
1582        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1583        .collect();
1584    let lowest_checkpoint_on_peers = peers
1585        .iter()
1586        .map(|(_p, state_sync_info)| state_sync_info.lowest)
1587        .min();
1588    let highest_synced = store
1589        .get_highest_synced_checkpoint()
1590        .expect("store operation should not fail")
1591        .sequence_number;
1592    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1593        highest_synced < lowest_checkpoint_on_peers
1594    } else {
1595        false
1596    };
1597    debug!(
1598        "Syncing checkpoint contents from archive: {sync_from_archive},  highest_synced: {highest_synced},  lowest_checkpoint_on_peers: {}",
1599        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1600    );
1601    if sync_from_archive {
1602        let start = highest_synced
1603            .checked_add(1)
1604            .expect("Checkpoint seq num overflow");
1605        let end = lowest_checkpoint_on_peers.unwrap();
1606
1607        let Some(archive_config) = archive_config else {
1608            warn!("Failed to find an archive reader to complete the state sync request");
1609            return;
1610        };
1611        let Some(ingestion_url) = &archive_config.ingestion_url else {
1612            warn!("Archival ingestion url for state sync is not configured");
1613            return;
1614        };
1615        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1616            warn!("{} can't be used as an archival fallback", ingestion_url);
1617            return;
1618        }
1619        let obj_store = build_object_store(
1620            ingestion_url,
1621            archive_config.remote_store_options.clone(),
1622            archive_config.remote_store_headers.clone(),
1623        );
1624        let mut checkpoint_stream = futures::stream::iter(start..=end)
1625            .map(|seq| {
1626                let obj_store = obj_store.clone();
1627                async move { (seq, fetch_checkpoint(&obj_store, seq).await) }
1628            })
1629            .buffered(archive_config.download_concurrency.get());
1630
1631        while let Some((seq, result)) = checkpoint_stream.next().await {
1632            let checkpoint = match result {
1633                Ok(checkpoint) => checkpoint,
1634                Err(err) => {
1635                    warn!("State sync from archive failed fetching checkpoint {seq}: {err}");
1636                    return;
1637                }
1638            };
1639            if let Err(err) = process_archive_checkpoint(&store, &checkpoint, &metrics) {
1640                warn!("State sync from archive failed processing checkpoint {seq}: {err}");
1641                return;
1642            }
1643        }
1644    }
1645}
1646
1647async fn sync_checkpoint_contents<S>(
1648    network: anemo::Network,
1649    store: S,
1650    peer_heights: Arc<RwLock<PeerHeights>>,
1651    sender: mpsc::WeakSender<StateSyncMessage>,
1652    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1653    checkpoint_content_download_concurrency: usize,
1654    checkpoint_content_download_tx_concurrency: u64,
1655    use_get_checkpoint_contents_v2: bool,
1656    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1657) where
1658    S: WriteStore + Clone,
1659{
1660    let mut highest_synced = store
1661        .get_highest_synced_checkpoint()
1662        .expect("store operation should not fail");
1663
1664    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1665    let mut target_sequence_cursor = 0;
1666    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1667    let mut checkpoint_contents_tasks = FuturesOrdered::new();
1668
1669    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1670
1671    loop {
1672        tokio::select! {
1673            result = target_sequence_channel.changed() => {
1674                match result {
1675                    Ok(()) => {
1676                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1677                    }
1678                    Err(_) => {
1679                        // Watch channel is closed, exit loop.
1680                        return
1681                    }
1682                }
1683            },
1684            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1685                match maybe_checkpoint {
1686                    Ok(checkpoint) => {
1687                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
1688
1689                        store
1690                            .update_highest_synced_checkpoint(&checkpoint)
1691                            .expect("store operation should not fail");
1692                        // We don't care if no one is listening as this is a broadcast channel
1693                        let _ = checkpoint_event_sender.send(checkpoint.clone());
1694                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1695                        highest_synced = checkpoint;
1696
1697                    }
1698                    Err(checkpoint) => {
1699                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
1700                        if let Some(lowest_peer_checkpoint) =
1701                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
1702                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
1703                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
1704                            }
1705                        } else {
1706                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
1707
1708                        }
1709                        // Calculate tx_count for retry by getting previous checkpoint
1710                        let retry_tx_count = if *checkpoint.sequence_number() == 0 {
1711                            checkpoint.network_total_transactions
1712                        } else {
1713                            let prev = store
1714                                .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
1715                                .expect("previous checkpoint must exist")
1716                                .network_total_transactions;
1717                            checkpoint.network_total_transactions - prev
1718                        };
1719                        // Retry contents sync on failure.
1720                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
1721                            network.clone(),
1722                            &store,
1723                            peer_heights.clone(),
1724                            use_get_checkpoint_contents_v2,
1725                            retry_tx_count,
1726                            checkpoint,
1727                        ));
1728                    }
1729                }
1730            },
1731        }
1732
1733        // Start new tasks up to configured concurrency limits.
1734        while current_sequence < target_sequence_cursor
1735            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
1736        {
1737            let next_checkpoint = store
1738                .get_checkpoint_by_sequence_number(current_sequence)
1739                .unwrap_or_else(|| panic!(
1740                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
1741                    current_sequence
1742                ));
1743
1744            // Enforce transaction count concurrency limit.
1745            let tx_count = next_checkpoint.network_total_transactions
1746                - highest_started_network_total_transactions;
1747            if tx_count > tx_concurrency_remaining {
1748                break;
1749            }
1750            tx_concurrency_remaining -= tx_count;
1751
1752            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
1753            current_sequence += 1;
1754            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
1755                network.clone(),
1756                &store,
1757                peer_heights.clone(),
1758                use_get_checkpoint_contents_v2,
1759                tx_count,
1760                next_checkpoint,
1761            ));
1762        }
1763
1764        if highest_synced
1765            .sequence_number()
1766            .is_multiple_of(checkpoint_content_download_concurrency as u64)
1767            || checkpoint_contents_tasks.is_empty()
1768        {
1769            // Periodically notify event loop to notify our peers that we've synced to a new checkpoint height
1770            if let Some(sender) = sender.upgrade() {
1771                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
1772                let _ = sender.send(message).await;
1773            }
1774        }
1775    }
1776}
1777
1778#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1779async fn sync_one_checkpoint_contents<S>(
1780    network: anemo::Network,
1781    store: S,
1782    peer_heights: Arc<RwLock<PeerHeights>>,
1783    use_get_checkpoint_contents_v2: bool,
1784    tx_count: u64,
1785    checkpoint: VerifiedCheckpoint,
1786) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1787where
1788    S: WriteStore + Clone,
1789{
1790    let (timeout_min, timeout_max) = {
1791        let ph = peer_heights.read().unwrap();
1792        (
1793            ph.checkpoint_content_timeout_min,
1794            ph.checkpoint_content_timeout_max,
1795        )
1796    };
1797    let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1798    debug!(
1799        "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1800        timeout, tx_count
1801    );
1802
1803    // Check if we already have produced this checkpoint locally. If so, we don't need
1804    // to get it from peers anymore.
1805    if store
1806        .get_highest_synced_checkpoint()
1807        .expect("store operation should not fail")
1808        .sequence_number()
1809        >= checkpoint.sequence_number()
1810    {
1811        debug!("checkpoint was already created via consensus output");
1812        return Ok(checkpoint);
1813    }
1814
1815    // Request checkpoint contents from peers.
1816    let peers = PeerBalancer::new(
1817        &network,
1818        peer_heights.clone(),
1819        PeerCheckpointRequestType::Content,
1820    )
1821    .with_checkpoint(*checkpoint.sequence_number());
1822    let now = tokio::time::Instant::now();
1823    let Some(_contents) = get_full_checkpoint_contents(
1824        peers,
1825        &store,
1826        peer_heights.clone(),
1827        &checkpoint,
1828        use_get_checkpoint_contents_v2,
1829        timeout,
1830    )
1831    .await
1832    else {
1833        // Delay completion in case of error so we don't hammer the network with retries.
1834        let duration = peer_heights
1835            .read()
1836            .unwrap()
1837            .wait_interval_when_no_peer_to_sync_content();
1838        if now.elapsed() < duration {
1839            let duration = duration - now.elapsed();
1840            info!("retrying checkpoint sync after {:?}", duration);
1841            tokio::time::sleep(duration).await;
1842        }
1843        return Err(checkpoint);
1844    };
1845    debug!("completed checkpoint contents sync");
1846    Ok(checkpoint)
1847}
1848
1849#[instrument(level = "debug", skip_all)]
1850async fn get_full_checkpoint_contents<S>(
1851    peers: PeerBalancer,
1852    store: S,
1853    peer_heights: Arc<RwLock<PeerHeights>>,
1854    checkpoint: &VerifiedCheckpoint,
1855    use_get_checkpoint_contents_v2: bool,
1856    timeout: Duration,
1857) -> Option<VersionedFullCheckpointContents>
1858where
1859    S: WriteStore,
1860{
1861    let sequence_number = checkpoint.sequence_number;
1862    let digest = checkpoint.content_digest;
1863    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1864        debug!("store already contains checkpoint contents");
1865        return Some(contents);
1866    }
1867
1868    // Iterate through our selected peers trying each one in turn until we're able to
1869    // successfully get the target checkpoint
1870    for mut peer in peers {
1871        let peer_id = peer.inner().peer_id();
1872        debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
1873        let request = Request::new(digest).with_timeout(timeout);
1874        let start = Instant::now();
1875        let result = if use_get_checkpoint_contents_v2 {
1876            peer.get_checkpoint_contents_v2(request).await
1877        } else {
1878            peer.get_checkpoint_contents(request)
1879                .await
1880                .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
1881        };
1882        let elapsed = start.elapsed();
1883
1884        let contents = match result {
1885            Ok(response) => match response.into_inner() {
1886                Some(c) => Some(c),
1887                None => {
1888                    trace!("peer unable to help sync");
1889                    peer_heights.write().unwrap().record_failure(peer_id);
1890                    None
1891                }
1892            },
1893            Err(e) => {
1894                trace!("{e:?}");
1895                peer_heights.write().unwrap().record_failure(peer_id);
1896                None
1897            }
1898        };
1899
1900        let Some(contents) = contents else {
1901            continue;
1902        };
1903
1904        if contents.verify_digests(digest).is_ok() {
1905            let size =
1906                bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
1907            peer_heights
1908                .write()
1909                .unwrap()
1910                .record_success(peer_id, size, elapsed);
1911            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1912            store
1913                .insert_checkpoint_contents(checkpoint, verified_contents)
1914                .expect("store operation should not fail");
1915            return Some(contents);
1916        }
1917    }
1918    debug!("no peers had checkpoint contents");
1919    None
1920}
1921
1922async fn update_checkpoint_watermark_metrics<S>(
1923    mut recv: oneshot::Receiver<()>,
1924    store: S,
1925    metrics: Metrics,
1926) -> Result<()>
1927where
1928    S: WriteStore + Clone + Send + Sync,
1929{
1930    let mut interval = tokio::time::interval(Duration::from_secs(5));
1931    loop {
1932        tokio::select! {
1933             _now = interval.tick() => {
1934                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1935                    .expect("store operation should not fail");
1936                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1937                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1938                    .expect("store operation should not fail");
1939                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1940             },
1941            _ = &mut recv => break,
1942        }
1943    }
1944    Ok(())
1945}