sui_network/state_sync/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Peer-to-peer data synchronization of checkpoints.
5//!
6//! This StateSync module is responsible for the synchronization and dissemination of checkpoints
7//! and the transactions, and their effects, contained within. This module is *not* responsible for
8//! the execution of the transactions included in a checkpoint; that process is left to another
9//! component in the system.
10//!
11//! # High-level Overview of StateSync
12//!
13//! StateSync discovers new checkpoints via a few different sources:
14//! 1. If this node is a Validator, checkpoints will be produced via consensus, at which point
15//!    consensus can notify state-sync of the new checkpoint via [Handle::send_checkpoint].
16//! 2. A peer notifies us of the latest checkpoint that it has synchronized. State-Sync will
17//!    also periodically query its peers to discover what their latest checkpoint is.
18//!
19//! We keep track of two different watermarks:
20//! * highest_verified_checkpoint - This is the highest checkpoint header that we've locally
21//!   verified. This indicates that we have in our persistent store (and have verified) all
22//!   checkpoint headers up to and including this value.
23//! * highest_synced_checkpoint - This is the highest checkpoint that we've fully synchronized,
24//!   meaning we've downloaded and have in our persistent stores all of the transactions, and their
25//!   effects (but not the objects), for all checkpoints up to and including this point. This is
26//!   the watermark that is shared with other peers, either via notification or when they query for
27//!   our latest checkpoint, and is intended to be used as a guarantee of data availability.
28//!
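//! As a hypothetical illustration, a node might have verified checkpoint headers up to sequence
//! number 1_050 (its highest_verified_checkpoint) while having fully downloaded contents only up
//! to 1_000 (its highest_synced_checkpoint); the contents sync task described below is then
//! responsible for fetching the contents of checkpoints 1_001..=1_050.
//!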
29//! The `PeerHeights` struct is used to track the highest_synced_checkpoint watermark for all of
30//! our peers.
31//!
32//! When a new checkpoint is discovered, and we've determined that it is higher than our
33//! highest_verified_checkpoint, then StateSync will kick off a task to synchronize and verify all
34//! checkpoints between our highest_verified_checkpoint and the newly discovered checkpoint. This
35//! process is done by querying one of our peers for the checkpoints we're missing (using the
36//! `PeerHeights` struct as a way to intelligently select which peers have the data available for
37//! us to query) at which point we will locally verify the signatures on the checkpoint header with
38//! the appropriate committee (based on the epoch). As checkpoints are verified, the
39//! highest_verified_checkpoint watermark will be ratcheted up.
40//!
41//! Once we've ratcheted up our highest_verified_checkpoint, and if it is higher than
42//! highest_synced_checkpoint, StateSync will then kick off a task to synchronize the contents of
43//! all of the checkpoints from highest_synced_checkpoint..=highest_verified_checkpoint. After the
44//! contents of each checkpoint are fully downloaded, StateSync will update our
45//! highest_synced_checkpoint watermark and send out a notification on a broadcast channel
46//! indicating that a new checkpoint has been fully downloaded. Notifications on this broadcast
47//! channel will always be made in order. StateSync will also send out a notification to its peers
48//! of the newly synchronized checkpoint so that it can help other peers synchronize.
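//!
//! # Example
//!
//! A minimal sketch of a typical consumer of synced-checkpoint notifications. It assumes a
//! `Handle` has already been obtained elsewhere (for example via this module's `Builder`):
//!
//! ```ignore
//! async fn follow_synced_checkpoints(handle: Handle) {
//!     let mut receiver = handle.subscribe_to_synced_checkpoints();
//!     loop {
//!         match receiver.recv().await {
//!             // By the time we're notified, the checkpoint's contents are fully persisted.
//!             Ok(checkpoint) => {
//!                 tracing::info!("synced checkpoint {}", checkpoint.sequence_number());
//!             }
//!             // A slow receiver can lag behind the broadcast channel and catch up from the store.
//!             Err(tokio::sync::broadcast::error::RecvError::Lagged(missed)) => {
//!                 tracing::warn!("missed {missed} synced-checkpoint notifications");
//!             }
//!             Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
//!         }
//!     }
//! }
//! ```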
49
50use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54    collections::{HashMap, VecDeque},
55    sync::{Arc, RwLock},
56    time::Duration,
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60    committee::Committee,
61    digests::CheckpointDigest,
62    messages_checkpoint::{
63        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64        VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65    },
66    storage::WriteStore,
67};
68use tap::{Pipe, TapFallible, TapOptional};
69use tokio::sync::oneshot;
70use tokio::{
71    sync::{broadcast, mpsc, watch},
72    task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::StateSyncWorker;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90    state_sync_client::StateSyncClient,
91    state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
97use sui_storage::verify_checkpoint;
98
99/// A handle to the StateSync subsystem.
100///
101/// This handle can be cloned and shared. Once all copies of a StateSync system's Handle have been
102/// dropped, the StateSync system will be gracefully shut down.
103#[derive(Clone, Debug)]
104pub struct Handle {
105    sender: mpsc::Sender<StateSyncMessage>,
106    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
107}
108
109impl Handle {
110    /// Send a newly minted checkpoint from Consensus to StateSync so that it can be disseminated
111    /// to other nodes on the network.
112    ///
113    /// # Invariant
114    ///
115    /// Consensus must only notify StateSync of new checkpoints that have been fully committed to
116    /// persistent storage. This includes CheckpointContents and all Transactions and
117    /// TransactionEffects included therein.
118    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119        self.sender
120            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121            .await
122            .unwrap()
123    }
124
125    /// Subscribe to the stream of checkpoints that have been fully synchronized and downloaded.
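    ///
    /// Notifications are delivered in sequence-number order. As with any `tokio` broadcast
    /// channel, a receiver that falls too far behind will observe `RecvError::Lagged` and may
    /// need to catch up from its local store before resuming.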
126    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127        self.checkpoint_event_sender.subscribe()
128    }
129}
130
131struct PeerHeights {
132    /// Table used to track the highest checkpoint for each of our peers.
133    peers: HashMap<PeerId, PeerStateSyncInfo>,
134    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
135    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
136
137    // The amount of time to wait before retrying if there are no peers to sync content from.
138    wait_interval_when_no_peer_to_sync_content: Duration,
139}
140
141#[derive(Copy, Clone, Debug, PartialEq, Eq)]
142struct PeerStateSyncInfo {
143    /// The digest of the Peer's genesis checkpoint.
144    genesis_checkpoint_digest: CheckpointDigest,
145    /// Indicates if this Peer is on the same chain as us.
146    on_same_chain_as_us: bool,
147    /// Highest checkpoint sequence number we know of for this Peer.
148    height: CheckpointSequenceNumber,
149    /// lowest available checkpoint watermark for this Peer.
150    /// This defaults to 0 for now.
151    lowest: CheckpointSequenceNumber,
152}
153
154impl PeerHeights {
155    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
156        self.peers
157            .values()
158            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
159            .max()
160    }
161
162    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
163        self.peers
164            .iter()
165            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
166    }
167
168    // Returns a bool that indicates if the update was done successfully.
169    //
170    // This will return false if the given peer doesn't have an entry or is not on the same chain
171    // as us
172    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
173    pub fn update_peer_info(
174        &mut self,
175        peer_id: PeerId,
176        checkpoint: Checkpoint,
177        low_watermark: Option<CheckpointSequenceNumber>,
178    ) -> bool {
179        debug!("Update peer info");
180
181        let info = match self.peers.get_mut(&peer_id) {
182            Some(info) if info.on_same_chain_as_us => info,
183            _ => return false,
184        };
185
186        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
187        if let Some(low_watermark) = low_watermark {
188            info.lowest = low_watermark;
189        }
190        self.insert_checkpoint(checkpoint);
191
192        true
193    }
194
195    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
196    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
197        use std::collections::hash_map::Entry;
198        debug!("Insert peer info");
199
200        match self.peers.entry(peer_id) {
201            Entry::Occupied(mut entry) => {
202                // If there's already an entry and the genesis checkpoint digests match, then update
203                // the maximum height. Otherwise we'll replace the entry with the newly provided info.
204                let entry = entry.get_mut();
205                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
206                    entry.height = std::cmp::max(entry.height, info.height);
207                } else {
208                    *entry = info;
209                }
210            }
211            Entry::Vacant(entry) => {
212                entry.insert(info);
213            }
214        }
215    }
216
217    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
218        if let Some(info) = self.peers.get_mut(&peer_id) {
219            info.on_same_chain_as_us = false;
220        }
221    }
222
223    /// Updates the peer's height without storing any checkpoint data.
224    /// Returns false if the peer doesn't exist or is not on the same chain.
225    pub fn update_peer_height(
226        &mut self,
227        peer_id: PeerId,
228        height: CheckpointSequenceNumber,
229        low_watermark: Option<CheckpointSequenceNumber>,
230    ) -> bool {
231        let info = match self.peers.get_mut(&peer_id) {
232            Some(info) if info.on_same_chain_as_us => info,
233            _ => return false,
234        };
235
236        info.height = std::cmp::max(height, info.height);
237        if let Some(low_watermark) = low_watermark {
238            info.lowest = low_watermark;
239        }
240
241        true
242    }
243
244    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
245        self.unprocessed_checkpoints
246            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
247        self.sequence_number_to_digest
248            .retain(|&s, _digest| s > sequence_number);
249    }
250
251    /// Inserts a checkpoint into the unprocessed checkpoints store.
252    /// Only one checkpoint per sequence number is stored. If a checkpoint with the same
253    /// sequence number but different digest already exists, the new one is dropped.
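    ///
    /// For example (with hypothetical digests): inserting a checkpoint for sequence 42 with
    /// digest A records it; a later insert for sequence 42 with digest B is logged and dropped.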
254    // TODO: also record who gives this checkpoint info for peer quality measurement?
255    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
256        let digest = *checkpoint.digest();
257        let sequence_number = *checkpoint.sequence_number();
258
259        // Check if we already have a checkpoint for this sequence number.
260        if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
261            if *existing_digest == digest {
262                // Same checkpoint, nothing to do.
263                return;
264            }
265            tracing::info!(
266                ?sequence_number,
267                ?existing_digest,
268                ?digest,
269                "received checkpoint with same sequence number but different digest, dropping new checkpoint"
270            );
271            return;
272        }
273
274        self.unprocessed_checkpoints.insert(digest, checkpoint);
275        self.sequence_number_to_digest
276            .insert(sequence_number, digest);
277    }
278
279    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
280        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
281            self.sequence_number_to_digest
282                .remove(checkpoint.sequence_number());
283        }
284    }
285
286    pub fn get_checkpoint_by_sequence_number(
287        &self,
288        sequence_number: CheckpointSequenceNumber,
289    ) -> Option<&Checkpoint> {
290        self.sequence_number_to_digest
291            .get(&sequence_number)
292            .and_then(|digest| self.get_checkpoint_by_digest(digest))
293    }
294
295    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
296        self.unprocessed_checkpoints.get(digest)
297    }
298
299    #[cfg(test)]
300    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
301        self.wait_interval_when_no_peer_to_sync_content = duration;
302    }
303
304    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
305        self.wait_interval_when_no_peer_to_sync_content
306    }
307}
308
309// PeerBalancer is an Iterator that selects peers based on RTT with some added randomness.
310#[derive(Clone)]
311struct PeerBalancer {
312    peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
313    requested_checkpoint: Option<CheckpointSequenceNumber>,
314    request_type: PeerCheckpointRequestType,
315}
316
317#[derive(Clone)]
318enum PeerCheckpointRequestType {
319    Summary,
320    Content,
321}
322
323impl PeerBalancer {
324    pub fn new(
325        network: &anemo::Network,
326        peer_heights: Arc<RwLock<PeerHeights>>,
327        request_type: PeerCheckpointRequestType,
328    ) -> Self {
329        let mut peers: Vec<_> = peer_heights
330            .read()
331            .unwrap()
332            .peers_on_same_chain()
333            // Filter out any peers who we aren't connected with.
334            .filter_map(|(peer_id, info)| {
335                network
336                    .peer(*peer_id)
337                    .map(|peer| (peer.connection_rtt(), peer, *info))
338            })
339            .collect();
340        peers.sort_by(|(rtt_a, _, _), (rtt_b, _, _)| rtt_a.cmp(rtt_b));
341        Self {
342            peers: peers
343                .into_iter()
344                .map(|(_, peer, info)| (peer, info))
345                .collect(),
346            requested_checkpoint: None,
347            request_type,
348        }
349    }
350
351    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
352        self.requested_checkpoint = Some(checkpoint);
353        self
354    }
355}
356
357impl Iterator for PeerBalancer {
358    type Item = StateSyncClient<anemo::Peer>;
359
360    fn next(&mut self) -> Option<Self::Item> {
361        while !self.peers.is_empty() {
362            const SELECTION_WINDOW: usize = 2;
363            let idx =
364                rand::thread_rng().gen_range(0..std::cmp::min(SELECTION_WINDOW, self.peers.len()));
365            let (peer, info) = self.peers.remove(idx).unwrap();
366            let requested_checkpoint = self.requested_checkpoint.unwrap_or(0);
367            match &self.request_type {
368                // Summary will never be pruned
369                PeerCheckpointRequestType::Summary if info.height >= requested_checkpoint => {
370                    return Some(StateSyncClient::new(peer));
371                }
372                PeerCheckpointRequestType::Content
373                    if info.height >= requested_checkpoint
374                        && info.lowest <= requested_checkpoint =>
375                {
376                    return Some(StateSyncClient::new(peer));
377                }
378                _ => {}
379            }
380        }
381        None
382    }
383}
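
// A sketch of how the balancer is typically consumed (mirroring `sync_to_checkpoint` below);
// request construction is abbreviated:
//
//     let peers = PeerBalancer::new(&network, peer_heights.clone(), PeerCheckpointRequestType::Summary)
//         .with_checkpoint(next);
//     for mut client in peers {
//         // Each item is a StateSyncClient for one connected, same-chain peer, tried in
//         // (approximately) ascending-RTT order.
//         if let Ok(response) = client.get_checkpoint_summary(request).await { /* ... */ }
//     }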
384
385#[derive(Clone, Debug)]
386enum StateSyncMessage {
387    StartSyncJob,
388    // Validators will send this to the StateSyncEventLoop in order to kick off notifying our peers
389    // of the new checkpoint.
390    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
391    // Notification that the checkpoint content sync task will send to the event loop in the event
392    // it was able to successfully sync a checkpoint's contents. If multiple checkpoints were
393    // synced at the same time, only the highest checkpoint is sent.
394    SyncedCheckpoint(Box<VerifiedCheckpoint>),
395}
396
397struct StateSyncEventLoop<S> {
398    config: StateSyncConfig,
399
400    mailbox: mpsc::Receiver<StateSyncMessage>,
401    /// Weak reference to our own mailbox
402    weak_sender: mpsc::WeakSender<StateSyncMessage>,
403
404    tasks: JoinSet<()>,
405    sync_checkpoint_summaries_task: Option<AbortHandle>,
406    sync_checkpoint_contents_task: Option<AbortHandle>,
407    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
408
409    store: S,
410    peer_heights: Arc<RwLock<PeerHeights>>,
411    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
412    network: anemo::Network,
413    metrics: Metrics,
414
415    sync_checkpoint_from_archive_task: Option<AbortHandle>,
416    archive_config: Option<ArchiveReaderConfig>,
417}
418
419impl<S> StateSyncEventLoop<S>
420where
421    S: WriteStore + Clone + Send + Sync + 'static,
422{
423    // Note: A great deal of care is taken to ensure that all event handlers are non-asynchronous
424    // and that the only "await" points are from the select macro picking which event to handle.
425    // This ensures that the event loop is able to process events at a high speed and reduces the
426    // chance of building up a backlog of events to process.
427    pub async fn start(mut self) {
428        info!("State-Synchronizer started");
429
430        self.config.pinned_checkpoints.sort();
431
432        let mut interval = tokio::time::interval(self.config.interval_period());
433        let mut peer_events = {
434            let (subscriber, peers) = self.network.subscribe().unwrap();
435            for peer_id in peers {
436                self.spawn_get_latest_from_peer(peer_id);
437            }
438            subscriber
439        };
440        let (
441            target_checkpoint_contents_sequence_sender,
442            target_checkpoint_contents_sequence_receiver,
443        ) = watch::channel(0);
444
445        // Spawn tokio task to update metrics periodically in the background
446        let (_sender, receiver) = oneshot::channel();
447        tokio::spawn(update_checkpoint_watermark_metrics(
448            receiver,
449            self.store.clone(),
450            self.metrics.clone(),
451        ));
452
453        // Start checkpoint contents sync loop.
454        let task = sync_checkpoint_contents(
455            self.network.clone(),
456            self.store.clone(),
457            self.peer_heights.clone(),
458            self.weak_sender.clone(),
459            self.checkpoint_event_sender.clone(),
460            self.config.checkpoint_content_download_concurrency(),
461            self.config.checkpoint_content_download_tx_concurrency(),
462            self.config.use_get_checkpoint_contents_v2(),
463            self.config.checkpoint_content_timeout(),
464            target_checkpoint_contents_sequence_receiver,
465        );
466        let task_handle = self.tasks.spawn(task);
467        self.sync_checkpoint_contents_task = Some(task_handle);
468
469        // Start archive-based checkpoint content sync loop.
470        // TODO: Consider switching to sync from archive only on startup.
471        // Right now, because the peer set is fixed at startup, a node may eventually
472        // end up with peers that have all purged their local state. In such a scenario it will be
473        // stuck until restart, when it ends up with a different set of peers. Once the discovery
474        // mechanism can dynamically identify and connect to other peers on the network, we will rely
475        // on sync from archive as a fallback.
476        let task = sync_checkpoint_contents_from_archive(
477            self.network.clone(),
478            self.archive_config.clone(),
479            self.store.clone(),
480            self.peer_heights.clone(),
481            self.metrics.clone(),
482        );
483        let task_handle = self.tasks.spawn(task);
484        self.sync_checkpoint_from_archive_task = Some(task_handle);
485
486        // Start main loop.
487        loop {
488            tokio::select! {
489                now = interval.tick() => {
490                    self.handle_tick(now.into_std());
491                },
492                maybe_message = self.mailbox.recv() => {
493                    // Once all handles to our mailbox have been dropped this
494                    // will yield `None` and we can terminate the event loop
495                    if let Some(message) = maybe_message {
496                        self.handle_message(message);
497                    } else {
498                        break;
499                    }
500                },
501                peer_event = peer_events.recv() => {
502                    self.handle_peer_event(peer_event);
503                },
504                Some(task_result) = self.tasks.join_next() => {
505                    match task_result {
506                        Ok(()) => {},
507                        Err(e) => {
508                            if e.is_cancelled() {
509                                // avoid crashing on ungraceful shutdown
510                            } else if e.is_panic() {
511                                // propagate panics.
512                                std::panic::resume_unwind(e.into_panic());
513                            } else {
514                                panic!("task failed: {e}");
515                            }
516                        },
517                    };
518
519                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
520                        panic!("sync_checkpoint_contents task unexpectedly terminated")
521                    }
522
523                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
524                        self.sync_checkpoint_summaries_task = None;
525                    }
526
527                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
528                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
529                    }
530                },
531            }
532
533            self.maybe_start_checkpoint_summary_sync_task();
534            self.maybe_trigger_checkpoint_contents_sync_task(
535                &target_checkpoint_contents_sequence_sender,
536            );
537        }
538
539        info!("State-Synchronizer ended");
540    }
541
542    fn handle_message(&mut self, message: StateSyncMessage) {
543        debug!("Received message: {:?}", message);
544        match message {
545            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
546            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
547                self.handle_checkpoint_from_consensus(checkpoint)
548            }
549            // After we've successfully synced a checkpoint we can notify our peers
550            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
551                self.spawn_notify_peers_of_checkpoint(*checkpoint)
552            }
553        }
554    }
555
556    // Handle a checkpoint that we received from consensus
557    #[instrument(level = "debug", skip_all)]
558    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
559        // Always check previous_digest matches in case there is a gap between
560        // state sync and consensus.
561        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
562            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
563            .digest();
564        if checkpoint.previous_digest != Some(prev_digest) {
565            panic!(
566                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
567                checkpoint.sequence_number(),
568                Some(prev_digest),
569                checkpoint.previous_digest
570            );
571        }
572
573        let latest_checkpoint = self
574            .store
575            .get_highest_verified_checkpoint()
576            .expect("store operation should not fail");
577
578        // If this is an older checkpoint, just ignore it
579        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
580            return;
581        }
582
583        let checkpoint = *checkpoint;
584        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
585        if *checkpoint.sequence_number() > next_sequence_number {
586            debug!(
587                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
588                next_sequence_number,
589                checkpoint.sequence_number()
590            );
591        }
592
593        // Because checkpoints from consensus are sent in order, when we have checkpoint n,
594        // we must have all of the checkpoints before n from either state sync or consensus.
595        #[cfg(debug_assertions)]
596        {
597            let _ = (next_sequence_number..=*checkpoint.sequence_number())
598                .map(|n| {
599                    let checkpoint = self
600                        .store
601                        .get_checkpoint_by_sequence_number(n)
602                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
603                    self.store
604                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
605                        .unwrap_or_else(|| {
606                            panic!(
607                                "store should contain checkpoint contents for {:?}",
608                                checkpoint.content_digest
609                            )
610                        });
611                })
612                .collect::<Vec<_>>();
613        }
614
615        // If this is the last checkpoint of an epoch, we need to make sure the
616        // new committee is in the store before we verify newer checkpoints in the next epoch.
617        // This could happen before this validator's reconfiguration finishes, because
618        // state sync does not reconfig.
619        // TODO: make CheckpointAggregator use WriteStore so we don't need to do this
620        // committee insertion in two places (only in `insert_checkpoint`).
621        if let Some(EndOfEpochData {
622            next_epoch_committee,
623            ..
624        }) = checkpoint.end_of_epoch_data.as_ref()
625        {
626            let next_committee = next_epoch_committee.iter().cloned().collect();
627            let committee =
628                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
629            self.store
630                .insert_committee(committee)
631                .expect("store operation should not fail");
632        }
633
634        self.store
635            .update_highest_verified_checkpoint(&checkpoint)
636            .expect("store operation should not fail");
637        self.store
638            .update_highest_synced_checkpoint(&checkpoint)
639            .expect("store operation should not fail");
640
641        // We don't care if no one is listening as this is a broadcast channel
642        let _ = self.checkpoint_event_sender.send(checkpoint.clone());
643
644        self.spawn_notify_peers_of_checkpoint(checkpoint);
645    }
646
647    fn handle_peer_event(
648        &mut self,
649        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
650    ) {
651        use tokio::sync::broadcast::error::RecvError;
652
653        match peer_event {
654            Ok(PeerEvent::NewPeer(peer_id)) => {
655                self.spawn_get_latest_from_peer(peer_id);
656            }
657            Ok(PeerEvent::LostPeer(peer_id, _)) => {
658                self.peer_heights.write().unwrap().peers.remove(&peer_id);
659            }
660
661            Err(RecvError::Closed) => {
662                panic!("PeerEvent channel shouldn't be able to be closed");
663            }
664
665            Err(RecvError::Lagged(_)) => {
666                trace!("State-Sync fell behind processing PeerEvents");
667            }
668        }
669    }
670
671    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
672        if let Some(peer) = self.network.peer(peer_id) {
673            let genesis_checkpoint_digest = *self
674                .store
675                .get_checkpoint_by_sequence_number(0)
676                .expect("store should contain genesis checkpoint")
677                .digest();
678            let task = get_latest_from_peer(
679                genesis_checkpoint_digest,
680                peer,
681                self.peer_heights.clone(),
682                self.config.timeout(),
683            );
684            self.tasks.spawn(task);
685        }
686    }
687
688    fn handle_tick(&mut self, _now: std::time::Instant) {
689        let task = query_peers_for_their_latest_checkpoint(
690            self.network.clone(),
691            self.peer_heights.clone(),
692            self.weak_sender.clone(),
693            self.config.timeout(),
694        );
695        self.tasks.spawn(task);
696
697        if let Some(layer) = self.download_limit_layer.as_ref() {
698            layer.maybe_prune_map();
699        }
700    }
701
702    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
703        // Only run one sync task at a time
704        if self.sync_checkpoint_summaries_task.is_some() {
705            return;
706        }
707
708        let highest_processed_checkpoint = self
709            .store
710            .get_highest_verified_checkpoint()
711            .expect("store operation should not fail");
712
713        let highest_known_sequence_number = self
714            .peer_heights
715            .read()
716            .unwrap()
717            .highest_known_checkpoint_sequence_number();
718
719        if let Some(target_seq) = highest_known_sequence_number
720            && *highest_processed_checkpoint.sequence_number() < target_seq
721        {
722            // Limit the per-sync batch size according to config.
723            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
724            let limited_target = std::cmp::min(
725                target_seq,
726                highest_processed_checkpoint
727                    .sequence_number()
728                    .saturating_add(max_batch_size),
729            );
730            let was_limited = limited_target < target_seq;
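            // Worked example with hypothetical numbers: if our highest verified checkpoint is
            // 1_000, the highest peer-reported checkpoint is 500_000, and max_batch_size is
            // 100_000, then limited_target = min(500_000, 101_000) = 101_000 and was_limited is
            // true, so another StartSyncJob is queued as soon as this batch completes.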
731
732            // Start sync job.
733            let weak_sender = self.weak_sender.clone();
734            let task = sync_to_checkpoint(
735                self.network.clone(),
736                self.store.clone(),
737                self.peer_heights.clone(),
738                self.metrics.clone(),
739                self.config.pinned_checkpoints.clone(),
740                self.config.checkpoint_header_download_concurrency(),
741                self.config.timeout(),
742                limited_target,
743            )
744            .map(move |result| match result {
745                Ok(()) => {
746                    // If we limited the sync range, immediately trigger
747                    // another sync to continue catching up.
748                    if was_limited && let Some(sender) = weak_sender.upgrade() {
749                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
750                    }
751                }
752                Err(e) => {
753                    debug!("error syncing checkpoint {e}");
754                }
755            });
756            let task_handle = self.tasks.spawn(task);
757            self.sync_checkpoint_summaries_task = Some(task_handle);
758        }
759    }
760
761    fn maybe_trigger_checkpoint_contents_sync_task(
762        &mut self,
763        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
764    ) {
765        let highest_verified_checkpoint = self
766            .store
767            .get_highest_verified_checkpoint()
768            .expect("store operation should not fail");
769        let highest_synced_checkpoint = self
770            .store
771            .get_highest_synced_checkpoint()
772            .expect("store operation should not fail");
773
774        if highest_verified_checkpoint.sequence_number()
775            > highest_synced_checkpoint.sequence_number()
776            // skip if we aren't connected to any peers that can help
777            && self
778                .peer_heights
779                .read()
780                .unwrap()
781                .highest_known_checkpoint_sequence_number()
782                > Some(*highest_synced_checkpoint.sequence_number())
783        {
784            let _ = target_sequence_channel.send_if_modified(|num| {
785                let new_num = *highest_verified_checkpoint.sequence_number();
786                if *num == new_num {
787                    return false;
788                }
789                *num = new_num;
790                true
791            });
792        }
793    }
794
795    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
796        let task = notify_peers_of_checkpoint(
797            self.network.clone(),
798            self.peer_heights.clone(),
799            checkpoint,
800            self.config.timeout(),
801        );
802        self.tasks.spawn(task);
803    }
804}
805
806async fn notify_peers_of_checkpoint(
807    network: anemo::Network,
808    peer_heights: Arc<RwLock<PeerHeights>>,
809    checkpoint: VerifiedCheckpoint,
810    timeout: Duration,
811) {
812    let futs = peer_heights
813        .read()
814        .unwrap()
815        .peers_on_same_chain()
816        // Filter out any peers who we know already have a checkpoint higher than this one
817        .filter_map(|(peer_id, info)| {
818            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
819        })
820        // Filter out any peers who we aren't connected with
821        .flat_map(|peer_id| network.peer(*peer_id))
822        .map(StateSyncClient::new)
823        .map(|mut client| {
824            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
825            async move { client.push_checkpoint_summary(request).await }
826        })
827        .collect::<Vec<_>>();
828    futures::future::join_all(futs).await;
829}
830
831async fn get_latest_from_peer(
832    our_genesis_checkpoint_digest: CheckpointDigest,
833    peer: anemo::Peer,
834    peer_heights: Arc<RwLock<PeerHeights>>,
835    timeout: Duration,
836) {
837    let peer_id = peer.peer_id();
838    let mut client = StateSyncClient::new(peer);
839
840    let info = {
841        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
842
843        if let Some(info) = maybe_info {
844            info
845        } else {
846            // TODO do we want to create a new API just for querying a node's chainid?
847            //
848            // We need to query this node's genesis checkpoint to see if they're on the same chain
849            // as us
850            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
851                .with_timeout(timeout);
852            let response = client
853                .get_checkpoint_summary(request)
854                .await
855                .map(Response::into_inner);
856
857            let info = match response {
858                Ok(Some(checkpoint)) => {
859                    let digest = *checkpoint.digest();
860                    PeerStateSyncInfo {
861                        genesis_checkpoint_digest: digest,
862                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
863                        height: *checkpoint.sequence_number(),
864                        lowest: CheckpointSequenceNumber::default(),
865                    }
866                }
867                Ok(None) => PeerStateSyncInfo {
868                    genesis_checkpoint_digest: CheckpointDigest::default(),
869                    on_same_chain_as_us: false,
870                    height: CheckpointSequenceNumber::default(),
871                    lowest: CheckpointSequenceNumber::default(),
872                },
873                Err(status) => {
874                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
875                    return;
876                }
877            };
878            peer_heights
879                .write()
880                .unwrap()
881                .insert_peer_info(peer_id, info);
882            info
883        }
884    };
885
886    // Bail early if this node isn't on the same chain as us
887    if !info.on_same_chain_as_us {
888        trace!(?info, "Peer {peer_id} not on same chain as us");
889        return;
890    }
891    let Some((highest_checkpoint, low_watermark)) =
892        query_peer_for_latest_info(&mut client, timeout).await
893    else {
894        return;
895    };
896    peer_heights
897        .write()
898        .unwrap()
899        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
900}
901
902/// Queries a peer for their highest_synced_checkpoint and low checkpoint watermark
903async fn query_peer_for_latest_info(
904    client: &mut StateSyncClient<anemo::Peer>,
905    timeout: Duration,
906) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
907    let request = Request::new(()).with_timeout(timeout);
908    let response = client
909        .get_checkpoint_availability(request)
910        .await
911        .map(Response::into_inner);
912    match response {
913        Ok(GetCheckpointAvailabilityResponse {
914            highest_synced_checkpoint,
915            lowest_available_checkpoint,
916        }) => {
917            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
918        }
919        Err(status) => {
920            // If the peer hasn't upgraded, it will return a 404 NotFound error
921            if status.status() != anemo::types::response::StatusCode::NotFound {
922                trace!("get_checkpoint_availability request failed: {status:?}");
923                return None;
924            }
925        }
926    };
927
928    // Then we try the old query
929    // TODO: remove this once the new feature stabilizes
930    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
931    let response = client
932        .get_checkpoint_summary(request)
933        .await
934        .map(Response::into_inner);
935    match response {
936        Ok(Some(checkpoint)) => Some((checkpoint, None)),
937        Ok(None) => None,
938        Err(status) => {
939            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
940            None
941        }
942    }
943}
944
945#[instrument(level = "debug", skip_all)]
946async fn query_peers_for_their_latest_checkpoint(
947    network: anemo::Network,
948    peer_heights: Arc<RwLock<PeerHeights>>,
949    sender: mpsc::WeakSender<StateSyncMessage>,
950    timeout: Duration,
951) {
952    let peer_heights = &peer_heights;
953    let futs = peer_heights
954        .read()
955        .unwrap()
956        .peers_on_same_chain()
957        // Filter out any peers who we aren't connected with
958        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
959        .map(|peer| {
960            let peer_id = peer.peer_id();
961            let mut client = StateSyncClient::new(peer);
962
963            async move {
964                let response = query_peer_for_latest_info(&mut client, timeout).await;
965                match response {
966                    Some((highest_checkpoint, low_watermark)) => peer_heights
967                        .write()
968                        .unwrap()
969                        .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
970                        .then_some(highest_checkpoint),
971                    None => None,
972                }
973            }
974        })
975        .collect::<Vec<_>>();
976
977    debug!("Query {} peers for latest checkpoint", futs.len());
978
979    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
980
981    let highest_checkpoint_seq = checkpoints
982        .map(|checkpoint| *checkpoint.sequence_number())
983        .max();
984
985    let our_highest_seq = peer_heights
986        .read()
987        .unwrap()
988        .highest_known_checkpoint_sequence_number();
989
990    debug!(
991        "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
992    );
993
994    let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
995        (Some(theirs), None) => theirs,
996        (Some(theirs), Some(ours)) if theirs > ours => theirs,
997        _ => return,
998    };
999
1000    if let Some(sender) = sender.upgrade() {
1001        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1002    }
1003}
1004
1005async fn sync_to_checkpoint<S>(
1006    network: anemo::Network,
1007    store: S,
1008    peer_heights: Arc<RwLock<PeerHeights>>,
1009    metrics: Metrics,
1010    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1011    checkpoint_header_download_concurrency: usize,
1012    timeout: Duration,
1013    target_sequence_number: CheckpointSequenceNumber,
1014) -> Result<()>
1015where
1016    S: WriteStore,
1017{
1018    metrics.set_highest_known_checkpoint(target_sequence_number);
1019
1020    let mut current = store
1021        .get_highest_verified_checkpoint()
1022        .expect("store operation should not fail");
1023    if *current.sequence_number() >= target_sequence_number {
1024        return Err(anyhow::anyhow!(
1025            "target checkpoint {} is older than highest verified checkpoint {}",
1026            target_sequence_number,
1027            current.sequence_number(),
1028        ));
1029    }
1030
1031    let peer_balancer = PeerBalancer::new(
1032        &network,
1033        peer_heights.clone(),
1034        PeerCheckpointRequestType::Summary,
1035    );
1036    // range of the next sequence_numbers to fetch
1037    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1038        ..=target_sequence_number)
1039        .map(|next| {
1040            let peers = peer_balancer.clone().with_checkpoint(next);
1041            let peer_heights = peer_heights.clone();
1042            let pinned_checkpoints = &pinned_checkpoints;
1043            async move {
1044                if let Some(checkpoint) = peer_heights
1045                    .read()
1046                    .unwrap()
1047                    .get_checkpoint_by_sequence_number(next)
1048                {
1049                    return (Some(checkpoint.to_owned()), next, None);
1050                }
1051
1052                // Iterate through peers trying each one in turn until we're able to
1053                // successfully get the target checkpoint.
1054                for mut peer in peers {
1055                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1056                        .with_timeout(timeout);
1057                    if let Some(checkpoint) = peer
1058                        .get_checkpoint_summary(request)
1059                        .await
1060                        .tap_err(|e| trace!("{e:?}"))
1061                        .ok()
1062                        .and_then(Response::into_inner)
1063                        .tap_none(|| trace!("peer unable to help sync"))
1064                    {
1065                        // peer didn't give us a checkpoint with the height that we requested
1066                        if *checkpoint.sequence_number() != next {
1067                            tracing::debug!(
1068                                "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1069                                checkpoint.sequence_number()
1070                            );
1071                            peer_heights.write().unwrap().mark_peer_as_not_on_same_chain(peer.inner().peer_id());
1072                            continue;
1073                        }
1074
1075                        // peer gave us a checkpoint whose digest does not match pinned digest
1076                        let checkpoint_digest = checkpoint.digest();
1077                        if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1078                            checkpoint.sequence_number(),
1079                            |(seq_num, _digest)| *seq_num
1080                        )
1081                            && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest {
1082                                tracing::debug!(
1083                                    "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1084                                    pinned_checkpoints[pinned_digest_index].1,
1085                                    checkpoint_digest
1086                                );
1087                                continue;
1088                            }
1089
1090                        // Insert in our store in the event that things fail and we need to retry
1091                        peer_heights
1092                            .write()
1093                            .unwrap()
1094                            .insert_checkpoint(checkpoint.clone());
1095                        return (Some(checkpoint), next, Some(peer.inner().peer_id()));
1096                    }
1097                }
1098                (None, next, None)
1099            }
1100        })
1101        .pipe(futures::stream::iter)
1102        .buffered(checkpoint_header_download_concurrency);
1103
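    // `buffered` preserves the stream's original ordering, so summaries arrive strictly in
    // sequence and each one can be verified against the previously verified checkpoint (`current`).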
1104    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1105        assert_eq!(
1106            current
1107                .sequence_number()
1108                .checked_add(1)
1109                .expect("exhausted u64"),
1110            next
1111        );
1112
1113        // Verify the checkpoint
1114        let checkpoint = 'cp: {
1115            let checkpoint = maybe_checkpoint.ok_or_else(|| {
1116                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1117            })?;
1118            // Skip verification for manually pinned checkpoints.
1119            if pinned_checkpoints
1120                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1121                .is_ok()
1122            {
1123                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1124            }
1125            match verify_checkpoint(&current, &store, checkpoint) {
1126                Ok(verified_checkpoint) => verified_checkpoint,
1127                Err(checkpoint) => {
1128                    let mut peer_heights = peer_heights.write().unwrap();
1129                    // Remove the checkpoint from our temporary store so that we can try querying
1130                    // another peer for a different one
1131                    peer_heights.remove_checkpoint(checkpoint.digest());
1132
1133                    // Mark peer as not on the same chain as us
1134                    if let Some(peer_id) = maybe_peer_id {
1135                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1136                    }
1137
1138                    return Err(anyhow::anyhow!(
1139                        "unable to verify checkpoint {checkpoint:?}"
1140                    ));
1141                }
1142            }
1143        };
1144
1145        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1146        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1147            metrics.checkpoint_summary_age_metrics()
1148        {
1149            checkpoint.report_checkpoint_age(
1150                checkpoint_summary_age_metric,
1151                checkpoint_summary_age_metric_deprecated,
1152            );
1153        }
1154
1155        // Insert the newly verified checkpoint into our store, which will bump our highest
1156        // verified checkpoint watermark as well.
1157        store
1158            .insert_checkpoint(&checkpoint)
1159            .expect("store operation should not fail");
1160
1161        current = checkpoint;
1162    }
1163
1164    peer_heights
1165        .write()
1166        .unwrap()
1167        .cleanup_old_checkpoints(*current.sequence_number());
1168
1169    Ok(())
1170}
1171
1172async fn sync_checkpoint_contents_from_archive<S>(
1173    network: anemo::Network,
1174    archive_config: Option<ArchiveReaderConfig>,
1175    store: S,
1176    peer_heights: Arc<RwLock<PeerHeights>>,
1177    metrics: Metrics,
1178) where
1179    S: WriteStore + Clone + Send + Sync + 'static,
1180{
1181    loop {
1182        sync_checkpoint_contents_from_archive_iteration(
1183            &network,
1184            &archive_config,
1185            store.clone(),
1186            peer_heights.clone(),
1187            metrics.clone(),
1188        )
1189        .await;
1190        tokio::time::sleep(Duration::from_secs(5)).await;
1191    }
1192}
1193
1194async fn sync_checkpoint_contents_from_archive_iteration<S>(
1195    network: &anemo::Network,
1196    archive_config: &Option<ArchiveReaderConfig>,
1197    store: S,
1198    peer_heights: Arc<RwLock<PeerHeights>>,
1199    metrics: Metrics,
1200) where
1201    S: WriteStore + Clone + Send + Sync + 'static,
1202{
1203    let peers: Vec<_> = peer_heights
1204        .read()
1205        .unwrap()
1206        .peers_on_same_chain()
1207        // Filter out any peers who we aren't connected with.
1208        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1209        .collect();
1210    let lowest_checkpoint_on_peers = peers
1211        .iter()
1212        .map(|(_p, state_sync_info)| state_sync_info.lowest)
1213        .min();
1214    let highest_synced = store
1215        .get_highest_synced_checkpoint()
1216        .expect("store operation should not fail")
1217        .sequence_number;
1218    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1219        highest_synced < lowest_checkpoint_on_peers
1220    } else {
1221        false
1222    };
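    // Hypothetical example: if our highest synced checkpoint is 900 but every connected peer
    // reports a lowest available checkpoint of 1_000, peers cannot serve contents for
    // checkpoints 901..=999, so we fall back to the archive for that gap (start = 901, end = 1_000).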
1223    debug!(
1224        "Syncing checkpoint contents from archive: {sync_from_archive},  highest_synced: {highest_synced},  lowest_checkpoint_on_peers: {}",
1225        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1226    );
1227    if sync_from_archive {
1228        let start = highest_synced
1229            .checked_add(1)
1230            .expect("Checkpoint seq num overflow");
1231        let end = lowest_checkpoint_on_peers.unwrap();
1232
1233        let Some(archive_config) = archive_config else {
1234            warn!("Failed to find an archive reader to complete the state sync request");
1235            return;
1236        };
1237        let Some(ingestion_url) = &archive_config.ingestion_url else {
1238            warn!("Archival ingestion url for state sync is not configured");
1239            return;
1240        };
1241        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1242            warn!("{} can't be used as an archival fallback", ingestion_url);
1243            return;
1244        }
1245        let reader_options = ReaderOptions {
1246            batch_size: archive_config.download_concurrency.into(),
1247            upper_limit: Some(end),
1248            ..Default::default()
1249        };
1250        let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
1251            StateSyncWorker(store, metrics),
1252            ingestion_url.clone(),
1253            archive_config.remote_store_options.clone(),
1254            start,
1255            1,
1256            Some(reader_options),
1257        )
1258        .await
1259        else {
1260            return;
1261        };
1262        match executor.await {
1263            Ok(_) => info!(
1264                "State sync from archive is complete. Checkpoints downloaded = {:?}",
1265                end - start
1266            ),
1267            Err(err) => warn!("State sync from archive failed with error: {:?}", err),
1268        }
1269    }
1270}
1271
1272async fn sync_checkpoint_contents<S>(
1273    network: anemo::Network,
1274    store: S,
1275    peer_heights: Arc<RwLock<PeerHeights>>,
1276    sender: mpsc::WeakSender<StateSyncMessage>,
1277    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1278    checkpoint_content_download_concurrency: usize,
1279    checkpoint_content_download_tx_concurrency: u64,
1280    use_get_checkpoint_contents_v2: bool,
1281    timeout: Duration,
1282    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1283) where
1284    S: WriteStore + Clone,
1285{
1286    let mut highest_synced = store
1287        .get_highest_synced_checkpoint()
1288        .expect("store operation should not fail");
1289
1290    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1291    let mut target_sequence_cursor = 0;
1292    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1293    let mut checkpoint_contents_tasks = FuturesOrdered::new();
1294
1295    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
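    // Transaction-count budget, with hypothetical numbers: given a budget of 100_000, starting a
    // checkpoint whose network_total_transactions is 40_000 ahead of the previously started one
    // consumes 40_000; that amount is returned to the budget once the checkpoint finishes syncing
    // and highest_synced advances past it.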
1296
1297    loop {
1298        tokio::select! {
1299            result = target_sequence_channel.changed() => {
1300                match result {
1301                    Ok(()) => {
1302                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1303                    }
1304                    Err(_) => {
1305                        // Watch channel is closed, exit loop.
1306                        return
1307                    }
1308                }
1309            },
1310            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1311                match maybe_checkpoint {
1312                    Ok(checkpoint) => {
1313                        let _: &VerifiedCheckpoint = &checkpoint;  // type hint
1314
1315                        store
1316                            .update_highest_synced_checkpoint(&checkpoint)
1317                            .expect("store operation should not fail");
1318                        // We don't care if no one is listening as this is a broadcast channel
1319                        let _ = checkpoint_event_sender.send(checkpoint.clone());
1320                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1321                        highest_synced = checkpoint;
1322
1323                    }
                    Err(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint; // type hint
                        let lowest_peer_checkpoint = peer_heights.read().ok().and_then(|heights| {
                            heights
                                .peers
                                .values()
                                .map(|state_sync_info| state_sync_info.lowest)
                                .min()
                        });
                        if let Some(lowest_peer_checkpoint) = lowest_peer_checkpoint {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!(
                                    "unable to sync contents of checkpoint {} through state sync; lowest peer checkpoint: {}",
                                    checkpoint.sequence_number(),
                                    lowest_peer_checkpoint
                                );
                            }
                        } else {
                            info!(
                                "unable to sync contents of checkpoint {} through state sync",
                                checkpoint.sequence_number()
                            );
                        }
                        // Retry contents sync on failure.
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            use_get_checkpoint_contents_v2,
                            timeout,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        // Start new tasks up to configured concurrency limits.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .unwrap_or_else(|| panic!(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
                    current_sequence
                ));

            // Enforce transaction count concurrency limit.
            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                use_get_checkpoint_contents_v2,
                timeout,
                next_checkpoint,
            ));
        }

        if highest_synced
            .sequence_number()
            .is_multiple_of(checkpoint_content_download_concurrency as u64)
            || checkpoint_contents_tasks.is_empty()
        {
            // Periodically notify the event loop so it can tell our peers that we've synced to a
            // new checkpoint height.
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}

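/// Downloads and persists the full contents of a single checkpoint.
///
/// Returns the checkpoint on success. On failure the function sleeps out the configured retry
/// interval (if it has not already elapsed) and hands the checkpoint back via `Err` so the caller
/// can requeue it.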
#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
async fn sync_one_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    use_get_checkpoint_contents_v2: bool,
    timeout: Duration,
    checkpoint: VerifiedCheckpoint,
) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
where
    S: WriteStore + Clone,
{
    debug!("syncing checkpoint contents");

    // Check whether this checkpoint has already been produced locally (e.g. via consensus).
    // If so, there is no need to fetch it from peers.
    if store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail")
        .sequence_number()
        >= checkpoint.sequence_number()
    {
        debug!("checkpoint was already created via consensus output");
        return Ok(checkpoint);
    }

    // Request checkpoint contents from peers.
    let peers = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Content,
    )
    .with_checkpoint(*checkpoint.sequence_number());
    let now = tokio::time::Instant::now();
    let Some(_contents) = get_full_checkpoint_contents(
        peers,
        &store,
        &checkpoint,
        use_get_checkpoint_contents_v2,
        timeout,
    )
    .await
    else {
        // Delay completion in case of error so we don't hammer the network with retries.
        let duration = peer_heights
            .read()
            .unwrap()
            .wait_interval_when_no_peer_to_sync_content();
        if now.elapsed() < duration {
            let duration = duration - now.elapsed();
            info!("retrying checkpoint sync after {:?}", duration);
            tokio::time::sleep(duration).await;
        }
        return Err(checkpoint);
    };
    debug!("completed checkpoint contents sync");
    Ok(checkpoint)
}

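/// Fetches the full contents for `checkpoint`, first from the local store and then from each of
/// the provided peers in turn. Contents received from a peer are digest-verified and persisted
/// before being returned; `None` means no peer could supply valid contents.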
#[instrument(level = "debug", skip_all)]
async fn get_full_checkpoint_contents<S>(
    peers: PeerBalancer,
    store: S,
    checkpoint: &VerifiedCheckpoint,
    use_get_checkpoint_contents_v2: bool,
    timeout: Duration,
) -> Option<VersionedFullCheckpointContents>
where
    S: WriteStore,
{
    let sequence_number = checkpoint.sequence_number;
    let digest = checkpoint.content_digest;
    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
        debug!("store already contains checkpoint contents");
        return Some(contents);
    }

    // Iterate through the selected peers, trying each one in turn, until we are able to
    // successfully fetch the target checkpoint contents.
    for mut peer in peers {
        debug!(
            ?timeout,
            "requesting checkpoint contents from {}",
            peer.inner().peer_id(),
        );
        let request = Request::new(digest).with_timeout(timeout);
        let contents = if use_get_checkpoint_contents_v2 {
            peer.get_checkpoint_contents_v2(request)
                .await
                .tap_err(|e| trace!("{e:?}"))
                .ok()
                .and_then(Response::into_inner)
        } else {
            peer.get_checkpoint_contents(request)
                .await
                .tap_err(|e| trace!("{e:?}"))
                .ok()
                .and_then(Response::into_inner)
                .map(VersionedFullCheckpointContents::V1)
        };
        if let Some(contents) = contents.tap_none(|| trace!("peer unable to help sync"))
            && contents.verify_digests(digest).is_ok()
        {
            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
            store
                .insert_checkpoint_contents(checkpoint, verified_contents)
                .expect("store operation should not fail");
            return Some(contents);
        }
    }
    debug!("no peers had checkpoint contents");
    None
}

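/// Exports the highest verified and highest synced checkpoint watermarks as metrics every five
/// seconds, until the oneshot `recv` resolves.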
async fn update_checkpoint_watermark_metrics<S>(
    mut recv: oneshot::Receiver<()>,
    store: S,
    metrics: Metrics,
) -> Result<()>
where
    S: WriteStore + Clone + Send + Sync,
{
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _now = interval.tick() => {
                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
                    .expect("store operation should not fail");
                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
                    .expect("store operation should not fail");
                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
            },
            _ = &mut recv => break,
        }
    }
    Ok(())
}