sui_network/state_sync/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer-to-peer data synchronization of checkpoints.
5//!
6//! This StateSync module is responsible for the synchronization and dissemination of checkpoints
7//! and the transactions, and their effects, contained within. This module is *not* responsible for
8//! the execution of the transactions included in a checkpoint, that process is left to another
9//! component in the system.
10//!
11//! # High-level Overview of StateSync
12//!
13//! StateSync discovers new checkpoints via a few different sources:
14//! 1. If this node is a Validator, checkpoints will be produced via consensus at which point
15//!    consensus can notify state-sync of the new checkpoint via [Handle::send_checkpoint].
16//! 2. A peer notifies us of the latest checkpoint which they have synchronized. State-Sync will
17//!    also periodically query its peers to discover what their latest checkpoint is.
18//!
19//! We keep track of two different watermarks:
//! * highest_verified_checkpoint - This is the highest checkpoint header that we've locally
//!   verified. This indicates that we have in our persistent store (and have verified) all
//!   checkpoint headers up to and including this value.
23//! * highest_synced_checkpoint - This is the highest checkpoint that we've fully synchronized,
24//!   meaning we've downloaded and have in our persistent stores all of the transactions, and their
25//!   effects (but not the objects), for all checkpoints up to and including this point. This is
26//!   the watermark that is shared with other peers, either via notification or when they query for
27//!   our latest checkpoint, and is intended to be used as a guarantee of data availability.
28//!
29//! The `PeerHeights` struct is used to track the highest_synced_checkpoint watermark for all of
30//! our peers.
31//!
//! When a new checkpoint is discovered, and we've determined that it is higher than our
//! highest_verified_checkpoint, then StateSync will kick off a task to synchronize and verify all
//! checkpoints between our highest_verified_checkpoint and the newly discovered checkpoint. This
//! process is done by querying one of our peers for the checkpoints we're missing (using the
//! `PeerHeights` struct as a way to intelligently select which peers have the data available for
//! us to query) at which point we will locally verify the signatures on the checkpoint header with
//! the appropriate committee (based on the epoch). As checkpoints are verified, the
//! highest_verified_checkpoint watermark will be ratcheted up.
40//!
//! Once we've ratcheted up our highest_verified_checkpoint, and if it is higher than
//! highest_synced_checkpoint, StateSync will then kick off a task to synchronize the contents of
//! all of the checkpoints from highest_synced_checkpoint..=highest_verified_checkpoint. After the
//! contents of each checkpoint are fully downloaded, StateSync will update our
//! highest_synced_checkpoint watermark and send out a notification on a broadcast channel
//! indicating that a new checkpoint has been fully downloaded. Notifications on this broadcast
//! channel will always be made in order. StateSync will also send out a notification to its peers
//! of the newly synchronized checkpoint so that it can help other peers synchronize.
49
50use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54    collections::{HashMap, VecDeque},
55    sync::{Arc, RwLock},
56    time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60    committee::Committee,
61    digests::CheckpointDigest,
62    messages_checkpoint::{
63        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64        VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65    },
66    storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71    sync::{broadcast, mpsc, watch},
72    task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::process_archive_checkpoint;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90    state_sync_client::StateSyncClient,
91    state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_storage::object_store::util::{build_object_store, fetch_checkpoint};
97use sui_storage::verify_checkpoint;
98
99/// A handle to the StateSync subsystem.
100///
101/// This handle can be cloned and shared. Once all copies of a StateSync system's Handle have been
102/// dropped, the StateSync system will be gracefully shutdown.
103#[derive(Clone, Debug)]
104pub struct Handle {
105    sender: mpsc::Sender<StateSyncMessage>,
106    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
107    metrics: Metrics,
108}
109
110impl Handle {
111    /// Send a newly minted checkpoint from Consensus to StateSync so that it can be disseminated
112    /// to other nodes on the network.
113    ///
114    /// # Invariant
115    ///
116    /// Consensus must only notify StateSync of new checkpoints that have been fully committed to
117    /// persistent storage. This includes CheckpointContents and all Transactions and
118    /// TransactionEffects included therein.
119    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
120        self.sender
121            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
122            .await
123            .unwrap()
124    }
125
126    /// Subscribe to the stream of checkpoints that have been fully synchronized and downloaded.
127    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
128        self.checkpoint_event_sender.subscribe()
129    }
130
131    /// Returns the number of peers reported due to consistent state sync failures.
132    pub fn get_peers_reported_for_failure(&self) -> u64 {
133        self.metrics.get_peers_reported_for_failure()
134    }
135}
136
137pub(super) fn compute_adaptive_timeout(
138    tx_count: u64,
139    min_timeout: Duration,
140    max_timeout: Duration,
141) -> Duration {
142    const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
143    const JITTER_FRACTION: f64 = 0.1;
144
145    let max_timeout = max_timeout.max(min_timeout);
146    let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
147    let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
148    let base = min_timeout + extra;
149
150    let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
151    let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
152    Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
153}
154
155#[cfg_attr(test, derive(Debug))]
156pub(super) struct PeerScore {
157    successes: VecDeque<(Instant, u64 /* response_size_bytes */, Duration)>,
158    failures: VecDeque<Instant>,
159    window: Duration,
160    failure_rate: f64,
161    failing_since: Option<Instant>,
162}
163
164impl PeerScore {
165    const MAX_SAMPLES: usize = 20;
166    const MIN_SAMPLES_FOR_FAILURE: usize = 10;
167
168    pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
169        Self {
170            successes: VecDeque::new(),
171            failures: VecDeque::new(),
172            window,
173            failure_rate,
174            failing_since: None,
175        }
176    }
177
178    pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
179        let now = Instant::now();
180        self.successes.push_back((now, size, response_time));
181        while self.successes.len() > Self::MAX_SAMPLES {
182            self.successes.pop_front();
183        }
184        self.failing_since = None;
185    }
186
187    pub(super) fn record_failure(&mut self) {
188        let now = Instant::now();
189        self.failures.push_back(now);
190        while self.failures.len() > Self::MAX_SAMPLES {
191            self.failures.pop_front();
192        }
193    }
194
195    pub(super) fn is_failing(&self) -> bool {
196        let now = Instant::now();
197        let recent_failures = self
198            .failures
199            .iter()
200            .filter(|ts| now.duration_since(**ts) < self.window)
201            .count();
202        let recent_successes = self
203            .successes
204            .iter()
205            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
206            .count();
207
208        let total = recent_failures + recent_successes;
209        if total < Self::MIN_SAMPLES_FOR_FAILURE {
210            return false;
211        }
212
213        let rate = recent_failures as f64 / total as f64;
214        rate >= self.failure_rate
215    }
216
217    pub(super) fn update_failing_state(&mut self) {
218        if self.is_failing() && self.failing_since.is_none() {
219            self.failing_since = Some(Instant::now());
220        }
221    }
222
223    pub(super) fn reset_failing_since(&mut self) {
224        self.failing_since = None;
225    }
226
227    pub(super) fn consistently_failing(&self, threshold: Duration) -> bool {
228        match self.failing_since {
229            Some(since) => since.elapsed() >= threshold,
230            None => false,
231        }
232    }
233
234    pub(super) fn effective_throughput(&self) -> Option<f64> {
235        let now = Instant::now();
236        let (total_size, total_time) = self
237            .successes
238            .iter()
239            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
240            .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
241                (size + s, time + *d)
242            });
243
244        if total_size == 0 {
245            return None;
246        }
247
248        if total_time.is_zero() {
249            return None;
250        }
251
252        Some(total_size as f64 / total_time.as_secs_f64())
253    }
254}
255
256struct PeerHeights {
257    /// Table used to track the highest checkpoint for each of our peers.
258    peers: HashMap<PeerId, PeerStateSyncInfo>,
259    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
260    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
261    scores: HashMap<PeerId, PeerScore>,
262
263    wait_interval_when_no_peer_to_sync_content: Duration,
264    peer_scoring_window: Duration,
265    peer_failure_rate: f64,
266    checkpoint_content_timeout_min: Duration,
267    checkpoint_content_timeout_max: Duration,
268    exploration_probability: f64,
269}
270
271#[derive(Copy, Clone, Debug, PartialEq, Eq)]
272struct PeerStateSyncInfo {
273    /// The digest of the Peer's genesis checkpoint.
274    genesis_checkpoint_digest: CheckpointDigest,
275    /// Indicates if this Peer is on the same chain as us.
276    on_same_chain_as_us: bool,
277    /// Highest checkpoint sequence number we know of for this Peer.
278    height: CheckpointSequenceNumber,
279    /// lowest available checkpoint watermark for this Peer.
280    /// This defaults to 0 for now.
281    lowest: CheckpointSequenceNumber,
282}
283
284impl PeerHeights {
285    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
286        self.peers
287            .values()
288            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
289            .max()
290    }
291
292    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
293        self.peers
294            .iter()
295            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
296    }
297
298    // Returns a bool that indicates if the update was done successfully.
299    //
300    // This will return false if the given peer doesn't have an entry or is not on the same chain
301    // as us
302    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
303    pub fn update_peer_info(
304        &mut self,
305        peer_id: PeerId,
306        checkpoint: Checkpoint,
307        low_watermark: Option<CheckpointSequenceNumber>,
308    ) -> bool {
309        debug!("Update peer info");
310
311        let info = match self.peers.get_mut(&peer_id) {
312            Some(info) if info.on_same_chain_as_us => info,
313            _ => return false,
314        };
315
316        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
317        if let Some(low_watermark) = low_watermark {
318            info.lowest = low_watermark;
319        }
320        self.insert_checkpoint(checkpoint);
321
322        true
323    }
324
325    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
326    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
327        use std::collections::hash_map::Entry;
328        debug!("Insert peer info");
329
330        match self.peers.entry(peer_id) {
331            Entry::Occupied(mut entry) => {
332                // If there's already an entry and the genesis checkpoint digests match then update
333                // the maximum height. Otherwise we'll use the more recent one
334                let entry = entry.get_mut();
335                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
336                    entry.height = std::cmp::max(entry.height, info.height);
337                } else {
338                    *entry = info;
339                }
340            }
341            Entry::Vacant(entry) => {
342                entry.insert(info);
343            }
344        }
345    }
346
347    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
348        if let Some(info) = self.peers.get_mut(&peer_id) {
349            info.on_same_chain_as_us = false;
350        }
351        self.scores.remove(&peer_id);
352    }
353
354    /// Updates the peer's height without storing any checkpoint data.
355    /// Returns false if the peer doesn't exist or is not on the same chain.
356    pub fn update_peer_height(
357        &mut self,
358        peer_id: PeerId,
359        height: CheckpointSequenceNumber,
360        low_watermark: Option<CheckpointSequenceNumber>,
361    ) -> bool {
362        let info = match self.peers.get_mut(&peer_id) {
363            Some(info) if info.on_same_chain_as_us => info,
364            _ => return false,
365        };
366
367        info.height = std::cmp::max(height, info.height);
368        if let Some(low_watermark) = low_watermark {
369            info.lowest = low_watermark;
370        }
371
372        true
373    }
374
375    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
376        self.unprocessed_checkpoints
377            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
378        self.sequence_number_to_digest
379            .retain(|&s, _digest| s > sequence_number);
380    }
381
382    /// Inserts a checkpoint into the unprocessed checkpoints store.
383    /// Only one checkpoint per sequence number is stored. If a checkpoint with the same
384    /// sequence number but different digest already exists, the new one is dropped.
385    // TODO: also record who gives this checkpoint info for peer quality measurement?
386    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
387        let digest = *checkpoint.digest();
388        let sequence_number = *checkpoint.sequence_number();
389
390        // Check if we already have a checkpoint for this sequence number.
391        if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
392            if *existing_digest == digest {
393                // Same checkpoint, nothing to do.
394                return;
395            }
396            tracing::info!(
397                ?sequence_number,
398                ?existing_digest,
399                ?digest,
400                "received checkpoint with same sequence number but different digest, dropping new checkpoint"
401            );
402            return;
403        }
404
405        self.unprocessed_checkpoints.insert(digest, checkpoint);
406        self.sequence_number_to_digest
407            .insert(sequence_number, digest);
408    }
409
410    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
411        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
412            self.sequence_number_to_digest
413                .remove(checkpoint.sequence_number());
414        }
415    }
416
417    pub fn get_checkpoint_by_sequence_number(
418        &self,
419        sequence_number: CheckpointSequenceNumber,
420    ) -> Option<&Checkpoint> {
421        self.sequence_number_to_digest
422            .get(&sequence_number)
423            .and_then(|digest| self.get_checkpoint_by_digest(digest))
424    }
425
426    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
427        self.unprocessed_checkpoints.get(digest)
428    }
429
430    #[cfg(test)]
431    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
432        self.wait_interval_when_no_peer_to_sync_content = duration;
433    }
434
435    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
436        self.wait_interval_when_no_peer_to_sync_content
437    }
438
439    pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
440        if !self.peers.contains_key(&peer_id) {
441            return;
442        }
443        self.scores
444            .entry(peer_id)
445            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
446            .record_success(size, response_time);
447    }
448
449    pub fn record_failure(&mut self, peer_id: PeerId) {
450        if !self.peers.contains_key(&peer_id) {
451            return;
452        }
453        self.scores
454            .entry(peer_id)
455            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
456            .record_failure();
457    }
458
459    pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
460        self.scores
461            .get(peer_id)
462            .and_then(|s| s.effective_throughput())
463    }
464
465    pub fn is_failing(&self, peer_id: &PeerId) -> bool {
466        self.scores
467            .get(peer_id)
468            .map(|s| s.is_failing())
469            .unwrap_or(false)
470    }
471
472    pub fn update_failing_states(&mut self) {
473        for score in self.scores.values_mut() {
474            score.update_failing_state();
475        }
476    }
477
478    pub fn find_peer_to_report_for_failure(&self, threshold: Duration) -> Option<PeerId> {
479        self.scores
480            .iter()
481            .filter(|(peer_id, score)| {
482                score.consistently_failing(threshold)
483                    && self
484                        .peers
485                        .get(peer_id)
486                        .is_some_and(|info| info.on_same_chain_as_us)
487            })
488            .max_by_key(|(_, score)| {
489                score
490                    .failing_since
491                    .map(|since| since.elapsed())
492                    .unwrap_or(Duration::ZERO)
493            })
494            .map(|(peer_id, _)| *peer_id)
495    }
496}
497
498// PeerBalancer selects peers using weighted random selection:
499// - Most of the time: select from known peers, weighted by throughput
500// - With configured probability: select from unknown peers (to explore)
501// - Failing peers: only as last resort
502#[derive(Clone)]
503struct PeerBalancer {
504    known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
505    unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
506    failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
507    requested_checkpoint: Option<CheckpointSequenceNumber>,
508    request_type: PeerCheckpointRequestType,
509    exploration_probability: f64,
510}
511
512#[derive(Clone)]
513enum PeerCheckpointRequestType {
514    Summary,
515    Content,
516}
517
518impl PeerBalancer {
519    pub fn new(
520        network: &anemo::Network,
521        peer_heights: Arc<RwLock<PeerHeights>>,
522        request_type: PeerCheckpointRequestType,
523    ) -> Self {
524        let peer_heights_guard = peer_heights.read().unwrap();
525
526        let mut known_peers = Vec::new();
527        let mut unknown_peers = Vec::new();
528        let mut failing_peers = Vec::new();
529        let exploration_probability = peer_heights_guard.exploration_probability;
530
531        for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
532            let Some(peer) = network.peer(*peer_id) else {
533                continue;
534            };
535            let rtt_secs = peer.connection_rtt().as_secs_f64();
536
537            if peer_heights_guard.is_failing(peer_id) {
538                failing_peers.push((peer, *info));
539            } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
540                known_peers.push((peer, *info, throughput));
541            } else {
542                unknown_peers.push((peer, *info, rtt_secs));
543            }
544        }
545        drop(peer_heights_guard);
546
547        unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
548            rtt_a
549                .partial_cmp(rtt_b)
550                .unwrap_or(std::cmp::Ordering::Equal)
551        });
552
553        Self {
554            known_peers,
555            unknown_peers,
556            failing_peers,
557            requested_checkpoint: None,
558            request_type,
559            exploration_probability,
560        }
561    }
562
563    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
564        self.requested_checkpoint = Some(checkpoint);
565        self
566    }
567
568    fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
569        let requested = self.requested_checkpoint.unwrap_or(0);
570        match &self.request_type {
571            PeerCheckpointRequestType::Summary => info.height >= requested,
572            PeerCheckpointRequestType::Content => {
573                info.height >= requested && info.lowest <= requested
574            }
575        }
576    }
577
578    fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
579        let eligible: Vec<_> = self
580            .known_peers
581            .iter()
582            .enumerate()
583            .filter(|(_, (_, info, _))| self.is_eligible(info))
584            .map(|(i, (_, _, t))| (i, *t))
585            .collect();
586
587        if eligible.is_empty() {
588            return None;
589        }
590
591        let total: f64 = eligible.iter().map(|(_, t)| t).sum();
592        let mut pick = rand::thread_rng().gen_range(0.0..total);
593
594        for (idx, throughput) in &eligible {
595            pick -= throughput;
596            if pick <= 0.0 {
597                let (peer, info, _) = self.known_peers.remove(*idx);
598                return Some((peer, info));
599            }
600        }
601
602        let (idx, _) = eligible.last().unwrap();
603        let (peer, info, _) = self.known_peers.remove(*idx);
604        Some((peer, info))
605    }
606
607    fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
608        let pos = self
609            .unknown_peers
610            .iter()
611            .position(|(_, info, _)| self.is_eligible(info))?;
612        let (peer, info, _) = self.unknown_peers.remove(pos);
613        Some((peer, info))
614    }
615
616    fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
617        let pos = self
618            .failing_peers
619            .iter()
620            .position(|(_, info)| self.is_eligible(info))?;
621        Some(self.failing_peers.remove(pos))
622    }
623}
624
625impl Iterator for PeerBalancer {
626    type Item = StateSyncClient<anemo::Peer>;
627
628    fn next(&mut self) -> Option<Self::Item> {
629        let has_eligible_known = self
630            .known_peers
631            .iter()
632            .any(|(_, info, _)| self.is_eligible(info));
633        let has_eligible_unknown = self
634            .unknown_peers
635            .iter()
636            .any(|(_, info, _)| self.is_eligible(info));
637
638        let explore = has_eligible_unknown
639            && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
640
641        if explore && let Some((peer, _)) = self.select_by_rtt() {
642            return Some(StateSyncClient::new(peer));
643        }
644
645        if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
646            return Some(StateSyncClient::new(peer));
647        }
648
649        if let Some((peer, _)) = self.select_failing() {
650            return Some(StateSyncClient::new(peer));
651        }
652
653        None
654    }
655}
656
657#[derive(Clone, Debug)]
658enum StateSyncMessage {
659    StartSyncJob,
660    // Validators will send this to the StateSyncEventLoop in order to kick off notifying our peers
661    // of the new checkpoint.
662    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
663    // Notification that the checkpoint content sync task will send to the event loop in the event
664    // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
665    // synced at the same time, only the highest checkpoint is sent.
666    SyncedCheckpoint(Box<VerifiedCheckpoint>),
667}
668
669struct StateSyncEventLoop<S> {
670    config: StateSyncConfig,
671
672    mailbox: mpsc::Receiver<StateSyncMessage>,
673    /// Weak reference to our own mailbox
674    weak_sender: mpsc::WeakSender<StateSyncMessage>,
675
676    tasks: JoinSet<()>,
677    sync_checkpoint_summaries_task: Option<AbortHandle>,
678    sync_checkpoint_contents_task: Option<AbortHandle>,
679    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
680
681    store: S,
682    peer_heights: Arc<RwLock<PeerHeights>>,
683    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
684    network: anemo::Network,
685    metrics: Metrics,
686
687    sync_checkpoint_from_archive_task: Option<AbortHandle>,
688    archive_config: Option<ArchiveReaderConfig>,
689    discovery_sender: Option<crate::discovery::Sender>,
690}
691
692impl<S> StateSyncEventLoop<S>
693where
694    S: WriteStore + Clone + Send + Sync + 'static,
695{
696    // Note: A great deal of care is taken to ensure that all event handlers are non-asynchronous
697    // and that the only "await" points are from the select macro picking which event to handle.
698    // This ensures that the event loop is able to process events at a high speed and reduce the
699    // chance for building up a backlog of events to process.
700    pub async fn start(mut self) {
701        info!("State-Synchronizer started");
702
703        self.config.pinned_checkpoints.sort();
704
705        let mut interval = tokio::time::interval(self.config.interval_period());
706        let mut peer_events = {
707            let (subscriber, peers) = self.network.subscribe().unwrap();
708            for peer_id in peers {
709                self.spawn_get_latest_from_peer(peer_id);
710            }
711            subscriber
712        };
713        let (
714            target_checkpoint_contents_sequence_sender,
715            target_checkpoint_contents_sequence_receiver,
716        ) = watch::channel(0);
717
718        // Spawn tokio task to update metrics periodically in the background
719        let (_sender, receiver) = oneshot::channel();
720        tokio::spawn(update_checkpoint_watermark_metrics(
721            receiver,
722            self.store.clone(),
723            self.metrics.clone(),
724        ));
725
726        // Start checkpoint contents sync loop.
727        let task = sync_checkpoint_contents(
728            self.network.clone(),
729            self.store.clone(),
730            self.peer_heights.clone(),
731            self.weak_sender.clone(),
732            self.checkpoint_event_sender.clone(),
733            self.config.checkpoint_content_download_concurrency(),
734            self.config.checkpoint_content_download_tx_concurrency(),
735            self.config.use_get_checkpoint_contents_v2(),
736            target_checkpoint_contents_sequence_receiver,
737        );
738        let task_handle = self.tasks.spawn(task);
739        self.sync_checkpoint_contents_task = Some(task_handle);
740
741        // Start archive based checkpoint content sync loop.
742        // TODO: Consider switching to sync from archive only on startup.
743        // Right now because the peer set is fixed at startup, a node may eventually
744        // end up with peers who have all purged their local state. In such a scenario it will be
745        // stuck until restart when it ends up with a different set of peers. Once the discovery
746        // mechanism can dynamically identify and connect to other peers on the network, we will rely
747        // on sync from archive as a fall back.
748        let task = sync_checkpoint_contents_from_archive(
749            self.network.clone(),
750            self.archive_config.clone(),
751            self.store.clone(),
752            self.peer_heights.clone(),
753            self.metrics.clone(),
754        );
755        let task_handle = self.tasks.spawn(task);
756        self.sync_checkpoint_from_archive_task = Some(task_handle);
757
758        // Start main loop.
759        loop {
760            tokio::select! {
761                now = interval.tick() => {
762                    self.handle_tick(now.into_std());
763                },
764                maybe_message = self.mailbox.recv() => {
765                    // Once all handles to our mailbox have been dropped this
766                    // will yield `None` and we can terminate the event loop
767                    if let Some(message) = maybe_message {
768                        self.handle_message(message);
769                    } else {
770                        break;
771                    }
772                },
773                peer_event = peer_events.recv() => {
774                    self.handle_peer_event(peer_event);
775                },
776                Some(task_result) = self.tasks.join_next() => {
777                    match task_result {
778                        Ok(()) => {},
779                        Err(e) => {
780                            if e.is_cancelled() {
781                                // avoid crashing on ungraceful shutdown
782                            } else if e.is_panic() {
783                                // propagate panics.
784                                std::panic::resume_unwind(e.into_panic());
785                            } else {
786                                panic!("task failed: {e}");
787                            }
788                        },
789                    };
790
791                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
792                        panic!("sync_checkpoint_contents task unexpectedly terminated")
793                    }
794
795                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
796                        self.sync_checkpoint_summaries_task = None;
797                    }
798
799                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
800                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
801                    }
802                },
803            }
804
805            self.maybe_start_checkpoint_summary_sync_task();
806            self.maybe_trigger_checkpoint_contents_sync_task(
807                &target_checkpoint_contents_sequence_sender,
808            );
809        }
810
811        info!("State-Synchronizer ended");
812    }
813
814    fn handle_message(&mut self, message: StateSyncMessage) {
815        debug!("Received message: {:?}", message);
816        match message {
817            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
818            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
819                self.handle_checkpoint_from_consensus(checkpoint)
820            }
821            // After we've successfully synced a checkpoint we can notify our peers
822            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
823                self.spawn_notify_peers_of_checkpoint(*checkpoint)
824            }
825        }
826    }
827
    /// Handles a checkpoint that we received from consensus
    /// (delivered as `StateSyncMessage::VerifiedCheckpoint`).
    ///
    /// In order, this:
    /// 1. Panics unless checkpoint `seq - 1` is present in the store and its digest equals
    ///    this checkpoint's `previous_digest`.
    /// 2. Returns early, doing nothing else, if this checkpoint is not above our highest
    ///    verified checkpoint.
    /// 3. In debug builds only, panics unless the store holds the summary and full contents
    ///    of every checkpoint from the next expected sequence number through this one.
    /// 4. If the checkpoint carries end-of-epoch data, inserts the next epoch's committee.
    /// 5. Advances both the highest-verified and highest-synced watermarks to this
    ///    checkpoint, broadcasts it on `checkpoint_event_sender`, and spawns a task that
    ///    notifies peers.
    ///
    /// NOTE(review): `checkpoint.sequence_number() - 1` assumes consensus never delivers
    /// checkpoint 0 — confirm.
    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        // Always check previous_digest matches in case there is a gap between
        // state sync and consensus.
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!(
                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
                checkpoint.sequence_number(),
                Some(prev_digest),
                checkpoint.previous_digest
            );
        }

        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        // If this is an older checkpoint, just ignore it
        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }

        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        // A gap is only logged; processing continues below.
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }

        // Because checkpoint from consensus sends in order, when we have checkpoint n,
        // we must have all of the checkpoints before n from either state sync or consensus.
        #[cfg(debug_assertions)]
        {
            // Iterators are lazy: collecting forces every lookup (and its panic) to run.
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }

        // If this is the last checkpoint of a epoch, we need to make sure
        // new committee is in store before we verify newer checkpoints in next epoch.
        // This could happen before this validator's reconfiguration finishes, because
        // state sync does not reconfig.
        // TODO: make CheckpointAggregator use WriteStore so we don't need to do this
        // committee insertion in two places (only in `insert_checkpoint`).
        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }

        // Consensus-produced checkpoints are treated as both verified and synced.
        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        // We don't care if no one is listening as this is a broadcast channel
        let _ = self.checkpoint_event_sender.send(checkpoint.clone());

        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }
