1use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54 collections::{HashMap, VecDeque},
55 sync::{Arc, RwLock},
56 time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60 committee::Committee,
61 digests::CheckpointDigest,
62 messages_checkpoint::{
63 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64 VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65 },
66 storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71 sync::{broadcast, mpsc, watch},
72 task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77 include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::process_archive_checkpoint;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90 state_sync_client::StateSyncClient,
91 state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_storage::object_store::util::{build_object_store, fetch_checkpoint};
97use sui_storage::verify_checkpoint;
98
99#[derive(Clone, Debug)]
104pub struct Handle {
105 sender: mpsc::Sender<StateSyncMessage>,
106 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
107 metrics: Metrics,
108}
109
110impl Handle {
111 pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
120 self.sender
121 .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
122 .await
123 .unwrap()
124 }
125
126 pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
128 self.checkpoint_event_sender.subscribe()
129 }
130
131 pub fn get_peers_reported_for_failure(&self) -> u64 {
133 self.metrics.get_peers_reported_for_failure()
134 }
135}
136
137pub(super) fn compute_adaptive_timeout(
138 tx_count: u64,
139 min_timeout: Duration,
140 max_timeout: Duration,
141) -> Duration {
142 const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
143 const JITTER_FRACTION: f64 = 0.1;
144
145 let max_timeout = max_timeout.max(min_timeout);
146 let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
147 let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
148 let base = min_timeout + extra;
149
150 let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
151 let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
152 Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
153}
154
155#[cfg_attr(test, derive(Debug))]
156pub(super) struct PeerScore {
157 successes: VecDeque<(Instant, u64 , Duration)>,
158 failures: VecDeque<Instant>,
159 window: Duration,
160 failure_rate: f64,
161 failing_since: Option<Instant>,
162}
163
164impl PeerScore {
165 const MAX_SAMPLES: usize = 20;
166 const MIN_SAMPLES_FOR_FAILURE: usize = 10;
167
168 pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
169 Self {
170 successes: VecDeque::new(),
171 failures: VecDeque::new(),
172 window,
173 failure_rate,
174 failing_since: None,
175 }
176 }
177
178 pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
179 let now = Instant::now();
180 self.successes.push_back((now, size, response_time));
181 while self.successes.len() > Self::MAX_SAMPLES {
182 self.successes.pop_front();
183 }
184 }
190
191 pub(super) fn record_failure(&mut self) {
192 let now = Instant::now();
193 self.failures.push_back(now);
194 while self.failures.len() > Self::MAX_SAMPLES {
195 self.failures.pop_front();
196 }
197 }
198
199 pub(super) fn is_failing(&self) -> bool {
200 let now = Instant::now();
201 let recent_failures = self
202 .failures
203 .iter()
204 .filter(|ts| now.duration_since(**ts) < self.window)
205 .count();
206 let recent_successes = self
207 .successes
208 .iter()
209 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
210 .count();
211
212 let total = recent_failures + recent_successes;
213 if total < Self::MIN_SAMPLES_FOR_FAILURE {
214 return false;
215 }
216
217 let rate = recent_failures as f64 / total as f64;
218 rate >= self.failure_rate
219 }
220
221 pub(super) fn update_failing_state(&mut self) {
222 if self.is_failing() {
223 if self.failing_since.is_none() {
226 self.failing_since = Some(Instant::now());
227 }
228 } else {
229 self.failing_since = None;
232 }
233 }
234
235 pub(super) fn reset_failing_since(&mut self) {
236 self.failing_since = None;
237 }
238
239 pub(super) fn consistently_failing(&self, threshold: Duration) -> bool {
240 match self.failing_since {
241 Some(since) => since.elapsed() >= threshold,
242 None => false,
243 }
244 }
245
246 pub(super) fn effective_throughput(&self) -> Option<f64> {
247 let now = Instant::now();
248 let (total_size, total_time) = self
249 .successes
250 .iter()
251 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
252 .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
253 (size + s, time + *d)
254 });
255
256 if total_size == 0 {
257 return None;
258 }
259
260 if total_time.is_zero() {
261 return None;
262 }
263
264 Some(total_size as f64 / total_time.as_secs_f64())
265 }
266}
267
268struct PeerHeights {
269 peers: HashMap<PeerId, PeerStateSyncInfo>,
271 unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
272 sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
273 scores: HashMap<PeerId, PeerScore>,
274
275 wait_interval_when_no_peer_to_sync_content: Duration,
276 peer_scoring_window: Duration,
277 peer_failure_rate: f64,
278 checkpoint_content_timeout_min: Duration,
279 checkpoint_content_timeout_max: Duration,
280 exploration_probability: f64,
281}
282
283#[derive(Copy, Clone, Debug, PartialEq, Eq)]
284struct PeerStateSyncInfo {
285 genesis_checkpoint_digest: CheckpointDigest,
287 on_same_chain_as_us: bool,
289 height: CheckpointSequenceNumber,
291 lowest: CheckpointSequenceNumber,
294}
295
296impl PeerHeights {
297 pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
298 self.peers
299 .values()
300 .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
301 .max()
302 }
303
304 pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
305 self.peers
306 .iter()
307 .filter(|(_peer_id, info)| info.on_same_chain_as_us)
308 }
309
310 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
315 pub fn update_peer_info(
316 &mut self,
317 peer_id: PeerId,
318 checkpoint: Checkpoint,
319 low_watermark: Option<CheckpointSequenceNumber>,
320 ) -> bool {
321 debug!("Update peer info");
322
323 let info = match self.peers.get_mut(&peer_id) {
324 Some(info) if info.on_same_chain_as_us => info,
325 _ => return false,
326 };
327
328 info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
329 if let Some(low_watermark) = low_watermark {
330 info.lowest = low_watermark;
331 }
332 self.insert_checkpoint(checkpoint);
333
334 true
335 }
336
337 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
338 pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
339 use std::collections::hash_map::Entry;
340 debug!("Insert peer info");
341
342 match self.peers.entry(peer_id) {
343 Entry::Occupied(mut entry) => {
344 let entry = entry.get_mut();
347 if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
348 entry.height = std::cmp::max(entry.height, info.height);
349 } else {
350 *entry = info;
351 }
352 }
353 Entry::Vacant(entry) => {
354 entry.insert(info);
355 }
356 }
357 }
358
359 pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
360 if let Some(info) = self.peers.get_mut(&peer_id) {
361 info.on_same_chain_as_us = false;
362 }
363 self.scores.remove(&peer_id);
364 }
365
366 pub fn update_peer_height(
369 &mut self,
370 peer_id: PeerId,
371 height: CheckpointSequenceNumber,
372 low_watermark: Option<CheckpointSequenceNumber>,
373 ) -> bool {
374 let info = match self.peers.get_mut(&peer_id) {
375 Some(info) if info.on_same_chain_as_us => info,
376 _ => return false,
377 };
378
379 info.height = std::cmp::max(height, info.height);
380 if let Some(low_watermark) = low_watermark {
381 info.lowest = low_watermark;
382 }
383
384 true
385 }
386
387 pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
388 self.unprocessed_checkpoints
389 .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
390 self.sequence_number_to_digest
391 .retain(|&s, _digest| s > sequence_number);
392 }
393
394 pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
399 let digest = *checkpoint.digest();
400 let sequence_number = *checkpoint.sequence_number();
401
402 if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
404 if *existing_digest == digest {
405 return;
407 }
408 tracing::info!(
409 ?sequence_number,
410 ?existing_digest,
411 ?digest,
412 "received checkpoint with same sequence number but different digest, dropping new checkpoint"
413 );
414 return;
415 }
416
417 self.unprocessed_checkpoints.insert(digest, checkpoint);
418 self.sequence_number_to_digest
419 .insert(sequence_number, digest);
420 }
421
422 pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
423 if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
424 self.sequence_number_to_digest
425 .remove(checkpoint.sequence_number());
426 }
427 }
428
429 pub fn get_checkpoint_by_sequence_number(
430 &self,
431 sequence_number: CheckpointSequenceNumber,
432 ) -> Option<&Checkpoint> {
433 self.sequence_number_to_digest
434 .get(&sequence_number)
435 .and_then(|digest| self.get_checkpoint_by_digest(digest))
436 }
437
438 pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
439 self.unprocessed_checkpoints.get(digest)
440 }
441
442 #[cfg(test)]
443 pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
444 self.wait_interval_when_no_peer_to_sync_content = duration;
445 }
446
447 pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
448 self.wait_interval_when_no_peer_to_sync_content
449 }
450
451 pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
452 if !self.peers.contains_key(&peer_id) {
453 return;
454 }
455 self.scores
456 .entry(peer_id)
457 .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
458 .record_success(size, response_time);
459 }
460
461 pub fn record_failure(&mut self, peer_id: PeerId) {
462 if !self.peers.contains_key(&peer_id) {
463 return;
464 }
465 self.scores
466 .entry(peer_id)
467 .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
468 .record_failure();
469 }
470
471 pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
472 self.scores
473 .get(peer_id)
474 .and_then(|s| s.effective_throughput())
475 }
476
477 pub fn is_failing(&self, peer_id: &PeerId) -> bool {
478 self.scores
479 .get(peer_id)
480 .map(|s| s.is_failing())
481 .unwrap_or(false)
482 }
483
484 pub fn update_failing_states(&mut self) {
485 for score in self.scores.values_mut() {
486 score.update_failing_state();
487 }
488 }
489
490 pub fn find_peer_to_report_for_failure(&self, threshold: Duration) -> Option<PeerId> {
491 self.scores
492 .iter()
493 .filter(|(peer_id, score)| {
494 score.consistently_failing(threshold)
495 && self
496 .peers
497 .get(peer_id)
498 .is_some_and(|info| info.on_same_chain_as_us)
499 })
500 .max_by_key(|(_, score)| {
501 score
502 .failing_since
503 .map(|since| since.elapsed())
504 .unwrap_or(Duration::ZERO)
505 })
506 .map(|(peer_id, _)| *peer_id)
507 }
508}
509
510#[derive(Clone)]
515struct PeerBalancer {
516 known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
517 unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
518 failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
519 requested_checkpoint: Option<CheckpointSequenceNumber>,
520 request_type: PeerCheckpointRequestType,
521 exploration_probability: f64,
522}
523
/// What a `PeerBalancer` is selecting peers for. This decides which peers are eligible.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// Checkpoint summaries: the peer only needs to have reached the requested height.
    Summary,
    /// Checkpoint contents: the peer must also still have the requested checkpoint, meaning
    /// its low watermark is not above it.
    Content,
}
529
530impl PeerBalancer {
531 pub fn new(
532 network: &anemo::Network,
533 peer_heights: Arc<RwLock<PeerHeights>>,
534 request_type: PeerCheckpointRequestType,
535 ) -> Self {
536 let peer_heights_guard = peer_heights.read().unwrap();
537
538 let mut known_peers = Vec::new();
539 let mut unknown_peers = Vec::new();
540 let mut failing_peers = Vec::new();
541 let exploration_probability = peer_heights_guard.exploration_probability;
542
543 for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
544 let Some(peer) = network.peer(*peer_id) else {
545 continue;
546 };
547 let rtt_secs = peer.connection_rtt().as_secs_f64();
548
549 if peer_heights_guard.is_failing(peer_id) {
550 failing_peers.push((peer, *info));
551 } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
552 known_peers.push((peer, *info, throughput));
553 } else {
554 unknown_peers.push((peer, *info, rtt_secs));
555 }
556 }
557 drop(peer_heights_guard);
558
559 unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
560 rtt_a
561 .partial_cmp(rtt_b)
562 .unwrap_or(std::cmp::Ordering::Equal)
563 });
564
565 Self {
566 known_peers,
567 unknown_peers,
568 failing_peers,
569 requested_checkpoint: None,
570 request_type,
571 exploration_probability,
572 }
573 }
574
575 pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
576 self.requested_checkpoint = Some(checkpoint);
577 self
578 }
579
580 fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
581 let requested = self.requested_checkpoint.unwrap_or(0);
582 match &self.request_type {
583 PeerCheckpointRequestType::Summary => info.height >= requested,
584 PeerCheckpointRequestType::Content => {
585 info.height >= requested && info.lowest <= requested
586 }
587 }
588 }
589
590 fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
591 let eligible: Vec<_> = self
592 .known_peers
593 .iter()
594 .enumerate()
595 .filter(|(_, (_, info, _))| self.is_eligible(info))
596 .map(|(i, (_, _, t))| (i, *t))
597 .collect();
598
599 if eligible.is_empty() {
600 return None;
601 }
602
603 let total: f64 = eligible.iter().map(|(_, t)| t).sum();
604 let mut pick = rand::thread_rng().gen_range(0.0..total);
605
606 for (idx, throughput) in &eligible {
607 pick -= throughput;
608 if pick <= 0.0 {
609 let (peer, info, _) = self.known_peers.remove(*idx);
610 return Some((peer, info));
611 }
612 }
613
614 let (idx, _) = eligible.last().unwrap();
615 let (peer, info, _) = self.known_peers.remove(*idx);
616 Some((peer, info))
617 }
618
619 fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
620 let pos = self
621 .unknown_peers
622 .iter()
623 .position(|(_, info, _)| self.is_eligible(info))?;
624 let (peer, info, _) = self.unknown_peers.remove(pos);
625 Some((peer, info))
626 }
627
628 fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
629 let pos = self
630 .failing_peers
631 .iter()
632 .position(|(_, info)| self.is_eligible(info))?;
633 Some(self.failing_peers.remove(pos))
634 }
635}
636
637impl Iterator for PeerBalancer {
638 type Item = StateSyncClient<anemo::Peer>;
639
640 fn next(&mut self) -> Option<Self::Item> {
641 let has_eligible_known = self
642 .known_peers
643 .iter()
644 .any(|(_, info, _)| self.is_eligible(info));
645 let has_eligible_unknown = self
646 .unknown_peers
647 .iter()
648 .any(|(_, info, _)| self.is_eligible(info));
649
650 let explore = has_eligible_unknown
651 && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
652
653 if explore && let Some((peer, _)) = self.select_by_rtt() {
654 return Some(StateSyncClient::new(peer));
655 }
656
657 if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
658 return Some(StateSyncClient::new(peer));
659 }
660
661 if let Some((peer, _)) = self.select_failing() {
662 return Some(StateSyncClient::new(peer));
663 }
664
665 None
666 }
667}
668
669#[derive(Clone, Debug)]
670enum StateSyncMessage {
671 StartSyncJob,
672 VerifiedCheckpoint(Box<VerifiedCheckpoint>),
675 SyncedCheckpoint(Box<VerifiedCheckpoint>),
679}
680
681struct StateSyncEventLoop<S> {
682 config: StateSyncConfig,
683
684 mailbox: mpsc::Receiver<StateSyncMessage>,
685 weak_sender: mpsc::WeakSender<StateSyncMessage>,
687
688 tasks: JoinSet<()>,
689 sync_checkpoint_summaries_task: Option<AbortHandle>,
690 sync_checkpoint_contents_task: Option<AbortHandle>,
691 download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
692
693 store: S,
694 peer_heights: Arc<RwLock<PeerHeights>>,
695 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
696 network: anemo::Network,
697 metrics: Metrics,
698
699 sync_checkpoint_from_archive_task: Option<AbortHandle>,
700 archive_config: Option<ArchiveReaderConfig>,
701 discovery_sender: Option<crate::discovery::Sender>,
702}
703
704impl<S> StateSyncEventLoop<S>
705where
706 S: WriteStore + Clone + Send + Sync + 'static,
707{
708 pub async fn start(mut self) {
713 info!("State-Synchronizer started");
714
715 self.config.pinned_checkpoints.sort();
716
717 let mut interval = tokio::time::interval(self.config.interval_period());
718 let mut peer_events = {
719 let (subscriber, peers) = self.network.subscribe().unwrap();
720 for peer_id in peers {
721 self.spawn_get_latest_from_peer(peer_id);
722 }
723 subscriber
724 };
725 let (
726 target_checkpoint_contents_sequence_sender,
727 target_checkpoint_contents_sequence_receiver,
728 ) = watch::channel(0);
729
730 let (_sender, receiver) = oneshot::channel();
732 tokio::spawn(update_checkpoint_watermark_metrics(
733 receiver,
734 self.store.clone(),
735 self.metrics.clone(),
736 ));
737
738 let task = sync_checkpoint_contents(
740 self.network.clone(),
741 self.store.clone(),
742 self.peer_heights.clone(),
743 self.weak_sender.clone(),
744 self.checkpoint_event_sender.clone(),
745 self.config.checkpoint_content_download_concurrency(),
746 self.config.checkpoint_content_download_tx_concurrency(),
747 self.config.use_get_checkpoint_contents_v2(),
748 target_checkpoint_contents_sequence_receiver,
749 );
750 let task_handle = self.tasks.spawn(task);
751 self.sync_checkpoint_contents_task = Some(task_handle);
752
753 let task = sync_checkpoint_contents_from_archive(
761 self.network.clone(),
762 self.archive_config.clone(),
763 self.store.clone(),
764 self.peer_heights.clone(),
765 self.metrics.clone(),
766 );
767 let task_handle = self.tasks.spawn(task);
768 self.sync_checkpoint_from_archive_task = Some(task_handle);
769
770 loop {
772 tokio::select! {
773 now = interval.tick() => {
774 self.handle_tick(now.into_std());
775 },
776 maybe_message = self.mailbox.recv() => {
777 if let Some(message) = maybe_message {
780 self.handle_message(message);
781 } else {
782 break;
783 }
784 },
785 peer_event = peer_events.recv() => {
786 self.handle_peer_event(peer_event);
787 },
788 Some(task_result) = self.tasks.join_next() => {
789 match task_result {
790 Ok(()) => {},
791 Err(e) => {
792 if e.is_cancelled() {
793 } else if e.is_panic() {
795 std::panic::resume_unwind(e.into_panic());
797 } else {
798 panic!("task failed: {e}");
799 }
800 },
801 };
802
803 if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
804 panic!("sync_checkpoint_contents task unexpectedly terminated")
805 }
806
807 if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
808 self.sync_checkpoint_summaries_task = None;
809 }
810
811 if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
812 panic!("sync_checkpoint_from_archive task unexpectedly terminated")
813 }
814 },
815 }
816
817 self.maybe_start_checkpoint_summary_sync_task();
818 self.maybe_trigger_checkpoint_contents_sync_task(
819 &target_checkpoint_contents_sequence_sender,
820 );
821 }
822
823 info!("State-Synchronizer ended");
824 }
825
826 fn handle_message(&mut self, message: StateSyncMessage) {
827 debug!("Received message: {:?}", message);
828 match message {
829 StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
830 StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
831 self.handle_checkpoint_from_consensus(checkpoint)
832 }
833 StateSyncMessage::SyncedCheckpoint(checkpoint) => {
835 self.spawn_notify_peers_of_checkpoint(*checkpoint)
836 }
837 }
838 }
839
840 #[instrument(level = "debug", skip_all)]
842 fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
843 let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
846 .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
847 .digest();
848 if checkpoint.previous_digest != Some(prev_digest) {
849 panic!(
850 "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
851 checkpoint.sequence_number(),
852 Some(prev_digest),
853 checkpoint.previous_digest
854 );
855 }
856
857 let latest_checkpoint = self
858 .store
859 .get_highest_verified_checkpoint()
860 .expect("store operation should not fail");
861
862 if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
864 return;
865 }
866
867 let checkpoint = *checkpoint;
868 let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
869 if *checkpoint.sequence_number() > next_sequence_number {
870 debug!(
871 "consensus sent too new of a checkpoint, expecting: {}, got: {}",
872 next_sequence_number,
873 checkpoint.sequence_number()
874 );
875 }
876
877 #[cfg(debug_assertions)]
880 {
881 let _ = (next_sequence_number..=*checkpoint.sequence_number())
882 .map(|n| {
883 let checkpoint = self
884 .store
885 .get_checkpoint_by_sequence_number(n)
886 .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
887 self.store
888 .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
889 .unwrap_or_else(|| {
890 panic!(
891 "store should contain checkpoint contents for {:?}",
892 checkpoint.content_digest
893 )
894 });
895 })
896 .collect::<Vec<_>>();
897 }
898
899 if let Some(EndOfEpochData {
906 next_epoch_committee,
907 ..
908 }) = checkpoint.end_of_epoch_data.as_ref()
909 {
910 let next_committee = next_epoch_committee.iter().cloned().collect();
911 let committee =
912 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
913 self.store
914 .insert_committee(committee)
915 .expect("store operation should not fail");
916 }
917
918 self.store
919 .update_highest_verified_checkpoint(&checkpoint)
920 .expect("store operation should not fail");
921 self.store
922 .update_highest_synced_checkpoint(&checkpoint)
923 .expect("store operation should not fail");
924
925 let _ = self.checkpoint_event_sender.send(checkpoint.clone());
927
928 self.spawn_notify_peers_of_checkpoint(checkpoint);
929 }
930
931 fn handle_peer_event(
932 &mut self,
933 peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
934 ) {
935 use tokio::sync::broadcast::error::RecvError;
936
937 match peer_event {
938 Ok(PeerEvent::NewPeer(peer_id)) => {
939 self.spawn_get_latest_from_peer(peer_id);
940 }
941 Ok(PeerEvent::LostPeer(peer_id, _)) => {
942 let mut heights = self.peer_heights.write().unwrap();
943 heights.peers.remove(&peer_id);
944 heights.scores.remove(&peer_id);
945 }
946
947 Err(RecvError::Closed) => {
948 panic!("PeerEvent channel shouldn't be able to be closed");
949 }
950
951 Err(RecvError::Lagged(_)) => {
952 trace!("State-Sync fell behind processing PeerEvents");
953 }
954 }
955 }
956
957 fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
958 if let Some(peer) = self.network.peer(peer_id) {
959 let genesis_checkpoint_digest = *self
960 .store
961 .get_checkpoint_by_sequence_number(0)
962 .expect("store should contain genesis checkpoint")
963 .digest();
964 let task = get_latest_from_peer(
965 genesis_checkpoint_digest,
966 peer,
967 self.peer_heights.clone(),
968 self.config.timeout(),
969 );
970 self.tasks.spawn(task);
971 }
972 }
973
974 fn handle_tick(&mut self, _now: std::time::Instant) {
975 let task = query_peers_for_their_latest_checkpoint(
976 self.network.clone(),
977 self.peer_heights.clone(),
978 self.weak_sender.clone(),
979 self.config.timeout(),
980 );
981 self.tasks.spawn(task);
982
983 if let Some(layer) = self.download_limit_layer.as_ref() {
984 layer.maybe_prune_map();
985 }
986
987 self.maybe_report_failing_peer();
988 }
989
990 fn maybe_report_failing_peer(&mut self) {
991 let threshold = self.config.peer_disconnect_threshold();
992 if threshold.is_zero() {
993 return;
994 }
995
996 let mut peer_heights = self.peer_heights.write().unwrap();
997 peer_heights.update_failing_states();
998
999 let Some(peer_id) = peer_heights.find_peer_to_report_for_failure(threshold) else {
1000 return;
1001 };
1002 if let Some(score) = peer_heights.scores.get_mut(&peer_id) {
1008 score.reset_failing_since();
1009 }
1010 drop(peer_heights);
1011
1012 info!("reporting peer {peer_id} for consistent state sync failures");
1013 if let Some(sender) = &self.discovery_sender {
1014 sender.report_peer_failure(peer_id);
1015 } else {
1016 let _ = self.network.disconnect(peer_id);
1017 }
1018 self.metrics.inc_peers_reported_for_failure();
1019 }
1020
1021 fn maybe_start_checkpoint_summary_sync_task(&mut self) {
1022 if self.sync_checkpoint_summaries_task.is_some() {
1024 return;
1025 }
1026
1027 let highest_processed_checkpoint = self
1028 .store
1029 .get_highest_verified_checkpoint()
1030 .expect("store operation should not fail");
1031
1032 let highest_known_sequence_number = self
1033 .peer_heights
1034 .read()
1035 .unwrap()
1036 .highest_known_checkpoint_sequence_number();
1037
1038 if let Some(target_seq) = highest_known_sequence_number
1039 && *highest_processed_checkpoint.sequence_number() < target_seq
1040 {
1041 let max_batch_size = self.config.max_checkpoint_sync_batch_size();
1043 let limited_target = std::cmp::min(
1044 target_seq,
1045 highest_processed_checkpoint
1046 .sequence_number()
1047 .saturating_add(max_batch_size),
1048 );
1049 let was_limited = limited_target < target_seq;
1050
1051 let weak_sender = self.weak_sender.clone();
1053 let task = sync_to_checkpoint(
1054 self.network.clone(),
1055 self.store.clone(),
1056 self.peer_heights.clone(),
1057 self.metrics.clone(),
1058 self.config.pinned_checkpoints.clone(),
1059 self.config.checkpoint_header_download_concurrency(),
1060 self.config.timeout(),
1061 limited_target,
1062 )
1063 .map(move |result| match result {
1064 Ok(()) => {
1065 if was_limited && let Some(sender) = weak_sender.upgrade() {
1068 let _ = sender.try_send(StateSyncMessage::StartSyncJob);
1069 }
1070 }
1071 Err(e) => {
1072 debug!("error syncing checkpoint {e}");
1073 }
1074 });
1075 let task_handle = self.tasks.spawn(task);
1076 self.sync_checkpoint_summaries_task = Some(task_handle);
1077 }
1078 }
1079
1080 fn maybe_trigger_checkpoint_contents_sync_task(
1081 &mut self,
1082 target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
1083 ) {
1084 let highest_verified_checkpoint = self
1085 .store
1086 .get_highest_verified_checkpoint()
1087 .expect("store operation should not fail");
1088 let highest_synced_checkpoint = self
1089 .store
1090 .get_highest_synced_checkpoint()
1091 .expect("store operation should not fail");
1092
1093 if highest_verified_checkpoint.sequence_number()
1094 > highest_synced_checkpoint.sequence_number()
1095 && self
1097 .peer_heights
1098 .read()
1099 .unwrap()
1100 .highest_known_checkpoint_sequence_number()
1101 > Some(*highest_synced_checkpoint.sequence_number())
1102 {
1103 let _ = target_sequence_channel.send_if_modified(|num| {
1104 let new_num = *highest_verified_checkpoint.sequence_number();
1105 if *num == new_num {
1106 return false;
1107 }
1108 *num = new_num;
1109 true
1110 });
1111 }
1112 }
1113
1114 fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
1115 let task = notify_peers_of_checkpoint(
1116 self.network.clone(),
1117 self.peer_heights.clone(),
1118 checkpoint,
1119 self.config.timeout(),
1120 );
1121 self.tasks.spawn(task);
1122 }
1123}
1124
1125async fn notify_peers_of_checkpoint(
1126 network: anemo::Network,
1127 peer_heights: Arc<RwLock<PeerHeights>>,
1128 checkpoint: VerifiedCheckpoint,
1129 timeout: Duration,
1130) {
1131 let futs = peer_heights
1132 .read()
1133 .unwrap()
1134 .peers_on_same_chain()
1135 .filter_map(|(peer_id, info)| {
1137 (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1138 })
1139 .flat_map(|peer_id| network.peer(*peer_id))
1141 .map(StateSyncClient::new)
1142 .map(|mut client| {
1143 let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1144 async move { client.push_checkpoint_summary(request).await }
1145 })
1146 .collect::<Vec<_>>();
1147 futures::future::join_all(futs).await;
1148}
1149
1150async fn get_latest_from_peer(
1151 our_genesis_checkpoint_digest: CheckpointDigest,
1152 peer: anemo::Peer,
1153 peer_heights: Arc<RwLock<PeerHeights>>,
1154 timeout: Duration,
1155) {
1156 let peer_id = peer.peer_id();
1157 let mut client = StateSyncClient::new(peer);
1158
1159 let info = {
1160 let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
1161
1162 if let Some(info) = maybe_info {
1163 info
1164 } else {
1165 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
1170 .with_timeout(timeout);
1171 let response = client
1172 .get_checkpoint_summary(request)
1173 .await
1174 .map(Response::into_inner);
1175
1176 let info = match response {
1177 Ok(Some(checkpoint)) => {
1178 let digest = *checkpoint.digest();
1179 PeerStateSyncInfo {
1180 genesis_checkpoint_digest: digest,
1181 on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
1182 height: *checkpoint.sequence_number(),
1183 lowest: CheckpointSequenceNumber::default(),
1184 }
1185 }
1186 Ok(None) => PeerStateSyncInfo {
1187 genesis_checkpoint_digest: CheckpointDigest::default(),
1188 on_same_chain_as_us: false,
1189 height: CheckpointSequenceNumber::default(),
1190 lowest: CheckpointSequenceNumber::default(),
1191 },
1192 Err(status) => {
1193 trace!("get_latest_checkpoint_summary request failed: {status:?}");
1194 return;
1195 }
1196 };
1197 peer_heights
1198 .write()
1199 .unwrap()
1200 .insert_peer_info(peer_id, info);
1201 info
1202 }
1203 };
1204
1205 if !info.on_same_chain_as_us {
1207 trace!(?info, "Peer {peer_id} not on same chain as us");
1208 return;
1209 }
1210 let Ok(Some((highest_checkpoint, low_watermark))) =
1211 query_peer_for_latest_info(&mut client, timeout).await
1212 else {
1213 return;
1214 };
1215 peer_heights
1216 .write()
1217 .unwrap()
1218 .update_peer_info(peer_id, highest_checkpoint, low_watermark);
1219}
1220
1221async fn query_peer_for_latest_info(
1230 client: &mut StateSyncClient<anemo::Peer>,
1231 timeout: Duration,
1232) -> Result<Option<(Checkpoint, Option<CheckpointSequenceNumber>)>, ()> {
1233 let request = Request::new(()).with_timeout(timeout);
1234 let response = client
1235 .get_checkpoint_availability(request)
1236 .await
1237 .map(Response::into_inner);
1238 match response {
1239 Ok(GetCheckpointAvailabilityResponse {
1240 highest_synced_checkpoint,
1241 lowest_available_checkpoint,
1242 }) => {
1243 return Ok(Some((
1244 highest_synced_checkpoint,
1245 Some(lowest_available_checkpoint),
1246 )));
1247 }
1248 Err(status) => {
1249 if status.status() != anemo::types::response::StatusCode::NotFound {
1251 trace!("get_checkpoint_availability request failed: {status:?}");
1252 return Err(());
1253 }
1254 }
1255 };
1256
1257 let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1260 let response = client
1261 .get_checkpoint_summary(request)
1262 .await
1263 .map(Response::into_inner);
1264 match response {
1265 Ok(Some(checkpoint)) => Ok(Some((checkpoint, None))),
1266 Ok(None) => Ok(None),
1267 Err(status) => {
1268 trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1269 Err(())
1270 }
1271 }
1272}
1273
1274#[instrument(level = "debug", skip_all)]
1275async fn query_peers_for_their_latest_checkpoint(
1276 network: anemo::Network,
1277 peer_heights: Arc<RwLock<PeerHeights>>,
1278 sender: mpsc::WeakSender<StateSyncMessage>,
1279 timeout: Duration,
1280) {
1281 let peer_heights = &peer_heights;
1282 let futs = peer_heights
1283 .read()
1284 .unwrap()
1285 .peers_on_same_chain()
1286 .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1288 .map(|peer| {
1289 let peer_id = peer.peer_id();
1290 let mut client = StateSyncClient::new(peer);
1291
1292 async move {
1293 let start = Instant::now();
1294 let response = query_peer_for_latest_info(&mut client, timeout).await;
1295 let elapsed = start.elapsed();
1296 match response {
1297 Ok(Some((highest_checkpoint, low_watermark))) => {
1298 let size = bcs::serialized_size(&highest_checkpoint).unwrap_or(0) as u64;
1301 let mut peer_heights = peer_heights.write().unwrap();
1302 peer_heights.record_success(peer_id, size, elapsed);
1303 peer_heights
1304 .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1305 .then_some(highest_checkpoint)
1306 }
1307 Ok(None) => None,
1309 Err(()) => {
1316 peer_heights.write().unwrap().record_failure(peer_id);
1317 None
1318 }
1319 }
1320 }
1321 })
1322 .collect::<Vec<_>>();
1323
1324 debug!("Query {} peers for latest checkpoint", futs.len());
1325
1326 let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1327
1328 let highest_checkpoint_seq = checkpoints
1329 .map(|checkpoint| *checkpoint.sequence_number())
1330 .max();
1331
1332 let our_highest_seq = peer_heights
1333 .read()
1334 .unwrap()
1335 .highest_known_checkpoint_sequence_number();
1336
1337 debug!(
1338 "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1339 );
1340
1341 let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1342 (Some(theirs), None) => theirs,
1343 (Some(theirs), Some(ours)) if theirs > ours => theirs,
1344 _ => return,
1345 };
1346
1347 if let Some(sender) = sender.upgrade() {
1348 let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1349 }
1350}
1351
1352async fn sync_to_checkpoint<S>(
1353 network: anemo::Network,
1354 store: S,
1355 peer_heights: Arc<RwLock<PeerHeights>>,
1356 metrics: Metrics,
1357 pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1358 checkpoint_header_download_concurrency: usize,
1359 timeout: Duration,
1360 target_sequence_number: CheckpointSequenceNumber,
1361) -> Result<()>
1362where
1363 S: WriteStore,
1364{
1365 metrics.set_highest_known_checkpoint(target_sequence_number);
1366
1367 let mut current = store
1368 .get_highest_verified_checkpoint()
1369 .expect("store operation should not fail");
1370 if *current.sequence_number() >= target_sequence_number {
1371 return Err(anyhow::anyhow!(
1372 "target checkpoint {} is older than highest verified checkpoint {}",
1373 target_sequence_number,
1374 current.sequence_number(),
1375 ));
1376 }
1377
1378 let peer_balancer = PeerBalancer::new(
1379 &network,
1380 peer_heights.clone(),
1381 PeerCheckpointRequestType::Summary,
1382 );
1383 let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1385 ..=target_sequence_number)
1386 .map(|next| {
1387 let peers = peer_balancer.clone().with_checkpoint(next);
1388 let peer_heights = peer_heights.clone();
1389 let pinned_checkpoints = &pinned_checkpoints;
1390 async move {
1391 if let Some(checkpoint) = peer_heights
1392 .read()
1393 .unwrap()
1394 .get_checkpoint_by_sequence_number(next)
1395 {
1396 return (Some(checkpoint.to_owned()), next, None);
1397 }
1398
1399 for mut peer in peers {
1402 let peer_id = peer.inner().peer_id();
1403 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1404 .with_timeout(timeout);
1405 let start = Instant::now();
1406 let result = peer.get_checkpoint_summary(request).await;
1407 let elapsed = start.elapsed();
1408
1409 let checkpoint = match result {
1410 Ok(response) => match response.into_inner() {
1411 Some(cp) => Some(cp),
1412 None => {
1413 trace!("peer unable to help sync");
1414 peer_heights.write().unwrap().record_failure(peer_id);
1415 None
1416 }
1417 },
1418 Err(e) => {
1419 trace!("{e:?}");
1420 peer_heights.write().unwrap().record_failure(peer_id);
1421 None
1422 }
1423 };
1424
1425 let Some(checkpoint) = checkpoint else {
1426 continue;
1427 };
1428
1429 let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
1430 peer_heights.write().unwrap().record_success(peer_id, size, elapsed);
1431
1432 if *checkpoint.sequence_number() != next {
1434 tracing::debug!(
1435 "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1436 checkpoint.sequence_number()
1437 );
1438 peer_heights
1439 .write()
1440 .unwrap()
1441 .mark_peer_as_not_on_same_chain(peer_id);
1442 continue;
1443 }
1444
1445 let checkpoint_digest = checkpoint.digest();
1447 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1448 checkpoint.sequence_number(),
1449 |(seq_num, _digest)| *seq_num
1450 )
1451 && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
1452 {
1453 tracing::debug!(
1454 "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1455 pinned_checkpoints[pinned_digest_index].1,
1456 checkpoint_digest
1457 );
1458 peer_heights
1459 .write()
1460 .unwrap()
1461 .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
1462 continue;
1463 }
1464
1465 peer_heights
1467 .write()
1468 .unwrap()
1469 .insert_checkpoint(checkpoint.clone());
1470 return (Some(checkpoint), next, Some(peer_id));
1471 }
1472 (None, next, None)
1473 }
1474 })
1475 .pipe(futures::stream::iter)
1476 .buffered(checkpoint_header_download_concurrency);
1477
1478 while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1479 assert_eq!(
1480 current
1481 .sequence_number()
1482 .checked_add(1)
1483 .expect("exhausted u64"),
1484 next
1485 );
1486
1487 let checkpoint = 'cp: {
1489 let checkpoint = maybe_checkpoint.ok_or_else(|| {
1490 anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1491 })?;
1492 if pinned_checkpoints
1494 .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1495 .is_ok()
1496 {
1497 break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1498 }
1499 match verify_checkpoint(¤t, &store, checkpoint) {
1500 Ok(verified_checkpoint) => verified_checkpoint,
1501 Err(checkpoint) => {
1502 let mut peer_heights = peer_heights.write().unwrap();
1503 peer_heights.remove_checkpoint(checkpoint.digest());
1506
1507 if let Some(peer_id) = maybe_peer_id {
1509 peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1510 }
1511
1512 return Err(anyhow::anyhow!(
1513 "unable to verify checkpoint {checkpoint:?}"
1514 ));
1515 }
1516 }
1517 };
1518
1519 debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1520 if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1521 metrics.checkpoint_summary_age_metrics()
1522 {
1523 checkpoint.report_checkpoint_age(
1524 checkpoint_summary_age_metric,
1525 checkpoint_summary_age_metric_deprecated,
1526 );
1527 }
1528
1529 store
1532 .insert_checkpoint(&checkpoint)
1533 .expect("store operation should not fail");
1534
1535 current = checkpoint;
1536 }
1537
1538 peer_heights
1539 .write()
1540 .unwrap()
1541 .cleanup_old_checkpoints(*current.sequence_number());
1542
1543 Ok(())
1544}
1545
1546async fn sync_checkpoint_contents_from_archive<S>(
1547 network: anemo::Network,
1548 archive_config: Option<ArchiveReaderConfig>,
1549 store: S,
1550 peer_heights: Arc<RwLock<PeerHeights>>,
1551 metrics: Metrics,
1552) where
1553 S: WriteStore + Clone + Send + Sync + 'static,
1554{
1555 loop {
1556 sync_checkpoint_contents_from_archive_iteration(
1557 &network,
1558 &archive_config,
1559 store.clone(),
1560 peer_heights.clone(),
1561 metrics.clone(),
1562 )
1563 .await;
1564 tokio::time::sleep(Duration::from_secs(5)).await;
1565 }
1566}
1567
1568async fn sync_checkpoint_contents_from_archive_iteration<S>(
1569 network: &anemo::Network,
1570 archive_config: &Option<ArchiveReaderConfig>,
1571 store: S,
1572 peer_heights: Arc<RwLock<PeerHeights>>,
1573 metrics: Metrics,
1574) where
1575 S: WriteStore + Clone + Send + Sync + 'static,
1576{
1577 let peers: Vec<_> = peer_heights
1578 .read()
1579 .unwrap()
1580 .peers_on_same_chain()
1581 .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1583 .collect();
1584 let lowest_checkpoint_on_peers = peers
1585 .iter()
1586 .map(|(_p, state_sync_info)| state_sync_info.lowest)
1587 .min();
1588 let highest_synced = store
1589 .get_highest_synced_checkpoint()
1590 .expect("store operation should not fail")
1591 .sequence_number;
1592 let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1593 highest_synced < lowest_checkpoint_on_peers
1594 } else {
1595 false
1596 };
1597 debug!(
1598 "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
1599 lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1600 );
1601 if sync_from_archive {
1602 let start = highest_synced
1603 .checked_add(1)
1604 .expect("Checkpoint seq num overflow");
1605 let end = lowest_checkpoint_on_peers.unwrap();
1606
1607 let Some(archive_config) = archive_config else {
1608 warn!("Failed to find an archive reader to complete the state sync request");
1609 return;
1610 };
1611 let Some(ingestion_url) = &archive_config.ingestion_url else {
1612 warn!("Archival ingestion url for state sync is not configured");
1613 return;
1614 };
1615 if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1616 warn!("{} can't be used as an archival fallback", ingestion_url);
1617 return;
1618 }
1619 let obj_store = build_object_store(
1620 ingestion_url,
1621 archive_config.remote_store_options.clone(),
1622 archive_config.remote_store_headers.clone(),
1623 );
1624 let mut checkpoint_stream = futures::stream::iter(start..=end)
1625 .map(|seq| {
1626 let obj_store = obj_store.clone();
1627 async move { (seq, fetch_checkpoint(&obj_store, seq).await) }
1628 })
1629 .buffered(archive_config.download_concurrency.get());
1630
1631 while let Some((seq, result)) = checkpoint_stream.next().await {
1632 let checkpoint = match result {
1633 Ok(checkpoint) => checkpoint,
1634 Err(err) => {
1635 warn!("State sync from archive failed fetching checkpoint {seq}: {err}");
1636 return;
1637 }
1638 };
1639 if let Err(err) = process_archive_checkpoint(&store, &checkpoint, &metrics) {
1640 warn!("State sync from archive failed processing checkpoint {seq}: {err}");
1641 return;
1642 }
1643 }
1644 }
1645}
1646
1647async fn sync_checkpoint_contents<S>(
1648 network: anemo::Network,
1649 store: S,
1650 peer_heights: Arc<RwLock<PeerHeights>>,
1651 sender: mpsc::WeakSender<StateSyncMessage>,
1652 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1653 checkpoint_content_download_concurrency: usize,
1654 checkpoint_content_download_tx_concurrency: u64,
1655 use_get_checkpoint_contents_v2: bool,
1656 mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1657) where
1658 S: WriteStore + Clone,
1659{
1660 let mut highest_synced = store
1661 .get_highest_synced_checkpoint()
1662 .expect("store operation should not fail");
1663
1664 let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1665 let mut target_sequence_cursor = 0;
1666 let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1667 let mut checkpoint_contents_tasks = FuturesOrdered::new();
1668
1669 let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1670
1671 loop {
1672 tokio::select! {
1673 result = target_sequence_channel.changed() => {
1674 match result {
1675 Ok(()) => {
1676 target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1677 }
1678 Err(_) => {
1679 return
1681 }
1682 }
1683 },
1684 Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1685 match maybe_checkpoint {
1686 Ok(checkpoint) => {
1687 let _: &VerifiedCheckpoint = &checkpoint; store
1690 .update_highest_synced_checkpoint(&checkpoint)
1691 .expect("store operation should not fail");
1692 let _ = checkpoint_event_sender.send(checkpoint.clone());
1694 tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1695 highest_synced = checkpoint;
1696
1697 }
1698 Err(checkpoint) => {
1699 let _: &VerifiedCheckpoint = &checkpoint; if let Some(lowest_peer_checkpoint) =
1701 peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
1702 if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
1703 info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
1704 }
1705 } else {
1706 info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
1707
1708 }
1709 let retry_tx_count = if *checkpoint.sequence_number() == 0 {
1711 checkpoint.network_total_transactions
1712 } else {
1713 let prev = store
1714 .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
1715 .expect("previous checkpoint must exist")
1716 .network_total_transactions;
1717 checkpoint.network_total_transactions - prev
1718 };
1719 checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
1721 network.clone(),
1722 &store,
1723 peer_heights.clone(),
1724 use_get_checkpoint_contents_v2,
1725 retry_tx_count,
1726 checkpoint,
1727 ));
1728 }
1729 }
1730 },
1731 }
1732
1733 while current_sequence < target_sequence_cursor
1735 && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
1736 {
1737 let next_checkpoint = store
1738 .get_checkpoint_by_sequence_number(current_sequence)
1739 .unwrap_or_else(|| panic!(
1740 "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
1741 current_sequence
1742 ));
1743
1744 let tx_count = next_checkpoint.network_total_transactions
1746 - highest_started_network_total_transactions;
1747 if tx_count > tx_concurrency_remaining {
1748 break;
1749 }
1750 tx_concurrency_remaining -= tx_count;
1751
1752 highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
1753 current_sequence += 1;
1754 checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
1755 network.clone(),
1756 &store,
1757 peer_heights.clone(),
1758 use_get_checkpoint_contents_v2,
1759 tx_count,
1760 next_checkpoint,
1761 ));
1762 }
1763
1764 if highest_synced
1765 .sequence_number()
1766 .is_multiple_of(checkpoint_content_download_concurrency as u64)
1767 || checkpoint_contents_tasks.is_empty()
1768 {
1769 if let Some(sender) = sender.upgrade() {
1771 let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
1772 let _ = sender.send(message).await;
1773 }
1774 }
1775 }
1776}
1777
1778#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1779async fn sync_one_checkpoint_contents<S>(
1780 network: anemo::Network,
1781 store: S,
1782 peer_heights: Arc<RwLock<PeerHeights>>,
1783 use_get_checkpoint_contents_v2: bool,
1784 tx_count: u64,
1785 checkpoint: VerifiedCheckpoint,
1786) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1787where
1788 S: WriteStore + Clone,
1789{
1790 let (timeout_min, timeout_max) = {
1791 let ph = peer_heights.read().unwrap();
1792 (
1793 ph.checkpoint_content_timeout_min,
1794 ph.checkpoint_content_timeout_max,
1795 )
1796 };
1797 let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1798 debug!(
1799 "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1800 timeout, tx_count
1801 );
1802
1803 if store
1806 .get_highest_synced_checkpoint()
1807 .expect("store operation should not fail")
1808 .sequence_number()
1809 >= checkpoint.sequence_number()
1810 {
1811 debug!("checkpoint was already created via consensus output");
1812 return Ok(checkpoint);
1813 }
1814
1815 let peers = PeerBalancer::new(
1817 &network,
1818 peer_heights.clone(),
1819 PeerCheckpointRequestType::Content,
1820 )
1821 .with_checkpoint(*checkpoint.sequence_number());
1822 let now = tokio::time::Instant::now();
1823 let Some(_contents) = get_full_checkpoint_contents(
1824 peers,
1825 &store,
1826 peer_heights.clone(),
1827 &checkpoint,
1828 use_get_checkpoint_contents_v2,
1829 timeout,
1830 )
1831 .await
1832 else {
1833 let duration = peer_heights
1835 .read()
1836 .unwrap()
1837 .wait_interval_when_no_peer_to_sync_content();
1838 if now.elapsed() < duration {
1839 let duration = duration - now.elapsed();
1840 info!("retrying checkpoint sync after {:?}", duration);
1841 tokio::time::sleep(duration).await;
1842 }
1843 return Err(checkpoint);
1844 };
1845 debug!("completed checkpoint contents sync");
1846 Ok(checkpoint)
1847}
1848
1849#[instrument(level = "debug", skip_all)]
1850async fn get_full_checkpoint_contents<S>(
1851 peers: PeerBalancer,
1852 store: S,
1853 peer_heights: Arc<RwLock<PeerHeights>>,
1854 checkpoint: &VerifiedCheckpoint,
1855 use_get_checkpoint_contents_v2: bool,
1856 timeout: Duration,
1857) -> Option<VersionedFullCheckpointContents>
1858where
1859 S: WriteStore,
1860{
1861 let sequence_number = checkpoint.sequence_number;
1862 let digest = checkpoint.content_digest;
1863 if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1864 debug!("store already contains checkpoint contents");
1865 return Some(contents);
1866 }
1867
1868 for mut peer in peers {
1871 let peer_id = peer.inner().peer_id();
1872 debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
1873 let request = Request::new(digest).with_timeout(timeout);
1874 let start = Instant::now();
1875 let result = if use_get_checkpoint_contents_v2 {
1876 peer.get_checkpoint_contents_v2(request).await
1877 } else {
1878 peer.get_checkpoint_contents(request)
1879 .await
1880 .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
1881 };
1882 let elapsed = start.elapsed();
1883
1884 let contents = match result {
1885 Ok(response) => match response.into_inner() {
1886 Some(c) => Some(c),
1887 None => {
1888 trace!("peer unable to help sync");
1889 peer_heights.write().unwrap().record_failure(peer_id);
1890 None
1891 }
1892 },
1893 Err(e) => {
1894 trace!("{e:?}");
1895 peer_heights.write().unwrap().record_failure(peer_id);
1896 None
1897 }
1898 };
1899
1900 let Some(contents) = contents else {
1901 continue;
1902 };
1903
1904 if contents.verify_digests(digest).is_ok() {
1905 let size =
1906 bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
1907 peer_heights
1908 .write()
1909 .unwrap()
1910 .record_success(peer_id, size, elapsed);
1911 let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1912 store
1913 .insert_checkpoint_contents(checkpoint, verified_contents)
1914 .expect("store operation should not fail");
1915 return Some(contents);
1916 }
1917 }
1918 debug!("no peers had checkpoint contents");
1919 None
1920}
1921
1922async fn update_checkpoint_watermark_metrics<S>(
1923 mut recv: oneshot::Receiver<()>,
1924 store: S,
1925 metrics: Metrics,
1926) -> Result<()>
1927where
1928 S: WriteStore + Clone + Send + Sync,
1929{
1930 let mut interval = tokio::time::interval(Duration::from_secs(5));
1931 loop {
1932 tokio::select! {
1933 _now = interval.tick() => {
1934 let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1935 .expect("store operation should not fail");
1936 metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1937 let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1938 .expect("store operation should not fail");
1939 metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1940 },
1941 _ = &mut recv => break,
1942 }
1943 }
1944 Ok(())
1945}