sui_network/state_sync/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer-to-peer data synchronization of checkpoints.
5//!
6//! This StateSync module is responsible for the synchronization and dissemination of checkpoints
7//! and the transactions, and their effects, contained within. This module is *not* responsible for
8//! the execution of the transactions included in a checkpoint, that process is left to another
9//! component in the system.
10//!
11//! # High-level Overview of StateSync
12//!
13//! StateSync discovers new checkpoints via a few different sources:
14//! 1. If this node is a Validator, checkpoints will be produced via consensus at which point
15//!    consensus can notify state-sync of the new checkpoint via [Handle::send_checkpoint].
16//! 2. A peer notifies us of the latest checkpoint which they have synchronized. State-Sync will
17//!    also periodically query its peers to discover what their latest checkpoint is.
18//!
19//! We keep track of two different watermarks:
20//! * highest_verified_checkpoint - This is the highest checkpoint header that we've locally
//!   verified. This indicates that we have in our persistent store (and have verified) all
22//!   checkpoint headers up to and including this value.
23//! * highest_synced_checkpoint - This is the highest checkpoint that we've fully synchronized,
24//!   meaning we've downloaded and have in our persistent stores all of the transactions, and their
25//!   effects (but not the objects), for all checkpoints up to and including this point. This is
26//!   the watermark that is shared with other peers, either via notification or when they query for
27//!   our latest checkpoint, and is intended to be used as a guarantee of data availability.
28//!
29//! The `PeerHeights` struct is used to track the highest_synced_checkpoint watermark for all of
30//! our peers.
31//!
32//! When a new checkpoint is discovered, and we've determined that it is higher than our
33//! highest_verified_checkpoint, then StateSync will kick off a task to synchronize and verify all
//! checkpoints between our highest_verified_checkpoint and the newly discovered checkpoint. This
35//! process is done by querying one of our peers for the checkpoints we're missing (using the
36//! `PeerHeights` struct as a way to intelligently select which peers have the data available for
37//! us to query) at which point we will locally verify the signatures on the checkpoint header with
38//! the appropriate committee (based on the epoch). As checkpoints are verified, the
//! highest_verified_checkpoint watermark will be ratcheted up.
40//!
41//! Once we've ratcheted up our highest_verified_checkpoint, and if it is higher than
42//! highest_synced_checkpoint, StateSync will then kick off a task to synchronize the contents of
43//! all of the checkpoints from highest_synced_checkpoint..=highest_verified_checkpoint. After the
44//! contents of each checkpoint is fully downloaded, StateSync will update our
45//! highest_synced_checkpoint watermark and send out a notification on a broadcast channel
46//! indicating that a new checkpoint has been fully downloaded. Notifications on this broadcast
47//! channel will always be made in order. StateSync will also send out a notification to its peers
48//! of the newly synchronized checkpoint so that it can help other peers synchronize.
49
50use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54    collections::{HashMap, VecDeque},
55    sync::{Arc, RwLock},
56    time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60    committee::Committee,
61    digests::CheckpointDigest,
62    messages_checkpoint::{
63        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64        VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65    },
66    storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71    sync::{broadcast, mpsc, watch},
72    task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::StateSyncWorker;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90    state_sync_client::StateSyncClient,
91    state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
97use sui_storage::verify_checkpoint;
98
99/// A handle to the StateSync subsystem.
100///
101/// This handle can be cloned and shared. Once all copies of a StateSync system's Handle have been
102/// dropped, the StateSync system will be gracefully shutdown.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Mailbox into the StateSync event loop; once every clone of this sender is
    /// dropped, the event loop's `mailbox.recv()` yields `None` and it shuts down.
    sender: mpsc::Sender<StateSyncMessage>,
    /// Broadcast side of the fully-synced-checkpoint stream handed out by
    /// `subscribe_to_synced_checkpoints`.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}
108
109impl Handle {
110    /// Send a newly minted checkpoint from Consensus to StateSync so that it can be disseminated
111    /// to other nodes on the network.
112    ///
113    /// # Invariant
114    ///
115    /// Consensus must only notify StateSync of new checkpoints that have been fully committed to
116    /// persistent storage. This includes CheckpointContents and all Transactions and
117    /// TransactionEffects included therein.
118    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119        self.sender
120            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121            .await
122            .unwrap()
123    }
124
125    /// Subscribe to the stream of checkpoints that have been fully synchronized and downloaded.
126    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127        self.checkpoint_event_sender.subscribe()
128    }
129}
130
131pub(super) fn compute_adaptive_timeout(
132    tx_count: u64,
133    min_timeout: Duration,
134    max_timeout: Duration,
135) -> Duration {
136    const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
137    const JITTER_FRACTION: f64 = 0.1;
138
139    let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
140    let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
141    let base = min_timeout + extra;
142
143    let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
144    let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
145    Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
146}
147
/// Sliding-window quality tracker for a single peer, fed by request outcomes and
/// consulted by `PeerHeights::{get_throughput, is_failing}` for peer selection.
#[cfg_attr(test, derive(Debug))]
pub(super) struct PeerScore {
    /// Successful responses: (completion time, payload size, response latency).
    successes: VecDeque<(Instant, u64 /* response_size_bytes */, Duration)>,
    /// Completion times of failed requests.
    failures: VecDeque<Instant>,
    /// Only samples newer than this window count toward the score.
    window: Duration,
    /// Failure ratio at or above which the peer is considered failing.
    failure_rate: f64,
}
155
156impl PeerScore {
157    const MAX_SAMPLES: usize = 20;
158    const MIN_SAMPLES_FOR_FAILURE: usize = 10;
159
160    pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
161        Self {
162            successes: VecDeque::new(),
163            failures: VecDeque::new(),
164            window,
165            failure_rate,
166        }
167    }
168
169    pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
170        let now = Instant::now();
171        self.successes.push_back((now, size, response_time));
172        while self.successes.len() > Self::MAX_SAMPLES {
173            self.successes.pop_front();
174        }
175    }
176
177    pub(super) fn record_failure(&mut self) {
178        let now = Instant::now();
179        self.failures.push_back(now);
180        while self.failures.len() > Self::MAX_SAMPLES {
181            self.failures.pop_front();
182        }
183    }
184
185    pub(super) fn is_failing(&self) -> bool {
186        let now = Instant::now();
187        let recent_failures = self
188            .failures
189            .iter()
190            .filter(|ts| now.duration_since(**ts) < self.window)
191            .count();
192        let recent_successes = self
193            .successes
194            .iter()
195            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
196            .count();
197
198        let total = recent_failures + recent_successes;
199        if total < Self::MIN_SAMPLES_FOR_FAILURE {
200            return false;
201        }
202
203        let rate = recent_failures as f64 / total as f64;
204        rate >= self.failure_rate
205    }
206
207    pub(super) fn effective_throughput(&self) -> Option<f64> {
208        let now = Instant::now();
209        let (total_size, total_time) = self
210            .successes
211            .iter()
212            .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
213            .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
214                (size + s, time + *d)
215            });
216
217        if total_size == 0 {
218            return None;
219        }
220
221        if total_time.is_zero() {
222            return None;
223        }
224
225        Some(total_size as f64 / total_time.as_secs_f64())
226    }
227}
228
/// Shared registry of everything we know about our peers' sync state, plus a
/// buffer of checkpoint summaries received from them but not yet processed.
struct PeerHeights {
    /// Table used to track the highest checkpoint for each of our peers.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    /// Checkpoint summaries received from peers awaiting processing, keyed by digest.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    /// Secondary index into `unprocessed_checkpoints` by sequence number
    /// (at most one digest stored per sequence number).
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
    /// Per-peer success/failure scores; entries are created lazily on first record.
    scores: HashMap<PeerId, PeerScore>,

