sui_network/state_sync/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer-to-peer data synchronization of checkpoints.
5//!
6//! This StateSync module is responsible for the synchronization and dissemination of checkpoints
7//! and the transactions, and their effects, contained within. This module is *not* responsible for
8//! the execution of the transactions included in a checkpoint, that process is left to another
9//! component in the system.
10//!
11//! # High-level Overview of StateSync
12//!
13//! StateSync discovers new checkpoints via a few different sources:
14//! 1. If this node is a Validator, checkpoints will be produced via consensus at which point
15//!    consensus can notify state-sync of the new checkpoint via [Handle::send_checkpoint].
16//! 2. A peer notifies us of the latest checkpoint which they have synchronized. State-Sync will
17//!    also periodically query its peers to discover what their latest checkpoint is.
18//!
19//! We keep track of two different watermarks:
20//! * highest_verified_checkpoint - This is the highest checkpoint header that we've locally
//!   verified. This indicates that we have in our persistent store (and have verified) all
22//!   checkpoint headers up to and including this value.
23//! * highest_synced_checkpoint - This is the highest checkpoint that we've fully synchronized,
24//!   meaning we've downloaded and have in our persistent stores all of the transactions, and their
25//!   effects (but not the objects), for all checkpoints up to and including this point. This is
26//!   the watermark that is shared with other peers, either via notification or when they query for
27//!   our latest checkpoint, and is intended to be used as a guarantee of data availability.
28//!
29//! The `PeerHeights` struct is used to track the highest_synced_checkpoint watermark for all of
30//! our peers.
31//!
32//! When a new checkpoint is discovered, and we've determined that it is higher than our
33//! highest_verified_checkpoint, then StateSync will kick off a task to synchronize and verify all
34//! checkpoints between our highest_synced_checkpoint and the newly discovered checkpoint. This
35//! process is done by querying one of our peers for the checkpoints we're missing (using the
36//! `PeerHeights` struct as a way to intelligently select which peers have the data available for
37//! us to query) at which point we will locally verify the signatures on the checkpoint header with
//! the appropriate committee (based on the epoch). As checkpoints are verified, the
//! highest_verified_checkpoint watermark will be ratcheted up.
40//!
41//! Once we've ratcheted up our highest_verified_checkpoint, and if it is higher than
42//! highest_synced_checkpoint, StateSync will then kick off a task to synchronize the contents of
43//! all of the checkpoints from highest_synced_checkpoint..=highest_verified_checkpoint. After the
44//! contents of each checkpoint is fully downloaded, StateSync will update our
45//! highest_synced_checkpoint watermark and send out a notification on a broadcast channel
46//! indicating that a new checkpoint has been fully downloaded. Notifications on this broadcast
47//! channel will always be made in order. StateSync will also send out a notification to its peers
48//! of the newly synchronized checkpoint so that it can help other peers synchronize.
49
50use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54    collections::{HashMap, VecDeque},
55    sync::{Arc, RwLock},
56    time::Duration,
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60    committee::Committee,
61    digests::CheckpointDigest,
62    messages_checkpoint::{
63        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
65    },
66    storage::WriteStore,
67};
68use tap::{Pipe, TapFallible, TapOptional};
69use tokio::sync::oneshot;
70use tokio::{
71    sync::{broadcast, mpsc, watch},
72    task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
// Source for the StateSync RPC client and server types re-exported below. The file is read from
// `OUT_DIR` at compile time (presumably written there by this crate's build script).
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::StateSyncWorker;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90    state_sync_client::StateSyncClient,
91    state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
97use sui_storage::verify_checkpoint;
98
/// A handle to the StateSync subsystem.
///
/// This handle can be cloned and shared. Once all copies of a StateSync system's Handle have been
/// dropped, the StateSync system will be gracefully shutdown.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Sending half of the channel carrying `StateSyncMessage`s; `send_checkpoint` pushes
    /// checkpoints from consensus through it.
    sender: mpsc::Sender<StateSyncMessage>,
    /// Broadcast channel that `subscribe_to_synced_checkpoints` hands out receivers for.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}
108
109impl Handle {
110    /// Send a newly minted checkpoint from Consensus to StateSync so that it can be disseminated
111    /// to other nodes on the network.
112    ///
113    /// # Invariant
114    ///
115    /// Consensus must only notify StateSync of new checkpoints that have been fully committed to
116    /// persistent storage. This includes CheckpointContents and all Transactions and
117    /// TransactionEffects included therein.
118    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119        self.sender
120            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121            .await
122            .unwrap()
123    }
124
125    /// Subscribe to the stream of checkpoints that have been fully synchronized and downloaded.
126    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127        self.checkpoint_event_sender.subscribe()
128    }
129}
130
/// Bookkeeping about our peers' sync progress plus the checkpoint summaries they have told us
/// about that we have not yet processed.
struct PeerHeights {
    /// Table used to track the highest checkpoint for each of our peers.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    /// Checkpoint summaries learned from peers, keyed by digest.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    /// Index from sequence number to the digest most recently inserted for that sequence number.
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,

