1use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54 collections::{HashMap, VecDeque},
55 sync::{Arc, RwLock},
56 time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60 committee::Committee,
61 digests::CheckpointDigest,
62 messages_checkpoint::{
63 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64 VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65 },
66 storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71 sync::{broadcast, mpsc, watch},
72 task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77 include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::StateSyncWorker;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90 state_sync_client::StateSyncClient,
91 state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
97use sui_storage::verify_checkpoint;
98
/// Clonable handle for interacting with the state-sync event loop.
#[derive(Clone, Debug)]
pub struct Handle {
    // Mailbox into the event loop.
    sender: mpsc::Sender<StateSyncMessage>,
    // Broadcasts each checkpoint once it has been synced and verified.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}
108
109impl Handle {
110 pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119 self.sender
120 .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121 .await
122 .unwrap()
123 }
124
125 pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127 self.checkpoint_event_sender.subscribe()
128 }
129}
130
131pub(super) fn compute_adaptive_timeout(
132 tx_count: u64,
133 min_timeout: Duration,
134 max_timeout: Duration,
135) -> Duration {
136 const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
137 const JITTER_FRACTION: f64 = 0.1;
138
139 let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
140 let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
141 let base = min_timeout + extra;
142
143 let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
144 let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
145 Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
146}
147
148#[cfg_attr(test, derive(Debug))]
149pub(super) struct PeerScore {
150 successes: VecDeque<(Instant, u64 , Duration)>,
151 failures: VecDeque<Instant>,
152 window: Duration,
153 failure_rate: f64,
154}
155
156impl PeerScore {
157 const MAX_SAMPLES: usize = 20;
158 const MIN_SAMPLES_FOR_FAILURE: usize = 10;
159
160 pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
161 Self {
162 successes: VecDeque::new(),
163 failures: VecDeque::new(),
164 window,
165 failure_rate,
166 }
167 }
168
169 pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
170 let now = Instant::now();
171 self.successes.push_back((now, size, response_time));
172 while self.successes.len() > Self::MAX_SAMPLES {
173 self.successes.pop_front();
174 }
175 }
176
177 pub(super) fn record_failure(&mut self) {
178 let now = Instant::now();
179 self.failures.push_back(now);
180 while self.failures.len() > Self::MAX_SAMPLES {
181 self.failures.pop_front();
182 }
183 }
184
185 pub(super) fn is_failing(&self) -> bool {
186 let now = Instant::now();
187 let recent_failures = self
188 .failures
189 .iter()
190 .filter(|ts| now.duration_since(**ts) < self.window)
191 .count();
192 let recent_successes = self
193 .successes
194 .iter()
195 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
196 .count();
197
198 let total = recent_failures + recent_successes;
199 if total < Self::MIN_SAMPLES_FOR_FAILURE {
200 return false;
201 }
202
203 let rate = recent_failures as f64 / total as f64;
204 rate >= self.failure_rate
205 }
206
207 pub(super) fn effective_throughput(&self) -> Option<f64> {
208 let now = Instant::now();
209 let (total_size, total_time) = self
210 .successes
211 .iter()
212 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
213 .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
214 (size + s, time + *d)
215 });
216
217 if total_size == 0 {
218 return None;
219 }
220
221 if total_time.is_zero() {
222 return None;
223 }
224
225 Some(total_size as f64 / total_time.as_secs_f64())
226 }
227}
228
/// Aggregated view of every known peer's sync state, plus checkpoints received
/// from peers that have not yet been processed, and per-peer performance scores.
struct PeerHeights {
    /// Per-peer sync info, keyed by peer id.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    /// Checkpoints received from peers, awaiting processing.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    /// Index from sequence number to the digest stored in `unprocessed_checkpoints`.
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
    /// Rolling success/failure/throughput scores per peer.
    scores: HashMap<PeerId, PeerScore>,