918
919    fn handle_peer_event(
920        &mut self,
921        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
922    ) {
923        use tokio::sync::broadcast::error::RecvError;
924
925        match peer_event {
926            Ok(PeerEvent::NewPeer(peer_id)) => {
927                self.spawn_get_latest_from_peer(peer_id);
928            }
929            Ok(PeerEvent::LostPeer(peer_id, _)) => {
930                let mut heights = self.peer_heights.write().unwrap();
931                heights.peers.remove(&peer_id);
932                heights.scores.remove(&peer_id);
933            }
934
935            Err(RecvError::Closed) => {
936                panic!("PeerEvent channel shouldn't be able to be closed");
937            }
938
939            Err(RecvError::Lagged(_)) => {
940                trace!("State-Sync fell behind processing PeerEvents");
941            }
942        }
943    }
944
945    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
946        if let Some(peer) = self.network.peer(peer_id) {
947            let genesis_checkpoint_digest = *self
948                .store
949                .get_checkpoint_by_sequence_number(0)
950                .expect("store should contain genesis checkpoint")
951                .digest();
952            let task = get_latest_from_peer(
953                genesis_checkpoint_digest,
954                peer,
955                self.peer_heights.clone(),
956                self.config.timeout(),
957            );
958            self.tasks.spawn(task);
959        }
960    }
961
962    fn handle_tick(&mut self, _now: std::time::Instant) {
963        let task = query_peers_for_their_latest_checkpoint(
964            self.network.clone(),
965            self.peer_heights.clone(),
966            self.weak_sender.clone(),
967            self.config.timeout(),
968        );
969        self.tasks.spawn(task);
970
971        if let Some(layer) = self.download_limit_layer.as_ref() {
972            layer.maybe_prune_map();
973        }
974
975        self.maybe_report_failing_peer();
976    }
977
978    fn maybe_report_failing_peer(&mut self) {
979        let threshold = self.config.peer_disconnect_threshold();
980        if threshold.is_zero() {
981            return;
982        }
983
984        let mut peer_heights = self.peer_heights.write().unwrap();
985        peer_heights.update_failing_states();
986
987        let Some(peer_id) = peer_heights.find_peer_to_report_for_failure(threshold) else {
988            return;
989        };
990        // Reset the failing clock so this peer isn't re-reported on the next tick.
991        // If the peer is still failing, update_failing_states will restart the clock
992        // and the peer must fail for another full threshold before being reported again.
993        // Discovery may decline to disconnect (e.g. too few peers), so this also
994        // prevents spamming reports every tick in that case.
995        if let Some(score) = peer_heights.scores.get_mut(&peer_id) {
996            score.reset_failing_since();
997        }
998        drop(peer_heights);
999
1000        info!("reporting peer {peer_id} for consistent state sync failures");
1001        if let Some(sender) = &self.discovery_sender {
1002            sender.report_peer_failure(peer_id);
1003        } else {
1004            let _ = self.network.disconnect(peer_id);
1005        }
1006        self.metrics.inc_peers_reported_for_failure();
1007    }
1008
1009    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
1010        // Only run one sync task at a time
1011        if self.sync_checkpoint_summaries_task.is_some() {
1012            return;
1013        }
1014
1015        let highest_processed_checkpoint = self
1016            .store
1017            .get_highest_verified_checkpoint()
1018            .expect("store operation should not fail");
1019
1020        let highest_known_sequence_number = self
1021            .peer_heights
1022            .read()
1023            .unwrap()
1024            .highest_known_checkpoint_sequence_number();
1025
1026        if let Some(target_seq) = highest_known_sequence_number
1027            && *highest_processed_checkpoint.sequence_number() < target_seq
1028        {
1029            // Limit the per-sync batch size according to config.
1030            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
1031            let limited_target = std::cmp::min(
1032                target_seq,
1033                highest_processed_checkpoint
1034                    .sequence_number()
1035                    .saturating_add(max_batch_size),
1036            );
1037            let was_limited = limited_target < target_seq;
1038
1039            // Start sync job.
1040            let weak_sender = self.weak_sender.clone();
1041            let task = sync_to_checkpoint(
1042                self.network.clone(),
1043                self.store.clone(),
1044                self.peer_heights.clone(),
1045                self.metrics.clone(),
1046                self.config.pinned_checkpoints.clone(),
1047                self.config.checkpoint_header_download_concurrency(),
1048                self.config.timeout(),
1049                limited_target,
1050            )
1051            .map(move |result| match result {
1052                Ok(()) => {
1053                    // If we limited the sync range, immediately trigger
1054                    // another sync to continue catching up.
1055                    if was_limited && let Some(sender) = weak_sender.upgrade() {
1056                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
1057                    }
1058                }
1059                Err(e) => {
1060                    debug!("error syncing checkpoint {e}");
1061                }
1062            });
1063            let task_handle = self.tasks.spawn(task);
1064            self.sync_checkpoint_summaries_task = Some(task_handle);
1065        }
1066    }
1067
1068    fn maybe_trigger_checkpoint_contents_sync_task(
1069        &mut self,
1070        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
1071    ) {
1072        let highest_verified_checkpoint = self
1073            .store
1074            .get_highest_verified_checkpoint()
1075            .expect("store operation should not fail");
1076        let highest_synced_checkpoint = self
1077            .store
1078            .get_highest_synced_checkpoint()
1079            .expect("store operation should not fail");
1080
1081        if highest_verified_checkpoint.sequence_number()
1082            > highest_synced_checkpoint.sequence_number()
1083            // skip if we aren't connected to any peers that can help
1084            && self
1085                .peer_heights
1086                .read()
1087                .unwrap()
1088                .highest_known_checkpoint_sequence_number()
1089                > Some(*highest_synced_checkpoint.sequence_number())
1090        {
1091            let _ = target_sequence_channel.send_if_modified(|num| {
1092                let new_num = *highest_verified_checkpoint.sequence_number();
1093                if *num == new_num {
1094                    return false;
1095                }
1096                *num = new_num;
1097                true
1098            });
1099        }
1100    }
1101
1102    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
1103        let task = notify_peers_of_checkpoint(
1104            self.network.clone(),
1105            self.peer_heights.clone(),
1106            checkpoint,
1107            self.config.timeout(),
1108        );
1109        self.tasks.spawn(task);
1110    }
1111}
1112
1113async fn notify_peers_of_checkpoint(
1114    network: anemo::Network,
1115    peer_heights: Arc<RwLock<PeerHeights>>,
1116    checkpoint: VerifiedCheckpoint,
1117    timeout: Duration,
1118) {
1119    let futs = peer_heights
1120        .read()
1121        .unwrap()
1122        .peers_on_same_chain()
1123        // Filter out any peers who we know already have a checkpoint higher than this one
1124        .filter_map(|(peer_id, info)| {
1125            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1126        })
1127        // Filter out any peers who we aren't connected with
1128        .flat_map(|peer_id| network.peer(*peer_id))
1129        .map(StateSyncClient::new)
1130        .map(|mut client| {
1131            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1132            async move { client.push_checkpoint_summary(request).await }
1133        })
1134        .collect::<Vec<_>>();
1135    futures::future::join_all(futs).await;
1136}
1137
1138async fn get_latest_from_peer(
1139    our_genesis_checkpoint_digest: CheckpointDigest,
1140    peer: anemo::Peer,
1141    peer_heights: Arc<RwLock<PeerHeights>>,
1142    timeout: Duration,
1143) {
1144    let peer_id = peer.peer_id();
1145    let mut client = StateSyncClient::new(peer);
1146
1147    let info = {
1148        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
1149
1150        if let Some(info) = maybe_info {
1151            info
1152        } else {
1153            // TODO do we want to create a new API just for querying a node's chainid?
1154            //
1155            // We need to query this node's genesis checkpoint to see if they're on the same chain
1156            // as us
1157            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
1158                .with_timeout(timeout);
1159            let response = client
1160                .get_checkpoint_summary(request)
1161                .await
1162                .map(Response::into_inner);
1163
1164            let info = match response {
1165                Ok(Some(checkpoint)) => {
1166                    let digest = *checkpoint.digest();
1167                    PeerStateSyncInfo {
1168                        genesis_checkpoint_digest: digest,
1169                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
1170                        height: *checkpoint.sequence_number(),
1171                        lowest: CheckpointSequenceNumber::default(),
1172                    }
1173                }
1174                Ok(None) => PeerStateSyncInfo {
1175                    genesis_checkpoint_digest: CheckpointDigest::default(),
1176                    on_same_chain_as_us: false,
1177                    height: CheckpointSequenceNumber::default(),
1178                    lowest: CheckpointSequenceNumber::default(),
1179                },
1180                Err(status) => {
1181                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
1182                    return;
1183                }
1184            };
1185            peer_heights
1186                .write()
1187                .unwrap()
1188                .insert_peer_info(peer_id, info);
1189            info
1190        }
1191    };
1192
1193    // Bail early if this node isn't on the same chain as us
1194    if !info.on_same_chain_as_us {
1195        trace!(?info, "Peer {peer_id} not on same chain as us");
1196        return;
1197    }
1198    let Some((highest_checkpoint, low_watermark)) =
1199        query_peer_for_latest_info(&mut client, timeout).await
1200    else {
1201        return;
1202    };
1203    peer_heights
1204        .write()
1205        .unwrap()
1206        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
1207}
1208
1209/// Queries a peer for their highest_synced_checkpoint and low checkpoint watermark
1210async fn query_peer_for_latest_info(
1211    client: &mut StateSyncClient<anemo::Peer>,
1212    timeout: Duration,
1213) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
1214    let request = Request::new(()).with_timeout(timeout);
1215    let response = client
1216        .get_checkpoint_availability(request)
1217        .await
1218        .map(Response::into_inner);
1219    match response {
1220        Ok(GetCheckpointAvailabilityResponse {
1221            highest_synced_checkpoint,
1222            lowest_available_checkpoint,
1223        }) => {
1224            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
1225        }
1226        Err(status) => {
1227            // If peer hasn't upgraded they would return 404 NotFound error
1228            if status.status() != anemo::types::response::StatusCode::NotFound {
1229                trace!("get_checkpoint_availability request failed: {status:?}");
1230                return None;
1231            }
1232        }
1233    };
1234
1235    // Then we try the old query
1236    // TODO: remove this once the new feature stabilizes
1237    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1238    let response = client
1239        .get_checkpoint_summary(request)
1240        .await
1241        .map(Response::into_inner);
1242    match response {
1243        Ok(Some(checkpoint)) => Some((checkpoint, None)),
1244        Ok(None) => None,
1245        Err(status) => {
1246            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1247            None
1248        }
1249    }
1250}
1251
1252#[instrument(level = "debug", skip_all)]
1253async fn query_peers_for_their_latest_checkpoint(
1254    network: anemo::Network,
1255    peer_heights: Arc<RwLock<PeerHeights>>,
1256    sender: mpsc::WeakSender<StateSyncMessage>,
1257    timeout: Duration,
1258) {
1259    let peer_heights = &peer_heights;
1260    let futs = peer_heights
1261        .read()
1262        .unwrap()
1263        .peers_on_same_chain()
1264        // Filter out any peers who we aren't connected with
1265        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1266        .map(|peer| {
1267            let peer_id = peer.peer_id();
1268            let mut client = StateSyncClient::new(peer);
1269
1270            async move {
1271                let response = query_peer_for_latest_info(&mut client, timeout).await;
1272                match response {
1273                    Some((highest_checkpoint, low_watermark)) => peer_heights
1274                        .write()
1275                        .unwrap()
1276                        .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1277                        .then_some(highest_checkpoint),
1278                    None => None,
1279                }
1280            }
1281        })
1282        .collect::<Vec<_>>();
1283
1284    debug!("Query {} peers for latest checkpoint", futs.len());
1285
1286    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1287
1288    let highest_checkpoint_seq = checkpoints
1289        .map(|checkpoint| *checkpoint.sequence_number())
1290        .max();
1291
1292    let our_highest_seq = peer_heights
1293        .read()
1294        .unwrap()
1295        .highest_known_checkpoint_sequence_number();
1296
1297    debug!(
1298        "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1299    );
1300
1301    let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1302        (Some(theirs), None) => theirs,
1303        (Some(theirs), Some(ours)) if theirs > ours => theirs,
1304        _ => return,
1305    };
1306
1307    if let Some(sender) = sender.upgrade() {
1308        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1309    }
1310}
1311
1312async fn sync_to_checkpoint<S>(
1313    network: anemo::Network,
1314    store: S,
1315    peer_heights: Arc<RwLock<PeerHeights>>,
1316    metrics: Metrics,
1317    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1318    checkpoint_header_download_concurrency: usize,
1319    timeout: Duration,
1320    target_sequence_number: CheckpointSequenceNumber,
1321) -> Result<()>
1322where
1323    S: WriteStore,
1324{
1325    metrics.set_highest_known_checkpoint(target_sequence_number);
1326
1327    let mut current = store
1328        .get_highest_verified_checkpoint()
1329        .expect("store operation should not fail");
1330    if *current.sequence_number() >= target_sequence_number {
1331        return Err(anyhow::anyhow!(
1332            "target checkpoint {} is older than highest verified checkpoint {}",
1333            target_sequence_number,
1334            current.sequence_number(),
1335        ));
1336    }
1337
1338    let peer_balancer = PeerBalancer::new(
1339        &network,
1340        peer_heights.clone(),
1341        PeerCheckpointRequestType::Summary,
1342    );
1343    // range of the next sequence_numbers to fetch
1344    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1345        ..=target_sequence_number)
1346        .map(|next| {
1347            let peers = peer_balancer.clone().with_checkpoint(next);
1348            let peer_heights = peer_heights.clone();
1349            let pinned_checkpoints = &pinned_checkpoints;
1350            async move {
1351                if let Some(checkpoint) = peer_heights
1352                    .read()
1353                    .unwrap()
1354                    .get_checkpoint_by_sequence_number(next)
1355                {
1356                    return (Some(checkpoint.to_owned()), next, None);
1357                }
1358
1359                // Iterate through peers trying each one in turn until we're able to
1360                // successfully get the target checkpoint.
1361                for mut peer in peers {
1362                    let peer_id = peer.inner().peer_id();
1363                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1364                        .with_timeout(timeout);
1365                    let start = Instant::now();
1366                    let result = peer.get_checkpoint_summary(request).await;
1367                    let elapsed = start.elapsed();
1368
1369                    let checkpoint = match result {
1370                        Ok(response) => match response.into_inner() {
1371                            Some(cp) => Some(cp),
1372                            None => {
1373                                trace!("peer unable to help sync");
1374                                peer_heights.write().unwrap().record_failure(peer_id);
1375                                None
1376                            }
1377                        },
1378                        Err(e) => {
1379                            trace!("{e:?}");
1380                            peer_heights.write().unwrap().record_failure(peer_id);
1381                            None
1382                        }
1383                    };
1384
1385                    let Some(checkpoint) = checkpoint else {
1386                        continue;
1387                    };
1388
1389                    let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
1390                    peer_heights.write().unwrap().record_success(peer_id, size, elapsed);
1391
1392                    // peer didn't give us a checkpoint with the height that we requested
1393                    if *checkpoint.sequence_number() != next {
1394                        tracing::debug!(
1395                            "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1396                            checkpoint.sequence_number()
1397                        );
1398                        peer_heights
1399                            .write()
1400                            .unwrap()
1401                            .mark_peer_as_not_on_same_chain(peer_id);
1402                        continue;
1403                    }
1404
1405                    // peer gave us a checkpoint whose digest does not match pinned digest
1406                    let checkpoint_digest = checkpoint.digest();
1407                    if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1408                        checkpoint.sequence_number(),
1409                        |(seq_num, _digest)| *seq_num
1410                    )
1411                        && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
1412                    {
1413                        tracing::debug!(
1414                            "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1415                            pinned_checkpoints[pinned_digest_index].1,
1416                            checkpoint_digest
1417                        );
1418                        peer_heights
1419                            .write()
1420                            .unwrap()
1421                            .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
1422                        continue;
1423                    }
1424
1425                    // Insert in our store in the event that things fail and we need to retry
1426                    peer_heights
1427                        .write()
1428                        .unwrap()
1429                        .insert_checkpoint(checkpoint.clone());
1430                    return (Some(checkpoint), next, Some(peer_id));
1431                }
1432                (None, next, None)
1433            }
1434        })
1435        .pipe(futures::stream::iter)
1436        .buffered(checkpoint_header_download_concurrency);
1437
1438    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1439        assert_eq!(
1440            current
1441                .sequence_number()
1442                .checked_add(1)
1443                .expect("exhausted u64"),
1444            next
1445        );
1446
1447        // Verify the checkpoint
1448        let checkpoint = 'cp: {
1449            let checkpoint = maybe_checkpoint.ok_or_else(|| {
1450                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1451            })?;
1452            // Skip verification for manually pinned checkpoints.
1453            if pinned_checkpoints
1454                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1455                .is_ok()
1456            {
1457                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1458            }
1459            match verify_checkpoint(&current, &store, checkpoint) {
1460                Ok(verified_checkpoint) => verified_checkpoint,
1461                Err(checkpoint) => {
1462                    let mut peer_heights = peer_heights.write().unwrap();
1463                    // Remove the checkpoint from our temporary store so that we can try querying
1464                    // another peer for a different one
1465                    peer_heights.remove_checkpoint(checkpoint.digest());
1466
1467                    // Mark peer as not on the same chain as us
1468                    if let Some(peer_id) = maybe_peer_id {
1469                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1470                    }
1471
1472                    return Err(anyhow::anyhow!(
1473                        "unable to verify checkpoint {checkpoint:?}"
1474                    ));
1475                }
1476            }
1477        };
1478
1479        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1480        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1481            metrics.checkpoint_summary_age_metrics()
1482        {
1483            checkpoint.report_checkpoint_age(
1484                checkpoint_summary_age_metric,
1485                checkpoint_summary_age_metric_deprecated,
1486            );
1487        }
1488
1489        // Insert the newly verified checkpoint into our store, which will bump our highest
1490        // verified checkpoint watermark as well.
1491        store
1492            .insert_checkpoint(&checkpoint)
1493            .expect("store operation should not fail");
1494
1495        current = checkpoint;
1496    }
1497
1498    peer_heights
1499        .write()
1500        .unwrap()
1501        .cleanup_old_checkpoints(*current.sequence_number());
1502
1503    Ok(())
1504}
1505
1506async fn sync_checkpoint_contents_from_archive<S>(
1507    network: anemo::Network,
1508    archive_config: Option<ArchiveReaderConfig>,
1509    store: S,
1510    peer_heights: Arc<RwLock<PeerHeights>>,
1511    metrics: Metrics,
1512) where
1513    S: WriteStore + Clone + Send + Sync + 'static,
1514{
1515    loop {
1516        sync_checkpoint_contents_from_archive_iteration(
1517            &network,
1518            &archive_config,
1519            store.clone(),
1520            peer_heights.clone(),
1521            metrics.clone(),
1522        )
1523        .await;
1524        tokio::time::sleep(Duration::from_secs(5)).await;
1525    }
1526}
1527
1528async fn sync_checkpoint_contents_from_archive_iteration<S>(
1529    network: &anemo::Network,
1530    archive_config: &Option<ArchiveReaderConfig>,
1531    store: S,
1532    peer_heights: Arc<RwLock<PeerHeights>>,
1533    metrics: Metrics,
1534) where
1535    S: WriteStore + Clone + Send + Sync + 'static,
1536{
1537    let peers: Vec<_> = peer_heights
1538        .read()
1539        .unwrap()
1540        .peers_on_same_chain()
1541        // Filter out any peers who we aren't connected with.
1542        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1543        .collect();
1544    let lowest_checkpoint_on_peers = peers
1545        .iter()
1546        .map(|(_p, state_sync_info)| state_sync_info.lowest)
1547        .min();
1548    let highest_synced = store
1549        .get_highest_synced_checkpoint()
1550        .expect("store operation should not fail")
1551        .sequence_number;
1552    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1553        highest_synced < lowest_checkpoint_on_peers
1554    } else {
1555        false
1556    };
1557    debug!(
1558        "Syncing checkpoint contents from archive: {sync_from_archive},  highest_synced: {highest_synced},  lowest_checkpoint_on_peers: {}",
1559        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1560    );
1561    if sync_from_archive {
1562        let start = highest_synced
1563            .checked_add(1)
1564            .expect("Checkpoint seq num overflow");
1565        let end = lowest_checkpoint_on_peers.unwrap();
1566
1567        let Some(archive_config) = archive_config else {
1568            warn!("Failed to find an archive reader to complete the state sync request");
1569            return;
1570        };
1571        let Some(ingestion_url) = &archive_config.ingestion_url else {
1572            warn!("Archival ingestion url for state sync is not configured");
1573            return;
1574        };
1575        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1576            warn!("{} can't be used as an archival fallback", ingestion_url);
1577            return;
1578        }
1579        let obj_store =
1580            build_object_store(ingestion_url, archive_config.remote_store_options.clone());
1581        let mut checkpoint_stream = futures::stream::iter(start..=end)
1582            .map(|seq| {
1583                let obj_store = obj_store.clone();
1584                async move { (seq, fetch_checkpoint(&obj_store, seq).await) }
1585            })
1586            .buffered(archive_config.download_concurrency.get());
1587
1588        while let Some((seq, result)) = checkpoint_stream.next().await {
1589            let checkpoint = match result {
1590                Ok(checkpoint) => checkpoint,
1591                Err(err) => {
1592                    warn!("State sync from archive failed fetching checkpoint {seq}: {err}");
1593                    return;
1594                }
1595            };
1596            if let Err(err) = process_archive_checkpoint(&store, &checkpoint, &metrics) {
1597                warn!("State sync from archive failed processing checkpoint {seq}: {err}");
1598                return;
1599            }
1600        }
1601    }
1602}
1603
1604async fn sync_checkpoint_contents<S>(
1605    network: anemo::Network,
1606    store: S,
1607    peer_heights: Arc<RwLock<PeerHeights>>,
1608    sender: mpsc::WeakSender<StateSyncMessage>,
1609    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1610    checkpoint_content_download_concurrency: usize,
1611    checkpoint_content_download_tx_concurrency: u64,
1612    use_get_checkpoint_contents_v2: bool,
1613    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1614) where
1615    S: WriteStore + Clone,
1616{
1617    let mut highest_synced = store
1618        .get_highest_synced_checkpoint()
1619        .expect("store operation should not fail");
1620
1621    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1622    let mut target_sequence_cursor = 0;
1623    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1624    let mut checkpoint_contents_tasks = FuturesOrdered::new();
1625
1626    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1627
1628    loop {
1629        tokio::select! {
1630            result = target_sequence_channel.changed() => {
1631                match result {
1632                    Ok(()) => {
1633                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1634                    }
1635                    Err(_) => {
1636                        // Watch channel is closed, exit loop.
1637                        return
1638                    }
1639                }
1640            },
1641            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1642                match maybe_checkpoint {
1643                    Ok(checkpoint) => {
1644                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
1645
1646                        store
1647                            .update_highest_synced_checkpoint(&checkpoint)
1648                            .expect("store operation should not fail");
1649                        // We don't care if no one is listening as this is a broadcast channel
1650                        let _ = checkpoint_event_sender.send(checkpoint.clone());
1651                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1652                        highest_synced = checkpoint;
1653
1654                    }
1655                    Err(checkpoint) => {
1656                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
1657                        if let Some(lowest_peer_checkpoint) =
1658                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
1659                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
1660                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
1661                            }
1662                        } else {
1663                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
1664
1665                        }
1666                        // Calculate tx_count for retry by getting previous checkpoint
1667                        let retry_tx_count = if *checkpoint.sequence_number() == 0 {
1668                            checkpoint.network_total_transactions
1669                        } else {
1670                            let prev = store
1671                                .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
1672                                .expect("previous checkpoint must exist")
1673                                .network_total_transactions;
1674                            checkpoint.network_total_transactions - prev
1675                        };
1676                        // Retry contents sync on failure.
1677                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
1678                            network.clone(),
1679                            &store,
1680                            peer_heights.clone(),
1681                            use_get_checkpoint_contents_v2,
1682                            retry_tx_count,
1683                            checkpoint,
1684                        ));
1685                    }
1686                }
1687            },
1688        }
1689
1690        // Start new tasks up to configured concurrency limits.
1691        while current_sequence < target_sequence_cursor
1692            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
1693        {
1694            let next_checkpoint = store
1695                .get_checkpoint_by_sequence_number(current_sequence)
1696                .unwrap_or_else(|| panic!(
1697                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
1698                    current_sequence
1699                ));
1700
1701            // Enforce transaction count concurrency limit.
1702            let tx_count = next_checkpoint.network_total_transactions
1703                - highest_started_network_total_transactions;
1704            if tx_count > tx_concurrency_remaining {
1705                break;
1706            }
1707            tx_concurrency_remaining -= tx_count;
1708
1709            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
1710            current_sequence += 1;
1711            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
1712                network.clone(),
1713                &store,
1714                peer_heights.clone(),
1715                use_get_checkpoint_contents_v2,
1716                tx_count,
1717                next_checkpoint,
1718            ));
1719        }
1720
1721        if highest_synced
1722            .sequence_number()
1723            .is_multiple_of(checkpoint_content_download_concurrency as u64)
1724            || checkpoint_contents_tasks.is_empty()
1725        {
1726            // Periodically notify event loop to notify our peers that we've synced to a new checkpoint height
1727            if let Some(sender) = sender.upgrade() {
1728                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
1729                let _ = sender.send(message).await;
1730            }
1731        }
1732    }
1733}
1734
1735#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1736async fn sync_one_checkpoint_contents<S>(
1737    network: anemo::Network,
1738    store: S,
1739    peer_heights: Arc<RwLock<PeerHeights>>,
1740    use_get_checkpoint_contents_v2: bool,
1741    tx_count: u64,
1742    checkpoint: VerifiedCheckpoint,
1743) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1744where
1745    S: WriteStore + Clone,
1746{
1747    let (timeout_min, timeout_max) = {
1748        let ph = peer_heights.read().unwrap();
1749        (
1750            ph.checkpoint_content_timeout_min,
1751            ph.checkpoint_content_timeout_max,
1752        )
1753    };
1754    let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1755    debug!(
1756        "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1757        timeout, tx_count
1758    );
1759
1760    // Check if we already have produced this checkpoint locally. If so, we don't need
1761    // to get it from peers anymore.
1762    if store
1763        .get_highest_synced_checkpoint()
1764        .expect("store operation should not fail")
1765        .sequence_number()
1766        >= checkpoint.sequence_number()
1767    {
1768        debug!("checkpoint was already created via consensus output");
1769        return Ok(checkpoint);
1770    }
1771
1772    // Request checkpoint contents from peers.
1773    let peers = PeerBalancer::new(
1774        &network,
1775        peer_heights.clone(),
1776        PeerCheckpointRequestType::Content,
1777    )
1778    .with_checkpoint(*checkpoint.sequence_number());
1779    let now = tokio::time::Instant::now();
1780    let Some(_contents) = get_full_checkpoint_contents(
1781        peers,
1782        &store,
1783        peer_heights.clone(),
1784        &checkpoint,
1785        use_get_checkpoint_contents_v2,
1786        timeout,
1787    )
1788    .await
1789    else {
1790        // Delay completion in case of error so we don't hammer the network with retries.
1791        let duration = peer_heights
1792            .read()
1793            .unwrap()
1794            .wait_interval_when_no_peer_to_sync_content();
1795        if now.elapsed() < duration {
1796            let duration = duration - now.elapsed();
1797            info!("retrying checkpoint sync after {:?}", duration);
1798            tokio::time::sleep(duration).await;
1799        }
1800        return Err(checkpoint);
1801    };
1802    debug!("completed checkpoint contents sync");
1803    Ok(checkpoint)
1804}
1805
1806#[instrument(level = "debug", skip_all)]
1807async fn get_full_checkpoint_contents<S>(
1808    peers: PeerBalancer,
1809    store: S,
1810    peer_heights: Arc<RwLock<PeerHeights>>,
1811    checkpoint: &VerifiedCheckpoint,
1812    use_get_checkpoint_contents_v2: bool,
1813    timeout: Duration,
1814) -> Option<VersionedFullCheckpointContents>
1815where
1816    S: WriteStore,
1817{
1818    let sequence_number = checkpoint.sequence_number;
1819    let digest = checkpoint.content_digest;
1820    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1821        debug!("store already contains checkpoint contents");
1822        return Some(contents);
1823    }
1824
1825    // Iterate through our selected peers trying each one in turn until we're able to
1826    // successfully get the target checkpoint
1827    for mut peer in peers {
1828        let peer_id = peer.inner().peer_id();
1829        debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
1830        let request = Request::new(digest).with_timeout(timeout);
1831        let start = Instant::now();
1832        let result = if use_get_checkpoint_contents_v2 {
1833            peer.get_checkpoint_contents_v2(request).await
1834        } else {
1835            peer.get_checkpoint_contents(request)
1836                .await
1837                .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
1838        };
1839        let elapsed = start.elapsed();
1840
1841        let contents = match result {
1842            Ok(response) => match response.into_inner() {
1843                Some(c) => Some(c),
1844                None => {
1845                    trace!("peer unable to help sync");
1846                    peer_heights.write().unwrap().record_failure(peer_id);
1847                    None
1848                }
1849            },
1850            Err(e) => {
1851                trace!("{e:?}");
1852                peer_heights.write().unwrap().record_failure(peer_id);
1853                None
1854            }
1855        };
1856
1857        let Some(contents) = contents else {
1858            continue;
1859        };
1860
1861        if contents.verify_digests(digest).is_ok() {
1862            let size =
1863                bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
1864            peer_heights
1865                .write()
1866                .unwrap()
1867                .record_success(peer_id, size, elapsed);
1868            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1869            store
1870                .insert_checkpoint_contents(checkpoint, verified_contents)
1871                .expect("store operation should not fail");
1872            return Some(contents);
1873        }
1874    }
1875    debug!("no peers had checkpoint contents");
1876    None
1877}
1878
1879async fn update_checkpoint_watermark_metrics<S>(
1880    mut recv: oneshot::Receiver<()>,
1881    store: S,
1882    metrics: Metrics,
1883) -> Result<()>
1884where
1885    S: WriteStore + Clone + Send + Sync,
1886{
1887    let mut interval = tokio::time::interval(Duration::from_secs(5));
1888    loop {
1889        tokio::select! {
1890             _now = interval.tick() => {
1891                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1892                    .expect("store operation should not fail");
1893                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1894                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1895                    .expect("store operation should not fail");
1896                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1897             },
1898            _ = &mut recv => break,
1899        }
1900    }
1901    Ok(())
1902}