    // The amount of time to wait before retry if there are no peers to sync content from.
    wait_interval_when_no_peer_to_sync_content: Duration,
}
140
/// What we know about a single peer's chain identity and the range of checkpoints it can serve.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// The digest of the Peer's genesis checkpoint.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Indicates if this Peer is on the same chain as us.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number we know of for this Peer.
    height: CheckpointSequenceNumber,
    /// Lowest available checkpoint watermark for this Peer.
    /// This defaults to 0 for now.
    lowest: CheckpointSequenceNumber,
}
153
154impl PeerHeights {
155    pub fn highest_known_checkpoint(&self) -> Option<&Checkpoint> {
156        self.highest_known_checkpoint_sequence_number()
157            .and_then(|s| self.sequence_number_to_digest.get(&s))
158            .and_then(|digest| self.unprocessed_checkpoints.get(digest))
159    }
160
161    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
162        self.peers
163            .values()
164            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
165            .max()
166    }
167
168    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
169        self.peers
170            .iter()
171            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
172    }
173
174    // Returns a bool that indicates if the update was done successfully.
175    //
176    // This will return false if the given peer doesn't have an entry or is not on the same chain
177    // as us
178    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
179    pub fn update_peer_info(
180        &mut self,
181        peer_id: PeerId,
182        checkpoint: Checkpoint,
183        low_watermark: Option<CheckpointSequenceNumber>,
184    ) -> bool {
185        debug!("Update peer info");
186
187        let info = match self.peers.get_mut(&peer_id) {
188            Some(info) if info.on_same_chain_as_us => info,
189            _ => return false,
190        };
191
192        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
193        if let Some(low_watermark) = low_watermark {
194            info.lowest = low_watermark;
195        }
196        self.insert_checkpoint(checkpoint);
197
198        true
199    }
200
201    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
202    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
203        use std::collections::hash_map::Entry;
204        debug!("Insert peer info");
205
206        match self.peers.entry(peer_id) {
207            Entry::Occupied(mut entry) => {
208                // If there's already an entry and the genesis checkpoint digests match then update
209                // the maximum height. Otherwise we'll use the more recent one
210                let entry = entry.get_mut();
211                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
212                    entry.height = std::cmp::max(entry.height, info.height);
213                } else {
214                    *entry = info;
215                }
216            }
217            Entry::Vacant(entry) => {
218                entry.insert(info);
219            }
220        }
221    }
222
223    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
224        if let Some(info) = self.peers.get_mut(&peer_id) {
225            info.on_same_chain_as_us = false;
226        }
227    }
228
229    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
230        self.unprocessed_checkpoints
231            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
232        self.sequence_number_to_digest
233            .retain(|&s, _digest| s > sequence_number);
234    }
235
236    // TODO: also record who gives this checkpoint info for peer quality measurement?
237    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
238        let digest = *checkpoint.digest();
239        let sequence_number = *checkpoint.sequence_number();
240        self.unprocessed_checkpoints.insert(digest, checkpoint);
241        self.sequence_number_to_digest
242            .insert(sequence_number, digest);
243    }
244
245    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
246        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
247            self.sequence_number_to_digest
248                .remove(checkpoint.sequence_number());
249        }
250    }
251
252    pub fn get_checkpoint_by_sequence_number(
253        &self,
254        sequence_number: CheckpointSequenceNumber,
255    ) -> Option<&Checkpoint> {
256        self.sequence_number_to_digest
257            .get(&sequence_number)
258            .and_then(|digest| self.get_checkpoint_by_digest(digest))
259    }
260
261    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
262        self.unprocessed_checkpoints.get(digest)
263    }
264
265    #[cfg(test)]
266    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
267        self.wait_interval_when_no_peer_to_sync_content = duration;
268    }
269
270    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
271        self.wait_interval_when_no_peer_to_sync_content
272    }
273}
274
// PeerBalancer is an Iterator that selects peers based on RTT with some added randomness.
#[derive(Clone)]
struct PeerBalancer {
    // Remaining candidate peers; `new` fills this sorted by ascending connection RTT and
    // `next` removes each peer it considers.
    peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
    // Checkpoint a yielded peer must be able to serve; `None` is treated as sequence number 0.
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    // Kind of data being requested, which decides the eligibility rule applied to each peer.
    request_type: PeerCheckpointRequestType,
}
282
// The kind of checkpoint data a `PeerBalancer` is selecting peers for.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    // A peer qualifies when its height is at least the requested checkpoint.
    Summary,
    // A peer qualifies when the requested checkpoint lies between its lowest available
    // checkpoint and its height (inclusive).
    Content,
}
288
289impl PeerBalancer {
290    pub fn new(
291        network: &anemo::Network,
292        peer_heights: Arc<RwLock<PeerHeights>>,
293        request_type: PeerCheckpointRequestType,
294    ) -> Self {
295        let mut peers: Vec<_> = peer_heights
296            .read()
297            .unwrap()
298            .peers_on_same_chain()
299            // Filter out any peers who we aren't connected with.
300            .filter_map(|(peer_id, info)| {
301                network
302                    .peer(*peer_id)
303                    .map(|peer| (peer.connection_rtt(), peer, *info))
304            })
305            .collect();
306        peers.sort_by(|(rtt_a, _, _), (rtt_b, _, _)| rtt_a.cmp(rtt_b));
307        Self {
308            peers: peers
309                .into_iter()
310                .map(|(_, peer, info)| (peer, info))
311                .collect(),
312            requested_checkpoint: None,
313            request_type,
314        }
315    }
316
317    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
318        self.requested_checkpoint = Some(checkpoint);
319        self
320    }
321}
322
323impl Iterator for PeerBalancer {
324    type Item = StateSyncClient<anemo::Peer>;
325
326    fn next(&mut self) -> Option<Self::Item> {
327        while !self.peers.is_empty() {
328            const SELECTION_WINDOW: usize = 2;
329            let idx =
330                rand::thread_rng().gen_range(0..std::cmp::min(SELECTION_WINDOW, self.peers.len()));
331            let (peer, info) = self.peers.remove(idx).unwrap();
332            let requested_checkpoint = self.requested_checkpoint.unwrap_or(0);
333            match &self.request_type {
334                // Summary will never be pruned
335                PeerCheckpointRequestType::Summary if info.height >= requested_checkpoint => {
336                    return Some(StateSyncClient::new(peer));
337                }
338                PeerCheckpointRequestType::Content
339                    if info.height >= requested_checkpoint
340                        && info.lowest <= requested_checkpoint =>
341                {
342                    return Some(StateSyncClient::new(peer));
343                }
344                _ => {}
345            }
346        }
347        None
348    }
349}
350
// Messages delivered to the StateSyncEventLoop through its mailbox.
#[derive(Clone, Debug)]
enum StateSyncMessage {
    // Asks the event loop to start a checkpoint summary sync task if one isn't already running.
    StartSyncJob,
    // Validators will send this to the StateSyncEventLoop in order to kick off notifying our peers
    // of the new checkpoint.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    // Notification that the checkpoint content sync task will send to the event loop in the event
    // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
    // synced at the same time, only the highest checkpoint is sent.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
362
/// State owned by the StateSync event loop: its mailbox, the background tasks it has spawned,
/// and the shared resources those tasks operate on.
struct StateSyncEventLoop<S> {
    /// Tuning knobs (tick interval, timeouts, download concurrency, pinned checkpoints).
    config: StateSyncConfig,