    /// How long content sync sleeps when no peer can serve the needed data.
    wait_interval_when_no_peer_to_sync_content: Duration,
    /// Sliding window used when constructing `PeerScore`s.
    peer_scoring_window: Duration,
    /// Failure-rate threshold used when constructing `PeerScore`s.
    peer_failure_rate: f64,
    checkpoint_content_timeout_min: Duration,
    checkpoint_content_timeout_max: Duration,
    /// Probability that the balancer tries an unmeasured peer first.
    exploration_probability: f64,
}
243
/// What we know about a single peer's chain identity and checkpoint range.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// Digest of the peer's genesis checkpoint.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Whether the peer's genesis digest matches ours.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number the peer reports having.
    height: CheckpointSequenceNumber,
    /// Low watermark: lowest checkpoint the peer reports as available
    /// (used for content-request eligibility in `PeerBalancer::is_eligible`).
    lowest: CheckpointSequenceNumber,
}
256
257impl PeerHeights {
258 pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
259 self.peers
260 .values()
261 .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
262 .max()
263 }
264
265 pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
266 self.peers
267 .iter()
268 .filter(|(_peer_id, info)| info.on_same_chain_as_us)
269 }
270
271 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
276 pub fn update_peer_info(
277 &mut self,
278 peer_id: PeerId,
279 checkpoint: Checkpoint,
280 low_watermark: Option<CheckpointSequenceNumber>,
281 ) -> bool {
282 debug!("Update peer info");
283
284 let info = match self.peers.get_mut(&peer_id) {
285 Some(info) if info.on_same_chain_as_us => info,
286 _ => return false,
287 };
288
289 info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
290 if let Some(low_watermark) = low_watermark {
291 info.lowest = low_watermark;
292 }
293 self.insert_checkpoint(checkpoint);
294
295 true
296 }
297
298 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
299 pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
300 use std::collections::hash_map::Entry;
301 debug!("Insert peer info");
302
303 match self.peers.entry(peer_id) {
304 Entry::Occupied(mut entry) => {
305 let entry = entry.get_mut();
308 if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
309 entry.height = std::cmp::max(entry.height, info.height);
310 } else {
311 *entry = info;
312 }
313 }
314 Entry::Vacant(entry) => {
315 entry.insert(info);
316 }
317 }
318 }
319
320 pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
321 if let Some(info) = self.peers.get_mut(&peer_id) {
322 info.on_same_chain_as_us = false;
323 }
324 }
325
326 pub fn update_peer_height(
329 &mut self,
330 peer_id: PeerId,
331 height: CheckpointSequenceNumber,
332 low_watermark: Option<CheckpointSequenceNumber>,
333 ) -> bool {
334 let info = match self.peers.get_mut(&peer_id) {
335 Some(info) if info.on_same_chain_as_us => info,
336 _ => return false,
337 };
338
339 info.height = std::cmp::max(height, info.height);
340 if let Some(low_watermark) = low_watermark {
341 info.lowest = low_watermark;
342 }
343
344 true
345 }
346
347 pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
348 self.unprocessed_checkpoints
349 .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
350 self.sequence_number_to_digest
351 .retain(|&s, _digest| s > sequence_number);
352 }
353
354 pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
359 let digest = *checkpoint.digest();
360 let sequence_number = *checkpoint.sequence_number();
361
362 if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
364 if *existing_digest == digest {
365 return;
367 }
368 tracing::info!(
369 ?sequence_number,
370 ?existing_digest,
371 ?digest,
372 "received checkpoint with same sequence number but different digest, dropping new checkpoint"
373 );
374 return;
375 }
376
377 self.unprocessed_checkpoints.insert(digest, checkpoint);
378 self.sequence_number_to_digest
379 .insert(sequence_number, digest);
380 }
381
382 pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
383 if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
384 self.sequence_number_to_digest
385 .remove(checkpoint.sequence_number());
386 }
387 }
388
389 pub fn get_checkpoint_by_sequence_number(
390 &self,
391 sequence_number: CheckpointSequenceNumber,
392 ) -> Option<&Checkpoint> {
393 self.sequence_number_to_digest
394 .get(&sequence_number)
395 .and_then(|digest| self.get_checkpoint_by_digest(digest))
396 }
397
398 pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
399 self.unprocessed_checkpoints.get(digest)
400 }
401
402 #[cfg(test)]
403 pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
404 self.wait_interval_when_no_peer_to_sync_content = duration;
405 }
406
407 pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
408 self.wait_interval_when_no_peer_to_sync_content
409 }
410
411 pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
412 self.scores
413 .entry(peer_id)
414 .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
415 .record_success(size, response_time);
416 }
417
418 pub fn record_failure(&mut self, peer_id: PeerId) {
419 self.scores
420 .entry(peer_id)
421 .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
422 .record_failure();
423 }
424
425 pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
426 self.scores
427 .get(peer_id)
428 .and_then(|s| s.effective_throughput())
429 }
430
431 pub fn is_failing(&self, peer_id: &PeerId) -> bool {
432 self.scores
433 .get(peer_id)
434 .map(|s| s.is_failing())
435 .unwrap_or(false)
436 }
437}
438
/// Iterator over candidate peers for a checkpoint request, bucketed by how
/// much we know about each peer (see the `Iterator` impl for selection order).
#[derive(Clone)]
struct PeerBalancer {
    /// Peers with measured throughput (third element: bytes/sec).
    known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Peers without throughput history (third element: connection RTT in seconds).
    unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    /// Peers currently scored as failing; tried only as a last resort.
    failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
    /// Checkpoint the request targets; peers are filtered for eligibility.
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    request_type: PeerCheckpointRequestType,
    /// Probability of trying an unmeasured peer ahead of a measured one.
    exploration_probability: f64,
}
452
/// Kind of checkpoint data being requested; content requests additionally
/// require the peer's low watermark to cover the target checkpoint
/// (see `PeerBalancer::is_eligible`).
#[derive(Clone)]
enum PeerCheckpointRequestType {
    Summary,
    Content,
}
458
459impl PeerBalancer {
460 pub fn new(
461 network: &anemo::Network,
462 peer_heights: Arc<RwLock<PeerHeights>>,
463 request_type: PeerCheckpointRequestType,
464 ) -> Self {
465 let peer_heights_guard = peer_heights.read().unwrap();
466
467 let mut known_peers = Vec::new();
468 let mut unknown_peers = Vec::new();
469 let mut failing_peers = Vec::new();
470 let exploration_probability = peer_heights_guard.exploration_probability;
471
472 for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
473 let Some(peer) = network.peer(*peer_id) else {
474 continue;
475 };
476 let rtt_secs = peer.connection_rtt().as_secs_f64();
477
478 if peer_heights_guard.is_failing(peer_id) {
479 failing_peers.push((peer, *info));
480 } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
481 known_peers.push((peer, *info, throughput));
482 } else {
483 unknown_peers.push((peer, *info, rtt_secs));
484 }
485 }
486 drop(peer_heights_guard);
487
488 unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
489 rtt_a
490 .partial_cmp(rtt_b)
491 .unwrap_or(std::cmp::Ordering::Equal)
492 });
493
494 Self {
495 known_peers,
496 unknown_peers,
497 failing_peers,
498 requested_checkpoint: None,
499 request_type,
500 exploration_probability,
501 }
502 }
503
504 pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
505 self.requested_checkpoint = Some(checkpoint);
506 self
507 }
508
509 fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
510 let requested = self.requested_checkpoint.unwrap_or(0);
511 match &self.request_type {
512 PeerCheckpointRequestType::Summary => info.height >= requested,
513 PeerCheckpointRequestType::Content => {
514 info.height >= requested && info.lowest <= requested
515 }
516 }
517 }
518
519 fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
520 let eligible: Vec<_> = self
521 .known_peers
522 .iter()
523 .enumerate()
524 .filter(|(_, (_, info, _))| self.is_eligible(info))
525 .map(|(i, (_, _, t))| (i, *t))
526 .collect();
527
528 if eligible.is_empty() {
529 return None;
530 }
531
532 let total: f64 = eligible.iter().map(|(_, t)| t).sum();
533 let mut pick = rand::thread_rng().gen_range(0.0..total);
534
535 for (idx, throughput) in &eligible {
536 pick -= throughput;
537 if pick <= 0.0 {
538 let (peer, info, _) = self.known_peers.remove(*idx);
539 return Some((peer, info));
540 }
541 }
542
543 let (idx, _) = eligible.last().unwrap();
544 let (peer, info, _) = self.known_peers.remove(*idx);
545 Some((peer, info))
546 }
547
548 fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
549 let pos = self
550 .unknown_peers
551 .iter()
552 .position(|(_, info, _)| self.is_eligible(info))?;
553 let (peer, info, _) = self.unknown_peers.remove(pos);
554 Some((peer, info))
555 }
556
557 fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
558 let pos = self
559 .failing_peers
560 .iter()
561 .position(|(_, info)| self.is_eligible(info))?;
562 Some(self.failing_peers.remove(pos))
563 }
564}
565
566impl Iterator for PeerBalancer {
567 type Item = StateSyncClient<anemo::Peer>;
568
569 fn next(&mut self) -> Option<Self::Item> {
570 let has_eligible_known = self
571 .known_peers
572 .iter()
573 .any(|(_, info, _)| self.is_eligible(info));
574 let has_eligible_unknown = self
575 .unknown_peers
576 .iter()
577 .any(|(_, info, _)| self.is_eligible(info));
578
579 let explore = has_eligible_unknown
580 && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
581
582 if explore && let Some((peer, _)) = self.select_by_rtt() {
583 return Some(StateSyncClient::new(peer));
584 }
585
586 if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
587 return Some(StateSyncClient::new(peer));
588 }
589
590 if let Some((peer, _)) = self.select_failing() {
591 return Some(StateSyncClient::new(peer));
592 }
593
594 None
595 }
596}
597
#[derive(Clone, Debug)]
enum StateSyncMessage {
    /// Ask the event loop to (re)start a checkpoint summary sync job if needed.
    StartSyncJob,
    /// A checkpoint certified by consensus on this node (boxed to keep the
    /// enum small).
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    /// A checkpoint whose contents finished syncing; peers get notified.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
609
/// Core state-sync event loop: multiplexes mailbox messages, peer events, and
/// a periodic tick, and owns the background sync tasks.
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    /// Inbound messages from `Handle`s and internal tasks.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak self-sender handed to spawned tasks so they can message the loop
    /// without keeping it alive.
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// All spawned child tasks; completions are observed in the select loop.
    tasks: JoinSet<()>,
    /// Handle to the currently running summary sync job, if any.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Handle to the long-lived contents sync task.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcasts each checkpoint once fully synced.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    /// Handle to the long-lived archive-based sync task.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    archive_config: Option<ArchiveReaderConfig>,
}
631
impl<S> StateSyncEventLoop<S>
where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    /// Runs the event loop until the mailbox closes. Spawns the long-lived
    /// contents and archive sync tasks up front, then multiplexes timer
    /// ticks, mailbox messages, peer events, and child-task completions.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        // Sorted so pinned checkpoints can be searched efficiently later.
        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            // Query peers that were already connected before we subscribed.
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // `receiver` resolves when `_sender` is dropped at the end of this
        // function, letting the metrics task terminate with the loop.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Long-lived task that downloads checkpoint contents up to the target
        // published on the watch channel.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.use_get_checkpoint_contents_v2(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Long-lived task that falls back to an archive for old contents.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // `None` means all senders are gone: shut down the loop.
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            // Cancellation is expected; propagate panics so
                            // task bugs fail loudly.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The two long-lived tasks must never exit on their own.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // The summary sync job is transient; clear its handle so a
                    // new job can be started.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            // After every wakeup, re-evaluate whether new sync work can start.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }

    /// Dispatches a mailbox message to its handler.
    fn handle_message(&mut self, message: StateSyncMessage) {
        debug!("Received message: {:?}", message);
        match message {
            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
                self.handle_checkpoint_from_consensus(checkpoint)
            }
            // Contents finished syncing; advertise the checkpoint to peers.
            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
                self.spawn_notify_peers_of_checkpoint(*checkpoint)
            }
        }
    }

    /// Ingests a checkpoint certified by this node's consensus: checks chain
    /// linkage, persists the next committee at epoch boundaries, advances the
    /// verified/synced watermarks, and notifies subscribers and peers.
    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        // A checkpoint from consensus must extend a chain we already have.
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!(
                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
                checkpoint.sequence_number(),
                Some(prev_digest),
                checkpoint.previous_digest
            );
        }

        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        // Already at or beyond this checkpoint; nothing to do.
        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }

        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }

        // Debug builds only: sanity-check that every checkpoint in the gap,
        // and its contents, is already present in the store.
        #[cfg(debug_assertions)]
        {
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }

        // At an epoch boundary, persist the committee for the next epoch.
        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }

        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        // A send error only means there are no subscribers right now.
        let _ = self.checkpoint_event_sender.send(checkpoint.clone());

        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }

    /// Reacts to peers connecting and disconnecting.
    fn handle_peer_event(
        &mut self,
        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
    ) {
        use tokio::sync::broadcast::error::RecvError;

        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                self.spawn_get_latest_from_peer(peer_id);
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                self.peer_heights.write().unwrap().peers.remove(&peer_id);
            }

            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }

            Err(RecvError::Lagged(_)) => {
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    /// Spawns a task that queries `peer_id` for its latest sync state.
    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
        if let Some(peer) = self.network.peer(peer_id) {
            let genesis_checkpoint_digest = *self
                .store
                .get_checkpoint_by_sequence_number(0)
                .expect("store should contain genesis checkpoint")
                .digest();
            let task = get_latest_from_peer(
                genesis_checkpoint_digest,
                peer,
                self.peer_heights.clone(),
                self.config.timeout(),
            );
            self.tasks.spawn(task);
        }
    }

    /// Periodic maintenance: poll all peers for their latest checkpoint and
    /// prune the download-limit tracking map.
    fn handle_tick(&mut self, _now: std::time::Instant) {
        let task = query_peers_for_their_latest_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.config.timeout(),
        );
        self.tasks.spawn(task);

        if let Some(layer) = self.download_limit_layer.as_ref() {
            layer.maybe_prune_map();
        }
    }

    /// Starts a summary sync job if none is running and some peer reports a
    /// higher checkpoint than our highest verified one. Jobs are capped at
    /// `max_checkpoint_sync_batch_size`; a capped job re-queues itself by
    /// sending `StartSyncJob` when it completes.
    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
        // Only run one sync job at a time.
        if self.sync_checkpoint_summaries_task.is_some() {
            return;
        }

        let highest_processed_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        let highest_known_sequence_number = self
            .peer_heights
            .read()
            .unwrap()
            .highest_known_checkpoint_sequence_number();

        if let Some(target_seq) = highest_known_sequence_number
            && *highest_processed_checkpoint.sequence_number() < target_seq
        {
            // Cap the batch so one job cannot run unboundedly long.
            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
            let limited_target = std::cmp::min(
                target_seq,
                highest_processed_checkpoint
                    .sequence_number()
                    .saturating_add(max_batch_size),
            );
            let was_limited = limited_target < target_seq;

            let weak_sender = self.weak_sender.clone();
            let task = sync_to_checkpoint(
                self.network.clone(),
                self.store.clone(),
                self.peer_heights.clone(),
                self.metrics.clone(),
                self.config.pinned_checkpoints.clone(),
                self.config.checkpoint_header_download_concurrency(),
                self.config.timeout(),
                limited_target,
            )
            .map(move |result| match result {
                Ok(()) => {
                    // Stopped short of the target: immediately queue the next
                    // batch (best-effort; drop if the loop is gone).
                    if was_limited && let Some(sender) = weak_sender.upgrade() {
                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
                    }
                }
                Err(e) => {
                    debug!("error syncing checkpoint {e}");
                }
            });
            let task_handle = self.tasks.spawn(task);
            self.sync_checkpoint_summaries_task = Some(task_handle);
        }
    }

    /// Publishes a new contents-sync target when verification has outpaced
    /// content download and some peer claims a checkpoint beyond our synced
    /// watermark.
    fn maybe_trigger_checkpoint_contents_sync_task(
        &mut self,
        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
    ) {
        let highest_verified_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        let highest_synced_checkpoint = self
            .store
            .get_highest_synced_checkpoint()
            .expect("store operation should not fail");

        if highest_verified_checkpoint.sequence_number()
            > highest_synced_checkpoint.sequence_number()
            && self
                .peer_heights
                .read()
                .unwrap()
                .highest_known_checkpoint_sequence_number()
                > Some(*highest_synced_checkpoint.sequence_number())
        {
            // Only wake the contents task when the target actually changes.
            let _ = target_sequence_channel.send_if_modified(|num| {
                let new_num = *highest_verified_checkpoint.sequence_number();
                if *num == new_num {
                    return false;
                }
                *num = new_num;
                true
            });
        }
    }

    /// Spawns a task advertising `checkpoint` to peers that are behind it.
    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
        let task = notify_peers_of_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            checkpoint,
            self.config.timeout(),
        );
        self.tasks.spawn(task);
    }
}
1017
1018async fn notify_peers_of_checkpoint(
1019 network: anemo::Network,
1020 peer_heights: Arc<RwLock<PeerHeights>>,
1021 checkpoint: VerifiedCheckpoint,
1022 timeout: Duration,
1023) {
1024 let futs = peer_heights
1025 .read()
1026 .unwrap()
1027 .peers_on_same_chain()
1028 .filter_map(|(peer_id, info)| {
1030 (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1031 })
1032 .flat_map(|peer_id| network.peer(*peer_id))
1034 .map(StateSyncClient::new)
1035 .map(|mut client| {
1036 let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1037 async move { client.push_checkpoint_summary(request).await }
1038 })
1039 .collect::<Vec<_>>();
1040 futures::future::join_all(futs).await;
1041}
1042
/// Fetches a peer's sync state and records it in `peer_heights`. On first
/// contact the peer's checkpoint 0 (genesis) is queried to decide whether it
/// is on the same chain as us; known peers skip straight to the latest-info
/// query.
async fn get_latest_from_peer(
    our_genesis_checkpoint_digest: CheckpointDigest,
    peer: anemo::Peer,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
) {
    let peer_id = peer.peer_id();
    let mut client = StateSyncClient::new(peer);

    let info = {
        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();

        if let Some(info) = maybe_info {
            info
        } else {
            // First time seeing this peer: fetch its genesis checkpoint to
            // determine chain identity.
            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
                .with_timeout(timeout);
            let response = client
                .get_checkpoint_summary(request)
                .await
                .map(Response::into_inner);

            let info = match response {
                Ok(Some(checkpoint)) => {
                    let digest = *checkpoint.digest();
                    PeerStateSyncInfo {
                        genesis_checkpoint_digest: digest,
                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
                        height: *checkpoint.sequence_number(),
                        lowest: CheckpointSequenceNumber::default(),
                    }
                }
                // Peer returned no genesis checkpoint; record it as not being
                // on our chain.
                Ok(None) => PeerStateSyncInfo {
                    genesis_checkpoint_digest: CheckpointDigest::default(),
                    on_same_chain_as_us: false,
                    height: CheckpointSequenceNumber::default(),
                    lowest: CheckpointSequenceNumber::default(),
                },
                Err(status) => {
                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
                    return;
                }
            };
            peer_heights
                .write()
                .unwrap()
                .insert_peer_info(peer_id, info);
            info
        }
    };

    // No point querying peers on a different chain.
    if !info.on_same_chain_as_us {
        trace!(?info, "Peer {peer_id} not on same chain as us");
        return;
    }
    let Some((highest_checkpoint, low_watermark)) =
        query_peer_for_latest_info(&mut client, timeout).await
    else {
        return;
    };
    peer_heights
        .write()
        .unwrap()
        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
}
1113
1114async fn query_peer_for_latest_info(
1116 client: &mut StateSyncClient<anemo::Peer>,
1117 timeout: Duration,
1118) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
1119 let request = Request::new(()).with_timeout(timeout);
1120 let response = client
1121 .get_checkpoint_availability(request)
1122 .await
1123 .map(Response::into_inner);
1124 match response {
1125 Ok(GetCheckpointAvailabilityResponse {
1126 highest_synced_checkpoint,
1127 lowest_available_checkpoint,
1128 }) => {
1129 return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
1130 }
1131 Err(status) => {
1132 if status.status() != anemo::types::response::StatusCode::NotFound {
1134 trace!("get_checkpoint_availability request failed: {status:?}");
1135 return None;
1136 }
1137 }
1138 };
1139
1140 let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1143 let response = client
1144 .get_checkpoint_summary(request)
1145 .await
1146 .map(Response::into_inner);
1147 match response {
1148 Ok(Some(checkpoint)) => Some((checkpoint, None)),
1149 Ok(None) => None,
1150 Err(status) => {
1151 trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1152 None
1153 }
1154 }
1155}
1156
1157#[instrument(level = "debug", skip_all)]
1158async fn query_peers_for_their_latest_checkpoint(
1159 network: anemo::Network,
1160 peer_heights: Arc<RwLock<PeerHeights>>,
1161 sender: mpsc::WeakSender<StateSyncMessage>,
1162 timeout: Duration,
1163) {
1164 let peer_heights = &peer_heights;
1165 let futs = peer_heights
1166 .read()
1167 .unwrap()
1168 .peers_on_same_chain()
1169 .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1171 .map(|peer| {
1172 let peer_id = peer.peer_id();
1173 let mut client = StateSyncClient::new(peer);
1174
1175 async move {
1176 let response = query_peer_for_latest_info(&mut client, timeout).await;
1177 match response {
1178 Some((highest_checkpoint, low_watermark)) => peer_heights
1179 .write()
1180 .unwrap()
1181 .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1182 .then_some(highest_checkpoint),
1183 None => None,
1184 }
1185 }
1186 })
1187 .collect::<Vec<_>>();
1188
1189 debug!("Query {} peers for latest checkpoint", futs.len());
1190
1191 let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1192
1193 let highest_checkpoint_seq = checkpoints
1194 .map(|checkpoint| *checkpoint.sequence_number())
1195 .max();
1196
1197 let our_highest_seq = peer_heights
1198 .read()
1199 .unwrap()
1200 .highest_known_checkpoint_sequence_number();
1201
1202 debug!(
1203 "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1204 );
1205
1206 let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1207 (Some(theirs), None) => theirs,
1208 (Some(theirs), Some(ours)) if theirs > ours => theirs,
1209 _ => return,
1210 };
1211
1212 if let Some(sender) = sender.upgrade() {
1213 let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1214 }
1215}
1216
1217async fn sync_to_checkpoint<S>(
1218 network: anemo::Network,
1219 store: S,
1220 peer_heights: Arc<RwLock<PeerHeights>>,
1221 metrics: Metrics,
1222 pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1223 checkpoint_header_download_concurrency: usize,
1224 timeout: Duration,
1225 target_sequence_number: CheckpointSequenceNumber,
1226) -> Result<()>
1227where
1228 S: WriteStore,
1229{
1230 metrics.set_highest_known_checkpoint(target_sequence_number);
1231
1232 let mut current = store
1233 .get_highest_verified_checkpoint()
1234 .expect("store operation should not fail");
1235 if *current.sequence_number() >= target_sequence_number {
1236 return Err(anyhow::anyhow!(
1237 "target checkpoint {} is older than highest verified checkpoint {}",
1238 target_sequence_number,
1239 current.sequence_number(),
1240 ));
1241 }
1242
1243 let peer_balancer = PeerBalancer::new(
1244 &network,
1245 peer_heights.clone(),
1246 PeerCheckpointRequestType::Summary,
1247 );
1248 let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1250 ..=target_sequence_number)
1251 .map(|next| {
1252 let peers = peer_balancer.clone().with_checkpoint(next);
1253 let peer_heights = peer_heights.clone();
1254 let pinned_checkpoints = &pinned_checkpoints;
1255 async move {
1256 if let Some(checkpoint) = peer_heights
1257 .read()
1258 .unwrap()
1259 .get_checkpoint_by_sequence_number(next)
1260 {
1261 return (Some(checkpoint.to_owned()), next, None);
1262 }
1263
1264 for mut peer in peers {
1267 let peer_id = peer.inner().peer_id();
1268 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1269 .with_timeout(timeout);
1270 let start = Instant::now();
1271 let result = peer.get_checkpoint_summary(request).await;
1272 let elapsed = start.elapsed();
1273
1274 let checkpoint = match result {
1275 Ok(response) => match response.into_inner() {
1276 Some(cp) => Some(cp),
1277 None => {
1278 trace!("peer unable to help sync");
1279 peer_heights.write().unwrap().record_failure(peer_id);
1280 None
1281 }
1282 },
1283 Err(e) => {
1284 trace!("{e:?}");
1285 peer_heights.write().unwrap().record_failure(peer_id);
1286 None
1287 }
1288 };
1289
1290 let Some(checkpoint) = checkpoint else {
1291 continue;
1292 };
1293
1294 let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
1295 peer_heights.write().unwrap().record_success(peer_id, size, elapsed);
1296
1297 if *checkpoint.sequence_number() != next {
1299 tracing::debug!(
1300 "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1301 checkpoint.sequence_number()
1302 );
1303 peer_heights
1304 .write()
1305 .unwrap()
1306 .mark_peer_as_not_on_same_chain(peer_id);
1307 continue;
1308 }
1309
1310 let checkpoint_digest = checkpoint.digest();
1312 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1313 checkpoint.sequence_number(),
1314 |(seq_num, _digest)| *seq_num
1315 )
1316 && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
1317 {
1318 tracing::debug!(
1319 "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1320 pinned_checkpoints[pinned_digest_index].1,
1321 checkpoint_digest
1322 );
1323 peer_heights
1324 .write()
1325 .unwrap()
1326 .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
1327 continue;
1328 }
1329
1330 peer_heights
1332 .write()
1333 .unwrap()
1334 .insert_checkpoint(checkpoint.clone());
1335 return (Some(checkpoint), next, Some(peer_id));
1336 }
1337 (None, next, None)
1338 }
1339 })
1340 .pipe(futures::stream::iter)
1341 .buffered(checkpoint_header_download_concurrency);
1342
1343 while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1344 assert_eq!(
1345 current
1346 .sequence_number()
1347 .checked_add(1)
1348 .expect("exhausted u64"),
1349 next
1350 );
1351
1352 let checkpoint = 'cp: {
1354 let checkpoint = maybe_checkpoint.ok_or_else(|| {
1355 anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1356 })?;
1357 if pinned_checkpoints
1359 .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1360 .is_ok()
1361 {
1362 break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1363 }
1364 match verify_checkpoint(¤t, &store, checkpoint) {
1365 Ok(verified_checkpoint) => verified_checkpoint,
1366 Err(checkpoint) => {
1367 let mut peer_heights = peer_heights.write().unwrap();
1368 peer_heights.remove_checkpoint(checkpoint.digest());
1371
1372 if let Some(peer_id) = maybe_peer_id {
1374 peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1375 }
1376
1377 return Err(anyhow::anyhow!(
1378 "unable to verify checkpoint {checkpoint:?}"
1379 ));
1380 }
1381 }
1382 };
1383
1384 debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1385 if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1386 metrics.checkpoint_summary_age_metrics()
1387 {
1388 checkpoint.report_checkpoint_age(
1389 checkpoint_summary_age_metric,
1390 checkpoint_summary_age_metric_deprecated,
1391 );
1392 }
1393
1394 store
1397 .insert_checkpoint(&checkpoint)
1398 .expect("store operation should not fail");
1399
1400 current = checkpoint;
1401 }
1402
1403 peer_heights
1404 .write()
1405 .unwrap()
1406 .cleanup_old_checkpoints(*current.sequence_number());
1407
1408 Ok(())
1409}
1410
1411async fn sync_checkpoint_contents_from_archive<S>(
1412 network: anemo::Network,
1413 archive_config: Option<ArchiveReaderConfig>,
1414 store: S,
1415 peer_heights: Arc<RwLock<PeerHeights>>,
1416 metrics: Metrics,
1417) where
1418 S: WriteStore + Clone + Send + Sync + 'static,
1419{
1420 loop {
1421 sync_checkpoint_contents_from_archive_iteration(
1422 &network,
1423 &archive_config,
1424 store.clone(),
1425 peer_heights.clone(),
1426 metrics.clone(),
1427 )
1428 .await;
1429 tokio::time::sleep(Duration::from_secs(5)).await;
1430 }
1431}
1432
/// One archive catch-up pass: if every connected same-chain peer has pruned
/// past our highest synced checkpoint, download the gap
/// (`highest_synced + 1 ..= lowest peer watermark`) from the configured
/// archive ingestion endpoint instead of from peers.
async fn sync_checkpoint_contents_from_archive_iteration<S>(
    network: &anemo::Network,
    archive_config: &Option<ArchiveReaderConfig>,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
) where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    // Snapshot the live same-chain peers together with their sync info.
    let peers: Vec<_> = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
        .collect();
    // Oldest checkpoint any peer can still serve; None if we have no peers.
    let lowest_checkpoint_on_peers = peers
        .iter()
        .map(|(_p, state_sync_info)| state_sync_info.lowest)
        .min();
    let highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail")
        .sequence_number;
    // Only fall back to the archive when no peer can serve our next needed
    // checkpoint.
    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
        highest_synced < lowest_checkpoint_on_peers
    } else {
        false
    };
    debug!(
        "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
    );
    if sync_from_archive {
        let start = highest_synced
            .checked_add(1)
            .expect("Checkpoint seq num overflow");
        // Safe: `sync_from_archive` is only true when this is Some.
        let end = lowest_checkpoint_on_peers.unwrap();

        let Some(archive_config) = archive_config else {
            warn!("Failed to find an archive reader to complete the state sync request");
            return;
        };
        let Some(ingestion_url) = &archive_config.ingestion_url else {
            warn!("Archival ingestion url for state sync is not configured");
            return;
        };
        // This endpoint is explicitly disallowed as an archival fallback.
        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
            warn!("{} can't be used as an archival fallback", ingestion_url);
            return;
        }
        let reader_options = ReaderOptions {
            batch_size: archive_config.download_concurrency.into(),
            upper_limit: Some(end),
            ..Default::default()
        };
        // Drive the ingestion workflow; StateSyncWorker writes downloaded
        // checkpoints into `store` and updates `metrics`.
        let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
            StateSyncWorker(store, metrics),
            ingestion_url.clone(),
            archive_config.remote_store_options.clone(),
            start,
            1,
            Some(reader_options),
        )
        .await
        else {
            return;
        };
        match executor.await {
            Ok(_) => info!(
                "State sync from archive is complete. Checkpoints downloaded = {:?}",
                end - start
            ),
            Err(err) => warn!("State sync from archive failed with error: {:?}", err),
        }
    }
}
1510
/// Long-running driver that downloads checkpoint *contents* for already
/// verified summaries, in sequence order, and broadcasts each fully synced
/// checkpoint to subscribers.
///
/// Concurrency is bounded two ways: at most
/// `checkpoint_content_download_concurrency` checkpoints in flight, and at
/// most `checkpoint_content_download_tx_concurrency` transactions in flight.
/// Exits when `target_sequence_channel` closes.
async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    use_get_checkpoint_contents_v2: bool,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");

    // Next sequence number to schedule for download.
    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    // One past the highest sequence number we have been asked to sync;
    // nothing is scheduled until the watch channel delivers a target.
    let mut target_sequence_cursor = 0;
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    // FuturesOrdered: completions are observed in scheduling (sequence) order.
    let mut checkpoint_contents_tasks = FuturesOrdered::new();

    // Remaining budget of transactions allowed in flight at once.
    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;

    loop {
        tokio::select! {
            result = target_sequence_channel.changed() => {
                match result {
                    Ok(()) => {
                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
                    }
                    Err(_) => {
                        // Sender dropped: state sync is shutting down.
                        return
                    }
                }
            },
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        // Type ascription: assert the task yielded a VerifiedCheckpoint.
                        let _: &VerifiedCheckpoint = &checkpoint; store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        // Ignore send errors: there may be no subscribers.
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        // Return this checkpoint's transactions to the budget.
                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;

                    }
                    Err(checkpoint) => {
                        // Sync failed; log whether any peer should have been able to serve it.
                        let _: &VerifiedCheckpoint = &checkpoint; if let Some(lowest_peer_checkpoint) =
                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
                            }
                        } else {
                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());

                        }
                        // Recompute this checkpoint's transaction count for the
                        // retry (delta from the previous checkpoint's running total).
                        let retry_tx_count = if *checkpoint.sequence_number() == 0 {
                            checkpoint.network_total_transactions
                        } else {
                            let prev = store
                                .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
                                .expect("previous checkpoint must exist")
                                .network_total_transactions;
                            checkpoint.network_total_transactions - prev
                        };
                        // Retry at the front so in-order completion is preserved.
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            use_get_checkpoint_contents_v2,
                            retry_tx_count,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        // Schedule more downloads while behind the target and both concurrency
        // budgets allow it.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .unwrap_or_else(|| panic!(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
                    current_sequence
                ));

            // Transactions this checkpoint adds over what is already started.
            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                use_get_checkpoint_contents_v2,
                tx_count,
                next_checkpoint,
            ));
        }

        // Periodically (and whenever the pipeline drains) notify the event
        // loop of our highest synced checkpoint.
        if highest_synced
            .sequence_number()
            .is_multiple_of(checkpoint_content_download_concurrency as u64)
            || checkpoint_contents_tasks.is_empty()
        {
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}
1641
1642#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1643async fn sync_one_checkpoint_contents<S>(
1644 network: anemo::Network,
1645 store: S,
1646 peer_heights: Arc<RwLock<PeerHeights>>,
1647 use_get_checkpoint_contents_v2: bool,
1648 tx_count: u64,
1649 checkpoint: VerifiedCheckpoint,
1650) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1651where
1652 S: WriteStore + Clone,
1653{
1654 let (timeout_min, timeout_max) = {
1655 let ph = peer_heights.read().unwrap();
1656 (
1657 ph.checkpoint_content_timeout_min,
1658 ph.checkpoint_content_timeout_max,
1659 )
1660 };
1661 let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1662 debug!(
1663 "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1664 timeout, tx_count
1665 );
1666
1667 if store
1670 .get_highest_synced_checkpoint()
1671 .expect("store operation should not fail")
1672 .sequence_number()
1673 >= checkpoint.sequence_number()
1674 {
1675 debug!("checkpoint was already created via consensus output");
1676 return Ok(checkpoint);
1677 }
1678
1679 let peers = PeerBalancer::new(
1681 &network,
1682 peer_heights.clone(),
1683 PeerCheckpointRequestType::Content,
1684 )
1685 .with_checkpoint(*checkpoint.sequence_number());
1686 let now = tokio::time::Instant::now();
1687 let Some(_contents) = get_full_checkpoint_contents(
1688 peers,
1689 &store,
1690 peer_heights.clone(),
1691 &checkpoint,
1692 use_get_checkpoint_contents_v2,
1693 timeout,
1694 )
1695 .await
1696 else {
1697 let duration = peer_heights
1699 .read()
1700 .unwrap()
1701 .wait_interval_when_no_peer_to_sync_content();
1702 if now.elapsed() < duration {
1703 let duration = duration - now.elapsed();
1704 info!("retrying checkpoint sync after {:?}", duration);
1705 tokio::time::sleep(duration).await;
1706 }
1707 return Err(checkpoint);
1708 };
1709 debug!("completed checkpoint contents sync");
1710 Ok(checkpoint)
1711}
1712
/// Fetch the full contents of `checkpoint`: check the local store first, then
/// ask each candidate peer in turn. Contents that pass digest verification
/// are persisted to the store before being returned. Returns `None` when no
/// peer could supply valid contents.
#[instrument(level = "debug", skip_all)]
async fn get_full_checkpoint_contents<S>(
    peers: PeerBalancer,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    checkpoint: &VerifiedCheckpoint,
    use_get_checkpoint_contents_v2: bool,
    timeout: Duration,
) -> Option<VersionedFullCheckpointContents>
where
    S: WriteStore,
{
    let sequence_number = checkpoint.sequence_number;
    let digest = checkpoint.content_digest;
    // Fast path: contents may already be persisted locally.
    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
        debug!("store already contains checkpoint contents");
        return Some(contents);
    }

    for mut peer in peers {
        let peer_id = peer.inner().peer_id();
        debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
        let request = Request::new(digest).with_timeout(timeout);
        let start = Instant::now();
        // v2 returns versioned contents directly; the legacy RPC response is
        // wrapped into the V1 variant so both paths yield the same type.
        let result = if use_get_checkpoint_contents_v2 {
            peer.get_checkpoint_contents_v2(request).await
        } else {
            peer.get_checkpoint_contents(request)
                .await
                .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
        };
        let elapsed = start.elapsed();

        let contents = match result {
            Ok(response) => match response.into_inner() {
                Some(c) => Some(c),
                None => {
                    trace!("peer unable to help sync");
                    peer_heights.write().unwrap().record_failure(peer_id);
                    None
                }
            },
            Err(e) => {
                trace!("{e:?}");
                peer_heights.write().unwrap().record_failure(peer_id);
                None
            }
        };

        let Some(contents) = contents else {
            continue;
        };

        // Only accept contents whose digests match the verified summary;
        // on success, record peer stats and persist before returning.
        if contents.verify_digests(digest).is_ok() {
            let size =
                bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
            peer_heights
                .write()
                .unwrap()
                .record_success(peer_id, size, elapsed);
            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
            store
                .insert_checkpoint_contents(checkpoint, verified_contents)
                .expect("store operation should not fail");
            return Some(contents);
        }
    }
    debug!("no peers had checkpoint contents");
    None
}
1785
1786async fn update_checkpoint_watermark_metrics<S>(
1787 mut recv: oneshot::Receiver<()>,
1788 store: S,
1789 metrics: Metrics,
1790) -> Result<()>
1791where
1792 S: WriteStore + Clone + Send + Sync,
1793{
1794 let mut interval = tokio::time::interval(Duration::from_secs(5));
1795 loop {
1796 tokio::select! {
1797 _now = interval.tick() => {
1798 let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1799 .expect("store operation should not fail");
1800 metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1801 let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1802 .expect("store operation should not fail");
1803 metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1804 },
1805 _ = &mut recv => break,
1806 }
1807 }
1808 Ok(())
1809}