    /// How long to wait before retrying when no peer can serve checkpoint contents.
    wait_interval_when_no_peer_to_sync_content: Duration,
    /// Sliding window used when constructing new `PeerScore` entries.
    peer_scoring_window: Duration,
    /// Failure-rate threshold used when constructing new `PeerScore` entries.
    peer_failure_rate: f64,
    // Timeout bounds for content downloads — presumably fed to
    // compute_adaptive_timeout by the sync tasks; TODO confirm at call sites.
    checkpoint_content_timeout_min: Duration,
    checkpoint_content_timeout_max: Duration,
    /// Probability that `PeerBalancer` tries an unscored peer even when scored
    /// peers are available.
    exploration_probability: f64,
}
243
/// Snapshot of a single peer's sync state as reported to us.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// The digest of the Peer's genesis checkpoint.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Indicates if this Peer is on the same chain as us.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number we know of for this Peer.
    height: CheckpointSequenceNumber,
    /// Lowest available checkpoint watermark for this Peer — contents below this
    /// are treated as pruned by peer selection.
    /// This defaults to 0 for now.
    lowest: CheckpointSequenceNumber,
}
256
257impl PeerHeights {
258    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
259        self.peers
260            .values()
261            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
262            .max()
263    }
264
265    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
266        self.peers
267            .iter()
268            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
269    }
270
271    // Returns a bool that indicates if the update was done successfully.
272    //
273    // This will return false if the given peer doesn't have an entry or is not on the same chain
274    // as us
275    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
276    pub fn update_peer_info(
277        &mut self,
278        peer_id: PeerId,
279        checkpoint: Checkpoint,
280        low_watermark: Option<CheckpointSequenceNumber>,
281    ) -> bool {
282        debug!("Update peer info");
283
284        let info = match self.peers.get_mut(&peer_id) {
285            Some(info) if info.on_same_chain_as_us => info,
286            _ => return false,
287        };
288
289        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
290        if let Some(low_watermark) = low_watermark {
291            info.lowest = low_watermark;
292        }
293        self.insert_checkpoint(checkpoint);
294
295        true
296    }
297
298    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
299    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
300        use std::collections::hash_map::Entry;
301        debug!("Insert peer info");
302
303        match self.peers.entry(peer_id) {
304            Entry::Occupied(mut entry) => {
305                // If there's already an entry and the genesis checkpoint digests match then update
306                // the maximum height. Otherwise we'll use the more recent one
307                let entry = entry.get_mut();
308                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
309                    entry.height = std::cmp::max(entry.height, info.height);
310                } else {
311                    *entry = info;
312                }
313            }
314            Entry::Vacant(entry) => {
315                entry.insert(info);
316            }
317        }
318    }
319
320    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
321        if let Some(info) = self.peers.get_mut(&peer_id) {
322            info.on_same_chain_as_us = false;
323        }
324    }
325
326    /// Updates the peer's height without storing any checkpoint data.
327    /// Returns false if the peer doesn't exist or is not on the same chain.
328    pub fn update_peer_height(
329        &mut self,
330        peer_id: PeerId,
331        height: CheckpointSequenceNumber,
332        low_watermark: Option<CheckpointSequenceNumber>,
333    ) -> bool {
334        let info = match self.peers.get_mut(&peer_id) {
335            Some(info) if info.on_same_chain_as_us => info,
336            _ => return false,
337        };
338
339        info.height = std::cmp::max(height, info.height);
340        if let Some(low_watermark) = low_watermark {
341            info.lowest = low_watermark;
342        }
343
344        true
345    }
346
347    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
348        self.unprocessed_checkpoints
349            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
350        self.sequence_number_to_digest
351            .retain(|&s, _digest| s > sequence_number);
352    }
353
354    /// Inserts a checkpoint into the unprocessed checkpoints store.
355    /// Only one checkpoint per sequence number is stored. If a checkpoint with the same
356    /// sequence number but different digest already exists, the new one is dropped.
357    // TODO: also record who gives this checkpoint info for peer quality measurement?
358    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
359        let digest = *checkpoint.digest();
360        let sequence_number = *checkpoint.sequence_number();
361
362        // Check if we already have a checkpoint for this sequence number.
363        if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
364            if *existing_digest == digest {
365                // Same checkpoint, nothing to do.
366                return;
367            }
368            tracing::info!(
369                ?sequence_number,
370                ?existing_digest,
371                ?digest,
372                "received checkpoint with same sequence number but different digest, dropping new checkpoint"
373            );
374            return;
375        }
376
377        self.unprocessed_checkpoints.insert(digest, checkpoint);
378        self.sequence_number_to_digest
379            .insert(sequence_number, digest);
380    }
381
382    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
383        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
384            self.sequence_number_to_digest
385                .remove(checkpoint.sequence_number());
386        }
387    }
388
389    pub fn get_checkpoint_by_sequence_number(
390        &self,
391        sequence_number: CheckpointSequenceNumber,
392    ) -> Option<&Checkpoint> {
393        self.sequence_number_to_digest
394            .get(&sequence_number)
395            .and_then(|digest| self.get_checkpoint_by_digest(digest))
396    }
397
398    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
399        self.unprocessed_checkpoints.get(digest)
400    }
401
402    #[cfg(test)]
403    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
404        self.wait_interval_when_no_peer_to_sync_content = duration;
405    }
406
407    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
408        self.wait_interval_when_no_peer_to_sync_content
409    }
410
411    pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
412        self.scores
413            .entry(peer_id)
414            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
415            .record_success(size, response_time);
416    }
417
418    pub fn record_failure(&mut self, peer_id: PeerId) {
419        self.scores
420            .entry(peer_id)
421            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
422            .record_failure();
423    }
424
425    pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
426        self.scores
427            .get(peer_id)
428            .and_then(|s| s.effective_throughput())
429    }
430
431    pub fn is_failing(&self, peer_id: &PeerId) -> bool {
432        self.scores
433            .get(peer_id)
434            .map(|s| s.is_failing())
435            .unwrap_or(false)
436    }
437}
438
439// PeerBalancer selects peers using weighted random selection:
440// - Most of the time: select from known peers, weighted by throughput
441// - With configured probability: select from unknown peers (to explore)
442// - Failing peers: only as last resort
#[derive(Clone)]
struct PeerBalancer {
    /// Peers with a measured throughput; the `f64` is bytes/sec and is used as the
    /// selection weight.
    known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Peers without throughput data; the `f64` is their connection RTT in seconds,
    /// and the list is kept RTT-sorted (closest first).
    unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Peers whose recent failure rate crossed the threshold; used only as a last resort.
    failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
    /// When set, only peers able to serve this checkpoint are eligible.
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    /// Whether this balancer serves summary or content requests (affects eligibility).
    request_type: PeerCheckpointRequestType,
    /// Probability of trying an unknown peer even when known peers are eligible.
    exploration_probability: f64,
}
452
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// Checkpoint summary request: any peer at or above the requested height qualifies.
    Summary,
    /// Checkpoint contents request: the peer must have reached the height *and* not
    /// have pruned past it (its `lowest` watermark must be at or below the request).
    Content,
}
458
impl PeerBalancer {
    /// Partitions the network's same-chain peers into three tiers: scored ("known"),
    /// unscored ("unknown", sorted by connection RTT), and currently-failing peers.
    pub fn new(
        network: &anemo::Network,
        peer_heights: Arc<RwLock<PeerHeights>>,
        request_type: PeerCheckpointRequestType,
    ) -> Self {
        let peer_heights_guard = peer_heights.read().unwrap();

        let mut known_peers = Vec::new();
        let mut unknown_peers = Vec::new();
        let mut failing_peers = Vec::new();
        let exploration_probability = peer_heights_guard.exploration_probability;

        for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
            // Skip peers we no longer hold a live connection to.
            let Some(peer) = network.peer(*peer_id) else {
                continue;
            };
            let rtt_secs = peer.connection_rtt().as_secs_f64();

            // Failing takes precedence over scored; peers with no throughput
            // data land in the exploration pool.
            if peer_heights_guard.is_failing(peer_id) {
                failing_peers.push((peer, *info));
            } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
                known_peers.push((peer, *info, throughput));
            } else {
                unknown_peers.push((peer, *info, rtt_secs));
            }
        }
        drop(peer_heights_guard);

        // Lowest RTT first, so exploration tries the closest unscored peer.
        unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
            rtt_a
                .partial_cmp(rtt_b)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        Self {
            known_peers,
            unknown_peers,
            failing_peers,
            requested_checkpoint: None,
            request_type,
            exploration_probability,
        }
    }

    /// Restricts subsequent selection to peers able to serve `checkpoint`.
    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
        self.requested_checkpoint = Some(checkpoint);
        self
    }

    /// Whether a peer can serve the requested checkpoint for this request type.
    /// With no requested checkpoint, `requested` is 0 and every peer qualifies.
    fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
        let requested = self.requested_checkpoint.unwrap_or(0);
        match &self.request_type {
            PeerCheckpointRequestType::Summary => info.height >= requested,
            PeerCheckpointRequestType::Content => {
                info.height >= requested && info.lowest <= requested
            }
        }
    }

    /// Removes and returns one eligible known peer, chosen with probability
    /// proportional to its measured throughput (roulette-wheel selection).
    fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        // Snapshot (index, weight) pairs of the eligible peers.
        let eligible: Vec<_> = self
            .known_peers
            .iter()
            .enumerate()
            .filter(|(_, (_, info, _))| self.is_eligible(info))
            .map(|(i, (_, _, t))| (i, *t))
            .collect();

        if eligible.is_empty() {
            return None;
        }

        let total: f64 = eligible.iter().map(|(_, t)| t).sum();
        let mut pick = rand::thread_rng().gen_range(0.0..total);

        for (idx, throughput) in &eligible {
            pick -= throughput;
            if pick <= 0.0 {
                let (peer, info, _) = self.known_peers.remove(*idx);
                return Some((peer, info));
            }
        }

        // Floating-point rounding can leave `pick` slightly above zero after the
        // loop; fall back to the last eligible peer. Indices remain valid because
        // nothing was removed above.
        let (idx, _) = eligible.last().unwrap();
        let (peer, info, _) = self.known_peers.remove(*idx);
        Some((peer, info))
    }

    /// Removes and returns the first eligible unknown peer; since the list is
    /// RTT-sorted, this is the closest one.
    fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        let pos = self
            .unknown_peers
            .iter()
            .position(|(_, info, _)| self.is_eligible(info))?;
        let (peer, info, _) = self.unknown_peers.remove(pos);
        Some((peer, info))
    }

    /// Removes and returns the first eligible failing peer (last resort).
    fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        let pos = self
            .failing_peers
            .iter()
            .position(|(_, info)| self.is_eligible(info))?;
        Some(self.failing_peers.remove(pos))
    }
}
565
566impl Iterator for PeerBalancer {
567    type Item = StateSyncClient<anemo::Peer>;
568
569    fn next(&mut self) -> Option<Self::Item> {
570        let has_eligible_known = self
571            .known_peers
572            .iter()
573            .any(|(_, info, _)| self.is_eligible(info));
574        let has_eligible_unknown = self
575            .unknown_peers
576            .iter()
577            .any(|(_, info, _)| self.is_eligible(info));
578
579        let explore = has_eligible_unknown
580            && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
581
582        if explore && let Some((peer, _)) = self.select_by_rtt() {
583            return Some(StateSyncClient::new(peer));
584        }
585
586        if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
587            return Some(StateSyncClient::new(peer));
588        }
589
590        if let Some((peer, _)) = self.select_failing() {
591            return Some(StateSyncClient::new(peer));
592        }
593
594        None
595    }
596}
597
#[derive(Clone, Debug)]
enum StateSyncMessage {
    // Ask the event loop to start a checkpoint summary sync task if one is not
    // already running.
    StartSyncJob,
    // Validators will send this to the StateSyncEventLoop in order to kick off notifying our peers
    // of the new checkpoint.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    // Notification that the checkpoint content sync task will send to the event loop in the event
    // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
    // synced at the same time, only the highest checkpoint is sent.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
609
/// The long-running event loop at the heart of StateSync: receives messages from
/// `Handle`s, reacts to peer events, and supervises the background sync tasks.
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    /// Incoming StateSyncMessages from Handles and internal tasks.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak reference to our own mailbox
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// All spawned background tasks; their results are reaped in `start`'s select loop.
    tasks: JoinSet<()>,
    /// Handle to the in-flight (one-shot) summary sync task, if any.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Handle to the long-lived contents sync loop; must never finish on its own.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    /// Persistent checkpoint store.
    store: S,
    /// Shared registry of peer sync state and buffered checkpoint summaries.
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcasts each fully-synced checkpoint to subscribers.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    /// Handle to the long-lived archive fallback sync loop; must never finish on its own.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    /// Optional archive source used as a fallback when peers have pruned their state.
    archive_config: Option<ArchiveReaderConfig>,
}
631
632impl<S> StateSyncEventLoop<S>
633where
634    S: WriteStore + Clone + Send + Sync + 'static,
635{
636    // Note: A great deal of care is taken to ensure that all event handlers are non-asynchronous
637    // and that the only "await" points are from the select macro picking which event to handle.
638    // This ensures that the event loop is able to process events at a high speed and reduce the
639    // chance for building up a backlog of events to process.
    /// Runs the event loop until every `Handle` to the mailbox has been dropped.
    ///
    /// Spawns the long-lived contents-sync and archive-sync tasks, a metrics
    /// updater, then multiplexes over timer ticks, mailbox messages, peer
    /// events, and task completions.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        // Sorted once up front — presumably so later lookups can binary-search
        // the pinned list; TODO confirm against the verification path.
        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        // Subscribe to peer connect/disconnect events; for peers already
        // connected at startup, immediately query their latest checkpoint.
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        // Watch channel carrying the sequence number the contents-sync loop should
        // catch up to; advanced by maybe_trigger_checkpoint_contents_sync_task below.
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // Spawn tokio task to update metrics periodically in the background
        // `_sender` is a named binding (not `_`) so it stays alive until this
        // function returns; NOTE(review): its drop appears to be the shutdown
        // signal for the metrics task — confirm in update_checkpoint_watermark_metrics.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Start checkpoint contents sync loop.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.use_get_checkpoint_contents_v2(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Start archive based checkpoint content sync loop.
        // TODO: Consider switching to sync from archive only on startup.
        // Right now because the peer set is fixed at startup, a node may eventually
        // end up with peers who have all purged their local state. In such a scenario it will be
        // stuck until restart when it ends up with a different set of peers. Once the discovery
        // mechanism can dynamically identify and connect to other peers on the network, we will rely
        // on sync from archive as a fall back.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        // Start main loop.
        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // Once all handles to our mailbox have been dropped this
                    // will yield `None` and we can terminate the event loop
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The two long-lived sync loops must never exit on their own;
                    // treat either finishing as fatal.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // Summary sync tasks are one-shot; clear the handle so a new
                    // one can be started below.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            // After every event, re-evaluate whether new sync work should start.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }
753
754    fn handle_message(&mut self, message: StateSyncMessage) {
755        debug!("Received message: {:?}", message);
756        match message {
757            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
758            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
759                self.handle_checkpoint_from_consensus(checkpoint)
760            }
761            // After we've successfully synced a checkpoint we can notify our peers
762            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
763                self.spawn_notify_peers_of_checkpoint(*checkpoint)
764            }
765        }
766    }
767
768    // Handle a checkpoint that we received from consensus
769    #[instrument(level = "debug", skip_all)]
770    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
771        // Always check previous_digest matches in case there is a gap between
772        // state sync and consensus.
773        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
774            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
775            .digest();
776        if checkpoint.previous_digest != Some(prev_digest) {
777            panic!(
778                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
779                checkpoint.sequence_number(),
780                Some(prev_digest),
781                checkpoint.previous_digest
782            );
783        }
784
785        let latest_checkpoint = self
786            .store
787            .get_highest_verified_checkpoint()
788            .expect("store operation should not fail");
789
790        // If this is an older checkpoint, just ignore it
791        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
792            return;
793        }
794
795        let checkpoint = *checkpoint;
796        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
797        if *checkpoint.sequence_number() > next_sequence_number {
798            debug!(
799                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
800                next_sequence_number,
801                checkpoint.sequence_number()
802            );
803        }
804
805        // Because checkpoint from consensus sends in order, when we have checkpoint n,
806        // we must have all of the checkpoints before n from either state sync or consensus.
807        #[cfg(debug_assertions)]
808        {
809            let _ = (next_sequence_number..=*checkpoint.sequence_number())
810                .map(|n| {
811                    let checkpoint = self
812                        .store
813                        .get_checkpoint_by_sequence_number(n)
814                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
815                    self.store
816                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
817                        .unwrap_or_else(|| {
818                            panic!(
819                                "store should contain checkpoint contents for {:?}",
820                                checkpoint.content_digest
821                            )
822                        });
823                })
824                .collect::<Vec<_>>();
825        }
826
827        // If this is the last checkpoint of a epoch, we need to make sure
828        // new committee is in store before we verify newer checkpoints in next epoch.
829        // This could happen before this validator's reconfiguration finishes, because
830        // state sync does not reconfig.
831        // TODO: make CheckpointAggregator use WriteStore so we don't need to do this
832        // committee insertion in two places (only in `insert_checkpoint`).
833        if let Some(EndOfEpochData {
834            next_epoch_committee,
835            ..
836        }) = checkpoint.end_of_epoch_data.as_ref()
837        {
838            let next_committee = next_epoch_committee.iter().cloned().collect();
839            let committee =
840                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
841            self.store
842                .insert_committee(committee)
843                .expect("store operation should not fail");
844        }
845
846        self.store
847            .update_highest_verified_checkpoint(&checkpoint)
848            .expect("store operation should not fail");
849        self.store
850            .update_highest_synced_checkpoint(&checkpoint)
851            .expect("store operation should not fail");
852
853        // We don't care if no one is listening as this is a broadcast channel
854        let _ = self.checkpoint_event_sender.send(checkpoint.clone());
855
856        self.spawn_notify_peers_of_checkpoint(checkpoint);
857    }
858
859    fn handle_peer_event(
860        &mut self,
861        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
862    ) {
863        use tokio::sync::broadcast::error::RecvError;
864
865        match peer_event {
866            Ok(PeerEvent::NewPeer(peer_id)) => {
867                self.spawn_get_latest_from_peer(peer_id);
868            }
869            Ok(PeerEvent::LostPeer(peer_id, _)) => {
870                self.peer_heights.write().unwrap().peers.remove(&peer_id);
871            }
872
873            Err(RecvError::Closed) => {
874                panic!("PeerEvent channel shouldn't be able to be closed");
875            }
876
877            Err(RecvError::Lagged(_)) => {
878                trace!("State-Sync fell behind processing PeerEvents");
879            }
880        }
881    }
882
883    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
884        if let Some(peer) = self.network.peer(peer_id) {
885            let genesis_checkpoint_digest = *self
886                .store
887                .get_checkpoint_by_sequence_number(0)
888                .expect("store should contain genesis checkpoint")
889                .digest();
890            let task = get_latest_from_peer(
891                genesis_checkpoint_digest,
892                peer,
893                self.peer_heights.clone(),
894                self.config.timeout(),
895            );
896            self.tasks.spawn(task);
897        }
898    }
899
900    fn handle_tick(&mut self, _now: std::time::Instant) {
901        let task = query_peers_for_their_latest_checkpoint(
902            self.network.clone(),
903            self.peer_heights.clone(),
904            self.weak_sender.clone(),
905            self.config.timeout(),
906        );
907        self.tasks.spawn(task);
908
909        if let Some(layer) = self.download_limit_layer.as_ref() {
910            layer.maybe_prune_map();
911        }
912    }
913
914    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
915        // Only run one sync task at a time
916        if self.sync_checkpoint_summaries_task.is_some() {
917            return;
918        }
919
920        let highest_processed_checkpoint = self
921            .store
922            .get_highest_verified_checkpoint()
923            .expect("store operation should not fail");
924
925        let highest_known_sequence_number = self
926            .peer_heights
927            .read()
928            .unwrap()
929            .highest_known_checkpoint_sequence_number();
930
931        if let Some(target_seq) = highest_known_sequence_number
932            && *highest_processed_checkpoint.sequence_number() < target_seq
933        {
934            // Limit the per-sync batch size according to config.
935            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
936            let limited_target = std::cmp::min(
937                target_seq,
938                highest_processed_checkpoint
939                    .sequence_number()
940                    .saturating_add(max_batch_size),
941            );
942            let was_limited = limited_target < target_seq;
943
944            // Start sync job.
945            let weak_sender = self.weak_sender.clone();
946            let task = sync_to_checkpoint(
947                self.network.clone(),
948                self.store.clone(),
949                self.peer_heights.clone(),
950                self.metrics.clone(),
951                self.config.pinned_checkpoints.clone(),
952                self.config.checkpoint_header_download_concurrency(),
953                self.config.timeout(),
954                limited_target,
955            )
956            .map(move |result| match result {
957                Ok(()) => {
958                    // If we limited the sync range, immediately trigger
959                    // another sync to continue catching up.
960                    if was_limited && let Some(sender) = weak_sender.upgrade() {
961                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
962                    }
963                }
964                Err(e) => {
965                    debug!("error syncing checkpoint {e}");
966                }
967            });
968            let task_handle = self.tasks.spawn(task);
969            self.sync_checkpoint_summaries_task = Some(task_handle);
970        }
971    }
972
973    fn maybe_trigger_checkpoint_contents_sync_task(
974        &mut self,
975        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
976    ) {
977        let highest_verified_checkpoint = self
978            .store
979            .get_highest_verified_checkpoint()
980            .expect("store operation should not fail");
981        let highest_synced_checkpoint = self
982            .store
983            .get_highest_synced_checkpoint()
984            .expect("store operation should not fail");
985
986        if highest_verified_checkpoint.sequence_number()
987            > highest_synced_checkpoint.sequence_number()
988            // skip if we aren't connected to any peers that can help
989            && self
990                .peer_heights
991                .read()
992                .unwrap()
993                .highest_known_checkpoint_sequence_number()
994                > Some(*highest_synced_checkpoint.sequence_number())
995        {
996            let _ = target_sequence_channel.send_if_modified(|num| {
997                let new_num = *highest_verified_checkpoint.sequence_number();
998                if *num == new_num {
999                    return false;
1000                }
1001                *num = new_num;
1002                true
1003            });
1004        }
1005    }
1006
1007    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
1008        let task = notify_peers_of_checkpoint(
1009            self.network.clone(),
1010            self.peer_heights.clone(),
1011            checkpoint,
1012            self.config.timeout(),
1013        );
1014        self.tasks.spawn(task);
1015    }
1016}
1017
1018async fn notify_peers_of_checkpoint(
1019    network: anemo::Network,
1020    peer_heights: Arc<RwLock<PeerHeights>>,
1021    checkpoint: VerifiedCheckpoint,
1022    timeout: Duration,
1023) {
1024    let futs = peer_heights
1025        .read()
1026        .unwrap()
1027        .peers_on_same_chain()
1028        // Filter out any peers who we know already have a checkpoint higher than this one
1029        .filter_map(|(peer_id, info)| {
1030            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1031        })
1032        // Filter out any peers who we aren't connected with
1033        .flat_map(|peer_id| network.peer(*peer_id))
1034        .map(StateSyncClient::new)
1035        .map(|mut client| {
1036            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1037            async move { client.push_checkpoint_summary(request).await }
1038        })
1039        .collect::<Vec<_>>();
1040    futures::future::join_all(futs).await;
1041}
1042
/// Fetches a peer's latest checkpoint info and records it in `peer_heights`.
///
/// On first contact we probe the peer's genesis checkpoint (sequence 0) to
/// decide whether it is on the same chain as us; the verdict is cached in
/// `peer_heights` so later calls skip the probe. Peers on a different chain,
/// or that fail to respond, are left out of sync bookkeeping.
async fn get_latest_from_peer(
    our_genesis_checkpoint_digest: CheckpointDigest,
    peer: anemo::Peer,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
) {
    let peer_id = peer.peer_id();
    let mut client = StateSyncClient::new(peer);

    let info = {
        // Reuse cached chain-identity info if we've already probed this peer.
        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();

        if let Some(info) = maybe_info {
            info
        } else {
            // TODO do we want to create a new API just for querying a node's chainid?
            //
            // We need to query this node's genesis checkpoint to see if they're on the same chain
            // as us
            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
                .with_timeout(timeout);
            let response = client
                .get_checkpoint_summary(request)
                .await
                .map(Response::into_inner);

            let info = match response {
                Ok(Some(checkpoint)) => {
                    let digest = *checkpoint.digest();
                    PeerStateSyncInfo {
                        genesis_checkpoint_digest: digest,
                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
                        height: *checkpoint.sequence_number(),
                        lowest: CheckpointSequenceNumber::default(),
                    }
                }
                // Peer answered but has no genesis checkpoint: record it as
                // not on our chain so it is skipped until it can answer.
                Ok(None) => PeerStateSyncInfo {
                    genesis_checkpoint_digest: CheckpointDigest::default(),
                    on_same_chain_as_us: false,
                    height: CheckpointSequenceNumber::default(),
                    lowest: CheckpointSequenceNumber::default(),
                },
                // Request failed: leave the peer unrecorded and retry on a
                // later connection or tick.
                Err(status) => {
                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
                    return;
                }
            };
            // Cache what we learned so future calls skip the genesis probe.
            peer_heights
                .write()
                .unwrap()
                .insert_peer_info(peer_id, info);
            info
        }
    };

    // Bail early if this node isn't on the same chain as us
    if !info.on_same_chain_as_us {
        trace!(?info, "Peer {peer_id} not on same chain as us");
        return;
    }
    let Some((highest_checkpoint, low_watermark)) =
        query_peer_for_latest_info(&mut client, timeout).await
    else {
        return;
    };
    peer_heights
        .write()
        .unwrap()
        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
}
1113
1114/// Queries a peer for their highest_synced_checkpoint and low checkpoint watermark
1115async fn query_peer_for_latest_info(
1116    client: &mut StateSyncClient<anemo::Peer>,
1117    timeout: Duration,
1118) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
1119    let request = Request::new(()).with_timeout(timeout);
1120    let response = client
1121        .get_checkpoint_availability(request)
1122        .await
1123        .map(Response::into_inner);
1124    match response {
1125        Ok(GetCheckpointAvailabilityResponse {
1126            highest_synced_checkpoint,
1127            lowest_available_checkpoint,
1128        }) => {
1129            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
1130        }
1131        Err(status) => {
1132            // If peer hasn't upgraded they would return 404 NotFound error
1133            if status.status() != anemo::types::response::StatusCode::NotFound {
1134                trace!("get_checkpoint_availability request failed: {status:?}");
1135                return None;
1136            }
1137        }
1138    };
1139
1140    // Then we try the old query
1141    // TODO: remove this once the new feature stabilizes
1142    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1143    let response = client
1144        .get_checkpoint_summary(request)
1145        .await
1146        .map(Response::into_inner);
1147    match response {
1148        Ok(Some(checkpoint)) => Some((checkpoint, None)),
1149        Ok(None) => None,
1150        Err(status) => {
1151            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1152            None
1153        }
1154    }
1155}
1156
1157#[instrument(level = "debug", skip_all)]
1158async fn query_peers_for_their_latest_checkpoint(
1159    network: anemo::Network,
1160    peer_heights: Arc<RwLock<PeerHeights>>,
1161    sender: mpsc::WeakSender<StateSyncMessage>,
1162    timeout: Duration,
1163) {
1164    let peer_heights = &peer_heights;
1165    let futs = peer_heights
1166        .read()
1167        .unwrap()
1168        .peers_on_same_chain()
1169        // Filter out any peers who we aren't connected with
1170        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1171        .map(|peer| {
1172            let peer_id = peer.peer_id();
1173            let mut client = StateSyncClient::new(peer);
1174
1175            async move {
1176                let response = query_peer_for_latest_info(&mut client, timeout).await;
1177                match response {
1178                    Some((highest_checkpoint, low_watermark)) => peer_heights
1179                        .write()
1180                        .unwrap()
1181                        .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1182                        .then_some(highest_checkpoint),
1183                    None => None,
1184                }
1185            }
1186        })
1187        .collect::<Vec<_>>();
1188
1189    debug!("Query {} peers for latest checkpoint", futs.len());
1190
1191    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1192
1193    let highest_checkpoint_seq = checkpoints
1194        .map(|checkpoint| *checkpoint.sequence_number())
1195        .max();
1196
1197    let our_highest_seq = peer_heights
1198        .read()
1199        .unwrap()
1200        .highest_known_checkpoint_sequence_number();
1201
1202    debug!(
1203        "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1204    );
1205
1206    let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1207        (Some(theirs), None) => theirs,
1208        (Some(theirs), Some(ours)) if theirs > ours => theirs,
1209        _ => return,
1210    };
1211
1212    if let Some(sender) = sender.upgrade() {
1213        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1214    }
1215}
1216
/// Syncs and verifies checkpoint summaries from `current + 1` up to
/// `target_sequence_number`, fetching each header from peers and persisting
/// verified checkpoints into `store` (which bumps the highest-verified
/// watermark).
///
/// Headers are downloaded concurrently — up to
/// `checkpoint_header_download_concurrency` in flight — but verified and
/// inserted strictly in sequence order. Returns an error if the target is not
/// ahead of our current watermark, if no peer can supply some checkpoint, or
/// if a fetched checkpoint fails verification.
async fn sync_to_checkpoint<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
    checkpoint_header_download_concurrency: usize,
    timeout: Duration,
    target_sequence_number: CheckpointSequenceNumber,
) -> Result<()>
where
    S: WriteStore,
{
    metrics.set_highest_known_checkpoint(target_sequence_number);

    let mut current = store
        .get_highest_verified_checkpoint()
        .expect("store operation should not fail");
    // The target must be strictly ahead of what we've already verified.
    if *current.sequence_number() >= target_sequence_number {
        return Err(anyhow::anyhow!(
            "target checkpoint {} is older than highest verified checkpoint {}",
            target_sequence_number,
            current.sequence_number(),
        ));
    }

    let peer_balancer = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Summary,
    );
    // range of the next sequence_numbers to fetch
    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
        ..=target_sequence_number)
        .map(|next| {
            let peers = peer_balancer.clone().with_checkpoint(next);
            let peer_heights = peer_heights.clone();
            let pinned_checkpoints = &pinned_checkpoints;
            async move {
                // Reuse a previously fetched (but not yet verified) checkpoint
                // if one is already cached in peer_heights from an earlier run.
                if let Some(checkpoint) = peer_heights
                    .read()
                    .unwrap()
                    .get_checkpoint_by_sequence_number(next)
                {
                    return (Some(checkpoint.to_owned()), next, None);
                }

                // Iterate through peers trying each one in turn until we're able to
                // successfully get the target checkpoint.
                for mut peer in peers {
                    let peer_id = peer.inner().peer_id();
                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
                        .with_timeout(timeout);
                    let start = Instant::now();
                    let result = peer.get_checkpoint_summary(request).await;
                    let elapsed = start.elapsed();

                    // Unhelpful or failing peers are penalized in the balancer.
                    let checkpoint = match result {
                        Ok(response) => match response.into_inner() {
                            Some(cp) => Some(cp),
                            None => {
                                trace!("peer unable to help sync");
                                peer_heights.write().unwrap().record_failure(peer_id);
                                None
                            }
                        },
                        Err(e) => {
                            trace!("{e:?}");
                            peer_heights.write().unwrap().record_failure(peer_id);
                            None
                        }
                    };

                    let Some(checkpoint) = checkpoint else {
                        continue;
                    };

                    // Record throughput stats used by the peer balancer.
                    let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
                    peer_heights.write().unwrap().record_success(peer_id, size, elapsed);

                    // peer didn't give us a checkpoint with the height that we requested
                    if *checkpoint.sequence_number() != next {
                        tracing::debug!(
                            "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
                            checkpoint.sequence_number()
                        );
                        peer_heights
                            .write()
                            .unwrap()
                            .mark_peer_as_not_on_same_chain(peer_id);
                        continue;
                    }

                    // peer gave us a checkpoint whose digest does not match pinned digest
                    let checkpoint_digest = checkpoint.digest();
                    if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
                        checkpoint.sequence_number(),
                        |(seq_num, _digest)| *seq_num
                    )
                        && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
                    {
                        tracing::debug!(
                            "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
                            pinned_checkpoints[pinned_digest_index].1,
                            checkpoint_digest
                        );
                        peer_heights
                            .write()
                            .unwrap()
                            .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
                        continue;
                    }

                    // Insert in our store in the event that things fail and we need to retry
                    peer_heights
                        .write()
                        .unwrap()
                        .insert_checkpoint(checkpoint.clone());
                    return (Some(checkpoint), next, Some(peer_id));
                }
                // No peer could supply this checkpoint.
                (None, next, None)
            }
        })
        .pipe(futures::stream::iter)
        .buffered(checkpoint_header_download_concurrency);

    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
        // Downloads run concurrently but results arrive in order; each must be
        // exactly one past the last verified checkpoint.
        assert_eq!(
            current
                .sequence_number()
                .checked_add(1)
                .expect("exhausted u64"),
            next
        );

        // Verify the checkpoint
        let checkpoint = 'cp: {
            let checkpoint = maybe_checkpoint.ok_or_else(|| {
                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
            })?;
            // Skip verification for manually pinned checkpoints.
            if pinned_checkpoints
                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
                .is_ok()
            {
                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
            }
            match verify_checkpoint(&current, &store, checkpoint) {
                Ok(verified_checkpoint) => verified_checkpoint,
                Err(checkpoint) => {
                    let mut peer_heights = peer_heights.write().unwrap();
                    // Remove the checkpoint from our temporary store so that we can try querying
                    // another peer for a different one
                    peer_heights.remove_checkpoint(checkpoint.digest());

                    // Mark peer as not on the same chain as us
                    if let Some(peer_id) = maybe_peer_id {
                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
                    }

                    return Err(anyhow::anyhow!(
                        "unable to verify checkpoint {checkpoint:?}"
                    ));
                }
            }
        };

        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
            metrics.checkpoint_summary_age_metrics()
        {
            checkpoint.report_checkpoint_age(
                checkpoint_summary_age_metric,
                checkpoint_summary_age_metric_deprecated,
            );
        }

        // Insert the newly verified checkpoint into our store, which will bump our highest
        // verified checkpoint watermark as well.
        store
            .insert_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        current = checkpoint;
    }

    // Drop cached headers at or below the new watermark; they are persisted now.
    peer_heights
        .write()
        .unwrap()
        .cleanup_old_checkpoints(*current.sequence_number());

    Ok(())
}
1410
/// Background loop that periodically attempts to backfill checkpoint contents
/// from an archive (see the per-iteration function for the actual conditions).
/// Runs forever; the enclosing task is expected to live for the node's lifetime.
async fn sync_checkpoint_contents_from_archive<S>(
    network: anemo::Network,
    archive_config: Option<ArchiveReaderConfig>,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
) where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    loop {
        sync_checkpoint_contents_from_archive_iteration(
            &network,
            &archive_config,
            store.clone(),
            peer_heights.clone(),
            metrics.clone(),
        )
        .await;
        // Throttle iterations so we don't spin when there is nothing to sync.
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}
1432
/// One pass of archive-based contents sync.
///
/// If our highest synced checkpoint is below the minimum "lowest available"
/// watermark across connected same-chain peers — i.e. peers have pruned the
/// contents we still need — pull the range [highest_synced + 1,
/// lowest_on_peers) from the configured archive ingestion source instead.
async fn sync_checkpoint_contents_from_archive_iteration<S>(
    network: &anemo::Network,
    archive_config: &Option<ArchiveReaderConfig>,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
) where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    let peers: Vec<_> = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        // Filter out any peers who we aren't connected with.
        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
        .collect();
    // Minimum of the peers' lowest-available watermarks; None when peerless.
    let lowest_checkpoint_on_peers = peers
        .iter()
        .map(|(_p, state_sync_info)| state_sync_info.lowest)
        .min();
    let highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail")
        .sequence_number;
    // Only fall back to the archive when peers cannot serve the next
    // checkpoint we need (they've pruned below it).
    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
        highest_synced < lowest_checkpoint_on_peers
    } else {
        false
    };
    debug!(
        "Syncing checkpoint contents from archive: {sync_from_archive},  highest_synced: {highest_synced},  lowest_checkpoint_on_peers: {}",
        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
    );
    if sync_from_archive {
        let start = highest_synced
            .checked_add(1)
            .expect("Checkpoint seq num overflow");
        let end = lowest_checkpoint_on_peers.unwrap();

        let Some(archive_config) = archive_config else {
            warn!("Failed to find an archive reader to complete the state sync request");
            return;
        };
        let Some(ingestion_url) = &archive_config.ingestion_url else {
            warn!("Archival ingestion url for state sync is not configured");
            return;
        };
        // NOTE(review): this specific bucket appears to be deliberately
        // excluded as an archival fallback — confirm rationale with owners.
        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
            warn!("{} can't be used as an archival fallback", ingestion_url);
            return;
        }
        let reader_options = ReaderOptions {
            batch_size: archive_config.download_concurrency.into(),
            upper_limit: Some(end),
            ..Default::default()
        };
        // Drive the archive-ingestion workflow; the worker writes contents
        // directly into our store and updates metrics.
        let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
            StateSyncWorker(store, metrics),
            ingestion_url.clone(),
            archive_config.remote_store_options.clone(),
            start,
            1,
            Some(reader_options),
        )
        .await
        else {
            return;
        };
        match executor.await {
            Ok(_) => info!(
                "State sync from archive is complete. Checkpoints downloaded = {:?}",
                end - start
            ),
            Err(err) => warn!("State sync from archive failed with error: {:?}", err),
        }
    }
}
1510
/// Long-running task that downloads the full contents (transactions and effects) for every
/// checkpoint up to the latest target sequence number received on `target_sequence_channel`,
/// advancing the `highest_synced_checkpoint` watermark as downloads complete.
///
/// Downloads are pipelined subject to two limits:
/// * `checkpoint_content_download_concurrency` — max number of in-flight checkpoint downloads.
/// * `checkpoint_content_download_tx_concurrency` — budget on the total number of transactions
///   covered by all in-flight downloads combined.
///
/// Tasks are held in a `FuturesOrdered`, so results are consumed in checkpoint-sequence order
/// and the watermark only ever advances contiguously. Returns when the watch channel closes.
async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    use_get_checkpoint_contents_v2: bool,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");

    // Next sequence number to start downloading (one past the synced watermark).
    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    // Exclusive upper bound of sequences we are allowed to download; bumped by the watch channel.
    let mut target_sequence_cursor = 0;
    // Running total of network transactions covered by all downloads started so far; used to
    // compute each checkpoint's incremental tx_count below.
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    let mut checkpoint_contents_tasks = FuturesOrdered::new();

    // Remaining transaction budget for in-flight downloads; debited when a task starts and
    // credited back when the corresponding checkpoint is committed.
    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;

    loop {
        tokio::select! {
            result = target_sequence_channel.changed() => {
                match result {
                    Ok(()) => {
                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
                    }
                    Err(_) => {
                        // Watch channel is closed, exit loop.
                        return
                    }
                }
            },
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint

                        // FuturesOrdered yields results in submission order, so this checkpoint
                        // is contiguous with the current watermark and safe to commit.
                        store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        // We don't care if no one is listening as this is a broadcast channel
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        // Return this checkpoint's transactions to the concurrency budget.
                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;

                    }
                    Err(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
                        // Log with the lowest checkpoint any peer still serves, when known, to
                        // distinguish "peers pruned it" from a plain failure.
                        if let Some(lowest_peer_checkpoint) =
                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
                            }
                        } else {
                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());

                        }
                        // Calculate tx_count for retry by getting previous checkpoint
                        let retry_tx_count = if *checkpoint.sequence_number() == 0 {
                            checkpoint.network_total_transactions
                        } else {
                            let prev = store
                                .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
                                .expect("previous checkpoint must exist")
                                .network_total_transactions;
                            checkpoint.network_total_transactions - prev
                        };
                        // Retry contents sync on failure.
                        // push_front keeps the retried checkpoint at the head of the ordered
                        // queue so in-order commit is preserved.
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            use_get_checkpoint_contents_v2,
                            retry_tx_count,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        // Start new tasks up to configured concurrency limits.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .unwrap_or_else(|| panic!(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
                    current_sequence
                ));

            // Enforce transaction count concurrency limit.
            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                use_get_checkpoint_contents_v2,
                tx_count,
                next_checkpoint,
            ));
        }

        if highest_synced
            .sequence_number()
            .is_multiple_of(checkpoint_content_download_concurrency as u64)
            || checkpoint_contents_tasks.is_empty()
        {
            // Periodically notify event loop to notify our peers that we've synced to a new checkpoint height
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}
1641
1642#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1643async fn sync_one_checkpoint_contents<S>(
1644    network: anemo::Network,
1645    store: S,
1646    peer_heights: Arc<RwLock<PeerHeights>>,
1647    use_get_checkpoint_contents_v2: bool,
1648    tx_count: u64,
1649    checkpoint: VerifiedCheckpoint,
1650) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1651where
1652    S: WriteStore + Clone,
1653{
1654    let (timeout_min, timeout_max) = {
1655        let ph = peer_heights.read().unwrap();
1656        (
1657            ph.checkpoint_content_timeout_min,
1658            ph.checkpoint_content_timeout_max,
1659        )
1660    };
1661    let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1662    debug!(
1663        "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1664        timeout, tx_count
1665    );
1666
1667    // Check if we already have produced this checkpoint locally. If so, we don't need
1668    // to get it from peers anymore.
1669    if store
1670        .get_highest_synced_checkpoint()
1671        .expect("store operation should not fail")
1672        .sequence_number()
1673        >= checkpoint.sequence_number()
1674    {
1675        debug!("checkpoint was already created via consensus output");
1676        return Ok(checkpoint);
1677    }
1678
1679    // Request checkpoint contents from peers.
1680    let peers = PeerBalancer::new(
1681        &network,
1682        peer_heights.clone(),
1683        PeerCheckpointRequestType::Content,
1684    )
1685    .with_checkpoint(*checkpoint.sequence_number());
1686    let now = tokio::time::Instant::now();
1687    let Some(_contents) = get_full_checkpoint_contents(
1688        peers,
1689        &store,
1690        peer_heights.clone(),
1691        &checkpoint,
1692        use_get_checkpoint_contents_v2,
1693        timeout,
1694    )
1695    .await
1696    else {
1697        // Delay completion in case of error so we don't hammer the network with retries.
1698        let duration = peer_heights
1699            .read()
1700            .unwrap()
1701            .wait_interval_when_no_peer_to_sync_content();
1702        if now.elapsed() < duration {
1703            let duration = duration - now.elapsed();
1704            info!("retrying checkpoint sync after {:?}", duration);
1705            tokio::time::sleep(duration).await;
1706        }
1707        return Err(checkpoint);
1708    };
1709    debug!("completed checkpoint contents sync");
1710    Ok(checkpoint)
1711}
1712
1713#[instrument(level = "debug", skip_all)]
1714async fn get_full_checkpoint_contents<S>(
1715    peers: PeerBalancer,
1716    store: S,
1717    peer_heights: Arc<RwLock<PeerHeights>>,
1718    checkpoint: &VerifiedCheckpoint,
1719    use_get_checkpoint_contents_v2: bool,
1720    timeout: Duration,
1721) -> Option<VersionedFullCheckpointContents>
1722where
1723    S: WriteStore,
1724{
1725    let sequence_number = checkpoint.sequence_number;
1726    let digest = checkpoint.content_digest;
1727    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1728        debug!("store already contains checkpoint contents");
1729        return Some(contents);
1730    }
1731
1732    // Iterate through our selected peers trying each one in turn until we're able to
1733    // successfully get the target checkpoint
1734    for mut peer in peers {
1735        let peer_id = peer.inner().peer_id();
1736        debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
1737        let request = Request::new(digest).with_timeout(timeout);
1738        let start = Instant::now();
1739        let result = if use_get_checkpoint_contents_v2 {
1740            peer.get_checkpoint_contents_v2(request).await
1741        } else {
1742            peer.get_checkpoint_contents(request)
1743                .await
1744                .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
1745        };
1746        let elapsed = start.elapsed();
1747
1748        let contents = match result {
1749            Ok(response) => match response.into_inner() {
1750                Some(c) => Some(c),
1751                None => {
1752                    trace!("peer unable to help sync");
1753                    peer_heights.write().unwrap().record_failure(peer_id);
1754                    None
1755                }
1756            },
1757            Err(e) => {
1758                trace!("{e:?}");
1759                peer_heights.write().unwrap().record_failure(peer_id);
1760                None
1761            }
1762        };
1763
1764        let Some(contents) = contents else {
1765            continue;
1766        };
1767
1768        if contents.verify_digests(digest).is_ok() {
1769            let size =
1770                bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
1771            peer_heights
1772                .write()
1773                .unwrap()
1774                .record_success(peer_id, size, elapsed);
1775            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1776            store
1777                .insert_checkpoint_contents(checkpoint, verified_contents)
1778                .expect("store operation should not fail");
1779            return Some(contents);
1780        }
1781    }
1782    debug!("no peers had checkpoint contents");
1783    None
1784}
1785
1786async fn update_checkpoint_watermark_metrics<S>(
1787    mut recv: oneshot::Receiver<()>,
1788    store: S,
1789    metrics: Metrics,
1790) -> Result<()>
1791where
1792    S: WriteStore + Clone + Send + Sync,
1793{
1794    let mut interval = tokio::time::interval(Duration::from_secs(5));
1795    loop {
1796        tokio::select! {
1797             _now = interval.tick() => {
1798                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1799                    .expect("store operation should not fail");
1800                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1801                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1802                    .expect("store operation should not fail");
1803                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1804             },
1805            _ = &mut recv => break,
1806        }
1807    }
1808    Ok(())
1809}