    /// Incoming messages; the loop terminates once this yields `None`.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak reference to our own mailbox
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// All background tasks spawned by the loop; completions are observed via `join_next`.
    tasks: JoinSet<()>,
    /// Handle of the in-flight checkpoint summary sync task, `None` when none is running.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Handle of the contents sync task spawned in `start`; it is not expected to finish.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    /// When present, `maybe_prune_map` is called on it on every tick.
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    /// Persistent checkpoint store.
    store: S,
    /// Shared view of peers' heights and the checkpoints they have reported.
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcast channel on which checkpoints are announced to local subscribers.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Network used to reach peers and to subscribe to peer connect/disconnect events.
    network: anemo::Network,
    metrics: Metrics,

    /// Handle of the archive sync task spawned in `start`; it is not expected to finish.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    /// Configuration passed to the archive sync task, if any.
    archive_config: Option<ArchiveReaderConfig>,
}
384
385impl<S> StateSyncEventLoop<S>
386where
387    S: WriteStore + Clone + Send + Sync + 'static,
388{
    /// Runs the StateSync event loop until every sender for the mailbox has been dropped.
    ///
    /// Before entering the loop this queries all currently connected peers for their latest
    /// checkpoint and spawns the metrics updater, the checkpoint contents sync task and the
    /// archive sync task. Each loop iteration handles one event (interval tick, mailbox message,
    /// peer event or task completion) and then re-evaluates whether summary or contents syncing
    /// needs to be kicked off.
    ///
    /// # Panics
    ///
    /// Panics if subscribing to network peer events fails, if a spawned task panicked or failed
    /// for a reason other than cancellation, or if the contents or archive sync task terminates.
    // Note: A great deal of care is taken to ensure that all event handlers are non-asynchronous
    // and that the only "await" points are from the select macro picking which event to handle.
    // This ensures that the event loop is able to process events at a high speed and reduce the
    // chance for building up a backlog of events to process.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            // Ask every peer we are already connected to for its latest checkpoint.
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        // Watch channel used to tell the contents sync task which sequence number to sync up to.
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // Spawn tokio task to update metrics periodically in the background
        // NOTE(review): `_sender` lives until `start` returns; presumably dropping it is what
        // signals the metrics task to stop — confirm against `update_checkpoint_watermark_metrics`.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Start checkpoint contents sync loop.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.checkpoint_content_timeout(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Start archive based checkpoint content sync loop.
        // TODO: Consider switching to sync from archive only on startup.
        // Right now because the peer set is fixed at startup, a node may eventually
        // end up with peers who have all purged their local state. In such a scenario it will be
        // stuck until restart when it ends up with a different set of peers. Once the discovery
        // mechanism can dynamically identify and connect to other peers on the network, we will rely
        // on sync from archive as a fall back.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        // Start main loop.
        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // Once all handles to our mailbox have been dropped this
                    // will yield `None` and we can terminate the event loop
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The contents sync task is expected to run for as long as the loop does.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // Clear the slot so that a new summary sync task may be started.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    // The archive sync task is likewise expected to never finish.
                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            // After every handled event, check whether there is new work to schedule.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }
506
507    fn handle_message(&mut self, message: StateSyncMessage) {
508        debug!("Received message: {:?}", message);
509        match message {
510            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
511            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
512                self.handle_checkpoint_from_consensus(checkpoint)
513            }
514            // After we've successfully synced a checkpoint we can notify our peers
515            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
516                self.spawn_notify_peers_of_checkpoint(*checkpoint)
517            }
518        }
519    }
520
521    // Handle a checkpoint that we received from consensus
522    #[instrument(level = "debug", skip_all)]
523    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
524        // Always check previous_digest matches in case there is a gap between
525        // state sync and consensus.
526        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
527            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
528            .digest();
529        if checkpoint.previous_digest != Some(prev_digest) {
530            panic!(
531                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
532                checkpoint.sequence_number(),
533                Some(prev_digest),
534                checkpoint.previous_digest
535            );
536        }
537
538        let latest_checkpoint = self
539            .store
540            .get_highest_verified_checkpoint()
541            .expect("store operation should not fail");
542
543        // If this is an older checkpoint, just ignore it
544        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
545            return;
546        }
547
548        let checkpoint = *checkpoint;
549        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
550        if *checkpoint.sequence_number() > next_sequence_number {
551            debug!(
552                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
553                next_sequence_number,
554                checkpoint.sequence_number()
555            );
556        }
557
558        // Because checkpoint from consensus sends in order, when we have checkpoint n,
559        // we must have all of the checkpoints before n from either state sync or consensus.
560        #[cfg(debug_assertions)]
561        {
562            let _ = (next_sequence_number..=*checkpoint.sequence_number())
563                .map(|n| {
564                    let checkpoint = self
565                        .store
566                        .get_checkpoint_by_sequence_number(n)
567                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
568                    self.store
569                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
570                        .unwrap_or_else(|| {
571                            panic!(
572                                "store should contain checkpoint contents for {:?}",
573                                checkpoint.content_digest
574                            )
575                        });
576                })
577                .collect::<Vec<_>>();
578        }
579
580        // If this is the last checkpoint of a epoch, we need to make sure
581        // new committee is in store before we verify newer checkpoints in next epoch.
582        // This could happen before this validator's reconfiguration finishes, because
583        // state sync does not reconfig.
584        // TODO: make CheckpointAggregator use WriteStore so we don't need to do this
585        // committee insertion in two places (only in `insert_checkpoint`).
586        if let Some(EndOfEpochData {
587            next_epoch_committee,
588            ..
589        }) = checkpoint.end_of_epoch_data.as_ref()
590        {
591            let next_committee = next_epoch_committee.iter().cloned().collect();
592            let committee =
593                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
594            self.store
595                .insert_committee(committee)
596                .expect("store operation should not fail");
597        }
598
599        self.store
600            .update_highest_verified_checkpoint(&checkpoint)
601            .expect("store operation should not fail");
602        self.store
603            .update_highest_synced_checkpoint(&checkpoint)
604            .expect("store operation should not fail");
605
606        // We don't care if no one is listening as this is a broadcast channel
607        let _ = self.checkpoint_event_sender.send(checkpoint.clone());
608
609        self.spawn_notify_peers_of_checkpoint(checkpoint);
610    }
611
612    fn handle_peer_event(
613        &mut self,
614        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
615    ) {
616        use tokio::sync::broadcast::error::RecvError;
617
618        match peer_event {
619            Ok(PeerEvent::NewPeer(peer_id)) => {
620                self.spawn_get_latest_from_peer(peer_id);
621            }
622            Ok(PeerEvent::LostPeer(peer_id, _)) => {
623                self.peer_heights.write().unwrap().peers.remove(&peer_id);
624            }
625
626            Err(RecvError::Closed) => {
627                panic!("PeerEvent channel shouldn't be able to be closed");
628            }
629
630            Err(RecvError::Lagged(_)) => {
631                trace!("State-Sync fell behind processing PeerEvents");
632            }
633        }
634    }
635
636    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
637        if let Some(peer) = self.network.peer(peer_id) {
638            let genesis_checkpoint_digest = *self
639                .store
640                .get_checkpoint_by_sequence_number(0)
641                .expect("store should contain genesis checkpoint")
642                .digest();
643            let task = get_latest_from_peer(
644                genesis_checkpoint_digest,
645                peer,
646                self.peer_heights.clone(),
647                self.config.timeout(),
648            );
649            self.tasks.spawn(task);
650        }
651    }
652
653    fn handle_tick(&mut self, _now: std::time::Instant) {
654        let task = query_peers_for_their_latest_checkpoint(
655            self.network.clone(),
656            self.peer_heights.clone(),
657            self.weak_sender.clone(),
658            self.config.timeout(),
659        );
660        self.tasks.spawn(task);
661
662        if let Some(layer) = self.download_limit_layer.as_ref() {
663            layer.maybe_prune_map();
664        }
665    }
666
667    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
668        // Only run one sync task at a time
669        if self.sync_checkpoint_summaries_task.is_some() {
670            return;
671        }
672
673        let highest_processed_checkpoint = self
674            .store
675            .get_highest_verified_checkpoint()
676            .expect("store operation should not fail");
677
678        let highest_known_checkpoint = self
679            .peer_heights
680            .read()
681            .unwrap()
682            .highest_known_checkpoint()
683            .cloned();
684
685        if Some(highest_processed_checkpoint.sequence_number())
686            < highest_known_checkpoint
687                .as_ref()
688                .map(|x| x.sequence_number())
689        {
690            // start sync job
691            let task = sync_to_checkpoint(
692                self.network.clone(),
693                self.store.clone(),
694                self.peer_heights.clone(),
695                self.metrics.clone(),
696                self.config.pinned_checkpoints.clone(),
697                self.config.checkpoint_header_download_concurrency(),
698                self.config.timeout(),
699                // The if condition should ensure that this is Some
700                highest_known_checkpoint.unwrap(),
701            )
702            .map(|result| match result {
703                Ok(()) => {}
704                Err(e) => {
705                    debug!("error syncing checkpoint {e}");
706                }
707            });
708            let task_handle = self.tasks.spawn(task);
709            self.sync_checkpoint_summaries_task = Some(task_handle);
710        }
711    }
712
713    fn maybe_trigger_checkpoint_contents_sync_task(
714        &mut self,
715        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
716    ) {
717        let highest_verified_checkpoint = self
718            .store
719            .get_highest_verified_checkpoint()
720            .expect("store operation should not fail");
721        let highest_synced_checkpoint = self
722            .store
723            .get_highest_synced_checkpoint()
724            .expect("store operation should not fail");
725
726        if highest_verified_checkpoint.sequence_number()
727            > highest_synced_checkpoint.sequence_number()
728            // skip if we aren't connected to any peers that can help
729            && self
730                .peer_heights
731                .read()
732                .unwrap()
733                .highest_known_checkpoint_sequence_number()
734                > Some(*highest_synced_checkpoint.sequence_number())
735        {
736            let _ = target_sequence_channel.send_if_modified(|num| {
737                let new_num = *highest_verified_checkpoint.sequence_number();
738                if *num == new_num {
739                    return false;
740                }
741                *num = new_num;
742                true
743            });
744        }
745    }
746
747    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
748        let task = notify_peers_of_checkpoint(
749            self.network.clone(),
750            self.peer_heights.clone(),
751            checkpoint,
752            self.config.timeout(),
753        );
754        self.tasks.spawn(task);
755    }
756}
757
758async fn notify_peers_of_checkpoint(
759    network: anemo::Network,
760    peer_heights: Arc<RwLock<PeerHeights>>,
761    checkpoint: VerifiedCheckpoint,
762    timeout: Duration,
763) {
764    let futs = peer_heights
765        .read()
766        .unwrap()
767        .peers_on_same_chain()
768        // Filter out any peers who we know already have a checkpoint higher than this one
769        .filter_map(|(peer_id, info)| {
770            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
771        })
772        // Filter out any peers who we aren't connected with
773        .flat_map(|peer_id| network.peer(*peer_id))
774        .map(StateSyncClient::new)
775        .map(|mut client| {
776            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
777            async move { client.push_checkpoint_summary(request).await }
778        })
779        .collect::<Vec<_>>();
780    futures::future::join_all(futs).await;
781}
782
783async fn get_latest_from_peer(
784    our_genesis_checkpoint_digest: CheckpointDigest,
785    peer: anemo::Peer,
786    peer_heights: Arc<RwLock<PeerHeights>>,
787    timeout: Duration,
788) {
789    let peer_id = peer.peer_id();
790    let mut client = StateSyncClient::new(peer);
791
792    let info = {
793        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
794
795        if let Some(info) = maybe_info {
796            info
797        } else {
798            // TODO do we want to create a new API just for querying a node's chainid?
799            //
800            // We need to query this node's genesis checkpoint to see if they're on the same chain
801            // as us
802            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
803                .with_timeout(timeout);
804            let response = client
805                .get_checkpoint_summary(request)
806                .await
807                .map(Response::into_inner);
808
809            let info = match response {
810                Ok(Some(checkpoint)) => {
811                    let digest = *checkpoint.digest();
812                    PeerStateSyncInfo {
813                        genesis_checkpoint_digest: digest,
814                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
815                        height: *checkpoint.sequence_number(),
816                        lowest: CheckpointSequenceNumber::default(),
817                    }
818                }
819                Ok(None) => PeerStateSyncInfo {
820                    genesis_checkpoint_digest: CheckpointDigest::default(),
821                    on_same_chain_as_us: false,
822                    height: CheckpointSequenceNumber::default(),
823                    lowest: CheckpointSequenceNumber::default(),
824                },
825                Err(status) => {
826                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
827                    return;
828                }
829            };
830            peer_heights
831                .write()
832                .unwrap()
833                .insert_peer_info(peer_id, info);
834            info
835        }
836    };
837
838    // Bail early if this node isn't on the same chain as us
839    if !info.on_same_chain_as_us {
840        trace!(?info, "Peer {peer_id} not on same chain as us");
841        return;
842    }
843    let Some((highest_checkpoint, low_watermark)) =
844        query_peer_for_latest_info(&mut client, timeout).await
845    else {
846        return;
847    };
848    peer_heights
849        .write()
850        .unwrap()
851        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
852}
853
854/// Queries a peer for their highest_synced_checkpoint and low checkpoint watermark
855async fn query_peer_for_latest_info(
856    client: &mut StateSyncClient<anemo::Peer>,
857    timeout: Duration,
858) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
859    let request = Request::new(()).with_timeout(timeout);
860    let response = client
861        .get_checkpoint_availability(request)
862        .await
863        .map(Response::into_inner);
864    match response {
865        Ok(GetCheckpointAvailabilityResponse {
866            highest_synced_checkpoint,
867            lowest_available_checkpoint,
868        }) => {
869            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
870        }
871        Err(status) => {
872            // If peer hasn't upgraded they would return 404 NotFound error
873            if status.status() != anemo::types::response::StatusCode::NotFound {
874                trace!("get_checkpoint_availability request failed: {status:?}");
875                return None;
876            }
877        }
878    };
879
880    // Then we try the old query
881    // TODO: remove this once the new feature stabilizes
882    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
883    let response = client
884        .get_checkpoint_summary(request)
885        .await
886        .map(Response::into_inner);
887    match response {
888        Ok(Some(checkpoint)) => Some((checkpoint, None)),
889        Ok(None) => None,
890        Err(status) => {
891            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
892            None
893        }
894    }
895}
896
897#[instrument(level = "debug", skip_all)]
898async fn query_peers_for_their_latest_checkpoint(
899    network: anemo::Network,
900    peer_heights: Arc<RwLock<PeerHeights>>,
901    sender: mpsc::WeakSender<StateSyncMessage>,
902    timeout: Duration,
903) {
904    let peer_heights = &peer_heights;
905    let futs = peer_heights
906        .read()
907        .unwrap()
908        .peers_on_same_chain()
909        // Filter out any peers who we aren't connected with
910        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
911        .map(|peer| {
912            let peer_id = peer.peer_id();
913            let mut client = StateSyncClient::new(peer);
914
915            async move {
916                let response = query_peer_for_latest_info(&mut client, timeout).await;
917                match response {
918                    Some((highest_checkpoint, low_watermark)) => peer_heights
919                        .write()
920                        .unwrap()
921                        .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
922                        .then_some(highest_checkpoint),
923                    None => None,
924                }
925            }
926        })
927        .collect::<Vec<_>>();
928
929    debug!("Query {} peers for latest checkpoint", futs.len());
930
931    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
932
933    let highest_checkpoint = checkpoints.max_by_key(|checkpoint| *checkpoint.sequence_number());
934
935    let our_highest_checkpoint = peer_heights
936        .read()
937        .unwrap()
938        .highest_known_checkpoint()
939        .cloned();
940
941    debug!(
942        "Our highest checkpoint {:?}, peers highest checkpoint {:?}",
943        our_highest_checkpoint.as_ref().map(|c| c.sequence_number()),
944        highest_checkpoint.as_ref().map(|c| c.sequence_number())
945    );
946
947    let _new_checkpoint = match (highest_checkpoint, our_highest_checkpoint) {
948        (Some(theirs), None) => theirs,
949        (Some(theirs), Some(ours)) if theirs.sequence_number() > ours.sequence_number() => theirs,
950        _ => return,
951    };
952
953    if let Some(sender) = sender.upgrade() {
954        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
955    }
956}
957
958async fn sync_to_checkpoint<S>(
959    network: anemo::Network,
960    store: S,
961    peer_heights: Arc<RwLock<PeerHeights>>,
962    metrics: Metrics,
963    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
964    checkpoint_header_download_concurrency: usize,
965    timeout: Duration,
966    checkpoint: Checkpoint,
967) -> Result<()>
968where
969    S: WriteStore,
970{
971    metrics.set_highest_known_checkpoint(*checkpoint.sequence_number());
972
973    let mut current = store
974        .get_highest_verified_checkpoint()
975        .expect("store operation should not fail");
976    if current.sequence_number() >= checkpoint.sequence_number() {
977        return Err(anyhow::anyhow!(
978            "target checkpoint {} is older than highest verified checkpoint {}",
979            checkpoint.sequence_number(),
980            current.sequence_number(),
981        ));
982    }
983
984    let peer_balancer = PeerBalancer::new(
985        &network,
986        peer_heights.clone(),
987        PeerCheckpointRequestType::Summary,
988    );
989    // range of the next sequence_numbers to fetch
990    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
991        ..=*checkpoint.sequence_number())
992        .map(|next| {
993            let peers = peer_balancer.clone().with_checkpoint(next);
994            let peer_heights = peer_heights.clone();
995            let pinned_checkpoints = &pinned_checkpoints;
996            async move {
997                if let Some(checkpoint) = peer_heights
998                    .read()
999                    .unwrap()
1000                    .get_checkpoint_by_sequence_number(next)
1001                {
1002                    return (Some(checkpoint.to_owned()), next, None);
1003                }
1004
1005                // Iterate through peers trying each one in turn until we're able to
1006                // successfully get the target checkpoint
1007                for mut peer in peers {
1008                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1009                        .with_timeout(timeout);
1010                    if let Some(checkpoint) = peer
1011                        .get_checkpoint_summary(request)
1012                        .await
1013                        .tap_err(|e| trace!("{e:?}"))
1014                        .ok()
1015                        .and_then(Response::into_inner)
1016                        .tap_none(|| trace!("peer unable to help sync"))
1017                    {
1018                        // peer didn't give us a checkpoint with the height that we requested
1019                        if *checkpoint.sequence_number() != next {
1020                            tracing::debug!(
1021                                "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1022                                checkpoint.sequence_number()
1023                            );
1024                            continue;
1025                        }
1026
1027                        // peer gave us a checkpoint whose digest does not match pinned digest
1028                        let checkpoint_digest = checkpoint.digest();
1029                        if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1030                            checkpoint.sequence_number(),
1031                            |(seq_num, _digest)| *seq_num
1032                        )
1033                            && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest {
1034                                tracing::debug!(
1035                                    "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1036                                    pinned_checkpoints[pinned_digest_index].1,
1037                                    checkpoint_digest
1038                                );
1039                                continue;
1040                            }
1041
1042                        // Insert in our store in the event that things fail and we need to retry
1043                        peer_heights
1044                            .write()
1045                            .unwrap()
1046                            .insert_checkpoint(checkpoint.clone());
1047                        return (Some(checkpoint), next, Some(peer.inner().peer_id()));
1048                    }
1049                }
1050                (None, next, None)
1051            }
1052        })
1053        .pipe(futures::stream::iter)
1054        .buffered(checkpoint_header_download_concurrency);
1055
1056    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1057        assert_eq!(
1058            current
1059                .sequence_number()
1060                .checked_add(1)
1061                .expect("exhausted u64"),
1062            next
1063        );
1064
1065        // Verify the checkpoint
1066        let checkpoint = 'cp: {
1067            let checkpoint = maybe_checkpoint.ok_or_else(|| {
1068                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1069            })?;
1070            // Skip verification for manually pinned checkpoints.
1071            if pinned_checkpoints
1072                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1073                .is_ok()
1074            {
1075                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1076            }
1077            match verify_checkpoint(&current, &store, checkpoint) {
1078                Ok(verified_checkpoint) => verified_checkpoint,
1079                Err(checkpoint) => {
1080                    let mut peer_heights = peer_heights.write().unwrap();
1081                    // Remove the checkpoint from our temporary store so that we can try querying
1082                    // another peer for a different one
1083                    peer_heights.remove_checkpoint(checkpoint.digest());
1084
1085                    // Mark peer as not on the same chain as us
1086                    if let Some(peer_id) = maybe_peer_id {
1087                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1088                    }
1089
1090                    return Err(anyhow::anyhow!(
1091                        "unable to verify checkpoint {checkpoint:?}"
1092                    ));
1093                }
1094            }
1095        };
1096
1097        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1098        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1099            metrics.checkpoint_summary_age_metrics()
1100        {
1101            checkpoint.report_checkpoint_age(
1102                checkpoint_summary_age_metric,
1103                checkpoint_summary_age_metric_deprecated,
1104            );
1105        }
1106
1107        current = checkpoint.clone();
1108        // Insert the newly verified checkpoint into our store, which will bump our highest
1109        // verified checkpoint watermark as well.
1110        store
1111            .insert_checkpoint(&checkpoint)
1112            .expect("store operation should not fail");
1113    }
1114
1115    peer_heights
1116        .write()
1117        .unwrap()
1118        .cleanup_old_checkpoints(*checkpoint.sequence_number());
1119
1120    Ok(())
1121}
1122
1123async fn sync_checkpoint_contents_from_archive<S>(
1124    network: anemo::Network,
1125    archive_config: Option<ArchiveReaderConfig>,
1126    store: S,
1127    peer_heights: Arc<RwLock<PeerHeights>>,
1128    metrics: Metrics,
1129) where
1130    S: WriteStore + Clone + Send + Sync + 'static,
1131{
1132    loop {
1133        sync_checkpoint_contents_from_archive_iteration(
1134            &network,
1135            &archive_config,
1136            store.clone(),
1137            peer_heights.clone(),
1138            metrics.clone(),
1139        )
1140        .await;
1141        tokio::time::sleep(Duration::from_secs(5)).await;
1142    }
1143}
1144
1145async fn sync_checkpoint_contents_from_archive_iteration<S>(
1146    network: &anemo::Network,
1147    archive_config: &Option<ArchiveReaderConfig>,
1148    store: S,
1149    peer_heights: Arc<RwLock<PeerHeights>>,
1150    metrics: Metrics,
1151) where
1152    S: WriteStore + Clone + Send + Sync + 'static,
1153{
1154    let peers: Vec<_> = peer_heights
1155        .read()
1156        .unwrap()
1157        .peers_on_same_chain()
1158        // Filter out any peers who we aren't connected with.
1159        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1160        .collect();
1161    let lowest_checkpoint_on_peers = peers
1162        .iter()
1163        .map(|(_p, state_sync_info)| state_sync_info.lowest)
1164        .min();
1165    let highest_synced = store
1166        .get_highest_synced_checkpoint()
1167        .expect("store operation should not fail")
1168        .sequence_number;
1169    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1170        highest_synced < lowest_checkpoint_on_peers
1171    } else {
1172        false
1173    };
1174    debug!(
1175        "Syncing checkpoint contents from archive: {sync_from_archive},  highest_synced: {highest_synced},  lowest_checkpoint_on_peers: {}",
1176        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1177    );
1178    if sync_from_archive {
1179        let start = highest_synced
1180            .checked_add(1)
1181            .expect("Checkpoint seq num overflow");
1182        let end = lowest_checkpoint_on_peers.unwrap();
1183
1184        let Some(archive_config) = archive_config else {
1185            warn!("Failed to find an archive reader to complete the state sync request");
1186            return;
1187        };
1188        let Some(ingestion_url) = &archive_config.ingestion_url else {
1189            warn!("Archival ingestion url for state sync is not configured");
1190            return;
1191        };
1192        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1193            warn!("{} can't be used as an archival fallback", ingestion_url);
1194            return;
1195        }
1196        let reader_options = ReaderOptions {
1197            batch_size: archive_config.download_concurrency.into(),
1198            upper_limit: Some(end),
1199            ..Default::default()
1200        };
1201        let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
1202            StateSyncWorker(store, metrics),
1203            ingestion_url.clone(),
1204            archive_config.remote_store_options.clone(),
1205            start,
1206            1,
1207            Some(reader_options),
1208        )
1209        .await
1210        else {
1211            return;
1212        };
1213        match executor.await {
1214            Ok(_) => info!(
1215                "State sync from archive is complete. Checkpoints downloaded = {:?}",
1216                end - start
1217            ),
1218            Err(err) => warn!("State sync from archive failed with error: {:?}", err),
1219        }
1220    }
1221}
1222
1223async fn sync_checkpoint_contents<S>(
1224    network: anemo::Network,
1225    store: S,
1226    peer_heights: Arc<RwLock<PeerHeights>>,
1227    sender: mpsc::WeakSender<StateSyncMessage>,
1228    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1229    checkpoint_content_download_concurrency: usize,
1230    checkpoint_content_download_tx_concurrency: u64,
1231    timeout: Duration,
1232    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1233) where
1234    S: WriteStore + Clone,
1235{
1236    let mut highest_synced = store
1237        .get_highest_synced_checkpoint()
1238        .expect("store operation should not fail");
1239
1240    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1241    let mut target_sequence_cursor = 0;
1242    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1243    let mut checkpoint_contents_tasks = FuturesOrdered::new();
1244
1245    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1246
1247    loop {
1248        tokio::select! {
1249            result = target_sequence_channel.changed() => {
1250                match result {
1251                    Ok(()) => {
1252                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1253                    }
1254                    Err(_) => {
1255                        // Watch channel is closed, exit loop.
1256                        return
1257                    }
1258                }
1259            },
1260            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1261                match maybe_checkpoint {
1262                    Ok(checkpoint) => {
1263                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
1264
1265                        store
1266                            .update_highest_synced_checkpoint(&checkpoint)
1267                            .expect("store operation should not fail");
1268                        // We don't care if no one is listening as this is a broadcast channel
1269                        let _ = checkpoint_event_sender.send(checkpoint.clone());
1270                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1271                        highest_synced = checkpoint;
1272
1273                    }
1274                    Err(checkpoint) => {
1275                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
1276                        if let Some(lowest_peer_checkpoint) =
1277                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
1278                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
1279                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
1280                            }
1281                        } else {
1282                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
1283
1284                        }
1285                        // Retry contents sync on failure.
1286                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
1287                            network.clone(),
1288                            &store,
1289                            peer_heights.clone(),
1290                            timeout,
1291                            checkpoint,
1292                        ));
1293                    }
1294                }
1295            },
1296        }
1297
1298        // Start new tasks up to configured concurrency limits.
1299        while current_sequence < target_sequence_cursor
1300            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
1301        {
1302            let next_checkpoint = store
1303                .get_checkpoint_by_sequence_number(current_sequence)
1304                .unwrap_or_else(|| panic!(
1305                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
1306                    current_sequence
1307                ));
1308
1309            // Enforce transaction count concurrency limit.
1310            let tx_count = next_checkpoint.network_total_transactions
1311                - highest_started_network_total_transactions;
1312            if tx_count > tx_concurrency_remaining {
1313                break;
1314            }
1315            tx_concurrency_remaining -= tx_count;
1316
1317            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
1318            current_sequence += 1;
1319            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
1320                network.clone(),
1321                &store,
1322                peer_heights.clone(),
1323                timeout,
1324                next_checkpoint,
1325            ));
1326        }
1327
1328        if highest_synced
1329            .sequence_number()
1330            .is_multiple_of(checkpoint_content_download_concurrency as u64)
1331            || checkpoint_contents_tasks.is_empty()
1332        {
1333            // Periodically notify event loop to notify our peers that we've synced to a new checkpoint height
1334            if let Some(sender) = sender.upgrade() {
1335                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
1336                let _ = sender.send(message).await;
1337            }
1338        }
1339    }
1340}
1341
1342#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1343async fn sync_one_checkpoint_contents<S>(
1344    network: anemo::Network,
1345    store: S,
1346    peer_heights: Arc<RwLock<PeerHeights>>,
1347    timeout: Duration,
1348    checkpoint: VerifiedCheckpoint,
1349) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1350where
1351    S: WriteStore + Clone,
1352{
1353    debug!("syncing checkpoint contents");
1354
1355    // Check if we already have produced this checkpoint locally. If so, we don't need
1356    // to get it from peers anymore.
1357    if store
1358        .get_highest_synced_checkpoint()
1359        .expect("store operation should not fail")
1360        .sequence_number()
1361        >= checkpoint.sequence_number()
1362    {
1363        debug!("checkpoint was already created via consensus output");
1364        return Ok(checkpoint);
1365    }
1366
1367    // Request checkpoint contents from peers.
1368    let peers = PeerBalancer::new(
1369        &network,
1370        peer_heights.clone(),
1371        PeerCheckpointRequestType::Content,
1372    )
1373    .with_checkpoint(*checkpoint.sequence_number());
1374    let now = tokio::time::Instant::now();
1375    let Some(_contents) = get_full_checkpoint_contents(peers, &store, &checkpoint, timeout).await
1376    else {
1377        // Delay completion in case of error so we don't hammer the network with retries.
1378        let duration = peer_heights
1379            .read()
1380            .unwrap()
1381            .wait_interval_when_no_peer_to_sync_content();
1382        if now.elapsed() < duration {
1383            let duration = duration - now.elapsed();
1384            info!("retrying checkpoint sync after {:?}", duration);
1385            tokio::time::sleep(duration).await;
1386        }
1387        return Err(checkpoint);
1388    };
1389    debug!("completed checkpoint contents sync");
1390    Ok(checkpoint)
1391}
1392
1393#[instrument(level = "debug", skip_all)]
1394async fn get_full_checkpoint_contents<S>(
1395    peers: PeerBalancer,
1396    store: S,
1397    checkpoint: &VerifiedCheckpoint,
1398    timeout: Duration,
1399) -> Option<FullCheckpointContents>
1400where
1401    S: WriteStore,
1402{
1403    let sequence_number = checkpoint.sequence_number;
1404    let digest = checkpoint.content_digest;
1405    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1406        debug!("store already contains checkpoint contents");
1407        return Some(contents);
1408    }
1409
1410    // Iterate through our selected peers trying each one in turn until we're able to
1411    // successfully get the target checkpoint
1412    for mut peer in peers {
1413        debug!(
1414            ?timeout,
1415            "requesting checkpoint contents from {}",
1416            peer.inner().peer_id(),
1417        );
1418        let request = Request::new(digest).with_timeout(timeout);
1419        if let Some(contents) = peer
1420            .get_checkpoint_contents(request)
1421            .await
1422            .tap_err(|e| trace!("{e:?}"))
1423            .ok()
1424            .and_then(Response::into_inner)
1425            .tap_none(|| trace!("peer unable to help sync"))
1426            && contents.verify_digests(digest).is_ok()
1427        {
1428            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1429            store
1430                .insert_checkpoint_contents(checkpoint, verified_contents)
1431                .expect("store operation should not fail");
1432            return Some(contents);
1433        }
1434    }
1435    debug!("no peers had checkpoint contents");
1436    None
1437}
1438
1439async fn update_checkpoint_watermark_metrics<S>(
1440    mut recv: oneshot::Receiver<()>,
1441    store: S,
1442    metrics: Metrics,
1443) -> Result<()>
1444where
1445    S: WriteStore + Clone + Send + Sync,
1446{
1447    let mut interval = tokio::time::interval(Duration::from_secs(5));
1448    loop {
1449        tokio::select! {
1450             _now = interval.tick() => {
1451                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1452                    .expect("store operation should not fail");
1453                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1454                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1455                    .expect("store operation should not fail");
1456                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1457             },
1458            _ = &mut recv => break,
1459        }
1460    }
1461    Ok(())
1462}