1use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54 collections::{HashMap, VecDeque},
55 sync::{Arc, RwLock},
56 time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60 committee::Committee,
61 digests::CheckpointDigest,
62 messages_checkpoint::{
63 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64 VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65 },
66 storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71 sync::{broadcast, mpsc, watch},
72 task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77 include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::process_archive_checkpoint;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90 state_sync_client::StateSyncClient,
91 state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_storage::object_store::util::{build_object_store, fetch_checkpoint};
97use sui_storage::verify_checkpoint;
98
99#[derive(Clone, Debug)]
104pub struct Handle {
105 sender: mpsc::Sender<StateSyncMessage>,
106 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
107 metrics: Metrics,
108}
109
110impl Handle {
111 pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
120 self.sender
121 .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
122 .await
123 .unwrap()
124 }
125
126 pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
128 self.checkpoint_event_sender.subscribe()
129 }
130
131 pub fn get_peers_reported_for_failure(&self) -> u64 {
133 self.metrics.get_peers_reported_for_failure()
134 }
135}
136
137pub(super) fn compute_adaptive_timeout(
138 tx_count: u64,
139 min_timeout: Duration,
140 max_timeout: Duration,
141) -> Duration {
142 const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
143 const JITTER_FRACTION: f64 = 0.1;
144
145 let max_timeout = max_timeout.max(min_timeout);
146 let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
147 let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
148 let base = min_timeout + extra;
149
150 let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
151 let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
152 Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
153}
154
155#[cfg_attr(test, derive(Debug))]
156pub(super) struct PeerScore {
157 successes: VecDeque<(Instant, u64 , Duration)>,
158 failures: VecDeque<Instant>,
159 window: Duration,
160 failure_rate: f64,
161 failing_since: Option<Instant>,
162}
163
164impl PeerScore {
165 const MAX_SAMPLES: usize = 20;
166 const MIN_SAMPLES_FOR_FAILURE: usize = 10;
167
168 pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
169 Self {
170 successes: VecDeque::new(),
171 failures: VecDeque::new(),
172 window,
173 failure_rate,
174 failing_since: None,
175 }
176 }
177
178 pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
179 let now = Instant::now();
180 self.successes.push_back((now, size, response_time));
181 while self.successes.len() > Self::MAX_SAMPLES {
182 self.successes.pop_front();
183 }
184 self.failing_since = None;
185 }
186
187 pub(super) fn record_failure(&mut self) {
188 let now = Instant::now();
189 self.failures.push_back(now);
190 while self.failures.len() > Self::MAX_SAMPLES {
191 self.failures.pop_front();
192 }
193 }
194
195 pub(super) fn is_failing(&self) -> bool {
196 let now = Instant::now();
197 let recent_failures = self
198 .failures
199 .iter()
200 .filter(|ts| now.duration_since(**ts) < self.window)
201 .count();
202 let recent_successes = self
203 .successes
204 .iter()
205 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
206 .count();
207
208 let total = recent_failures + recent_successes;
209 if total < Self::MIN_SAMPLES_FOR_FAILURE {
210 return false;
211 }
212
213 let rate = recent_failures as f64 / total as f64;
214 rate >= self.failure_rate
215 }
216
217 pub(super) fn update_failing_state(&mut self) {
218 if self.is_failing() && self.failing_since.is_none() {
219 self.failing_since = Some(Instant::now());
220 }
221 }
222
223 pub(super) fn reset_failing_since(&mut self) {
224 self.failing_since = None;
225 }
226
227 pub(super) fn consistently_failing(&self, threshold: Duration) -> bool {
228 match self.failing_since {
229 Some(since) => since.elapsed() >= threshold,
230 None => false,
231 }
232 }
233
234 pub(super) fn effective_throughput(&self) -> Option<f64> {
235 let now = Instant::now();
236 let (total_size, total_time) = self
237 .successes
238 .iter()
239 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
240 .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
241 (size + s, time + *d)
242 });
243
244 if total_size == 0 {
245 return None;
246 }
247
248 if total_time.is_zero() {
249 return None;
250 }
251
252 Some(total_size as f64 / total_time.as_secs_f64())
253 }
254}
255
256struct PeerHeights {
257 peers: HashMap<PeerId, PeerStateSyncInfo>,
259 unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
260 sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
261 scores: HashMap<PeerId, PeerScore>,
262
263 wait_interval_when_no_peer_to_sync_content: Duration,
264 peer_scoring_window: Duration,
265 peer_failure_rate: f64,
266 checkpoint_content_timeout_min: Duration,
267 checkpoint_content_timeout_max: Duration,
268 exploration_probability: f64,
269}
270
/// What this node knows about one peer's view of the chain.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// Digest of the peer's genesis (sequence number 0) checkpoint, or the
    /// default digest if the peer did not return one.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Whether the peer's genesis digest matches ours. Peers for which this is
    /// `false` are skipped when updating heights and when selecting peers.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number the peer is known to have. Updates
    /// only raise it, unless the record is replaced because the genesis
    /// digest changed.
    height: CheckpointSequenceNumber,
    /// Lowest checkpoint the peer reported as available (its low watermark).
    /// Starts at `0` until the peer reports one.
    lowest: CheckpointSequenceNumber,
}
283
284impl PeerHeights {
285 pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
286 self.peers
287 .values()
288 .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
289 .max()
290 }
291
292 pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
293 self.peers
294 .iter()
295 .filter(|(_peer_id, info)| info.on_same_chain_as_us)
296 }
297
298 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
303 pub fn update_peer_info(
304 &mut self,
305 peer_id: PeerId,
306 checkpoint: Checkpoint,
307 low_watermark: Option<CheckpointSequenceNumber>,
308 ) -> bool {
309 debug!("Update peer info");
310
311 let info = match self.peers.get_mut(&peer_id) {
312 Some(info) if info.on_same_chain_as_us => info,
313 _ => return false,
314 };
315
316 info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
317 if let Some(low_watermark) = low_watermark {
318 info.lowest = low_watermark;
319 }
320 self.insert_checkpoint(checkpoint);
321
322 true
323 }
324
325 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
326 pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
327 use std::collections::hash_map::Entry;
328 debug!("Insert peer info");
329
330 match self.peers.entry(peer_id) {
331 Entry::Occupied(mut entry) => {
332 let entry = entry.get_mut();
335 if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
336 entry.height = std::cmp::max(entry.height, info.height);
337 } else {
338 *entry = info;
339 }
340 }
341 Entry::Vacant(entry) => {
342 entry.insert(info);
343 }
344 }
345 }
346
347 pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
348 if let Some(info) = self.peers.get_mut(&peer_id) {
349 info.on_same_chain_as_us = false;
350 }
351 self.scores.remove(&peer_id);
352 }
353
354 pub fn update_peer_height(
357 &mut self,
358 peer_id: PeerId,
359 height: CheckpointSequenceNumber,
360 low_watermark: Option<CheckpointSequenceNumber>,
361 ) -> bool {
362 let info = match self.peers.get_mut(&peer_id) {
363 Some(info) if info.on_same_chain_as_us => info,
364 _ => return false,
365 };
366
367 info.height = std::cmp::max(height, info.height);
368 if let Some(low_watermark) = low_watermark {
369 info.lowest = low_watermark;
370 }
371
372 true
373 }
374
375 pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
376 self.unprocessed_checkpoints
377 .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
378 self.sequence_number_to_digest
379 .retain(|&s, _digest| s > sequence_number);
380 }
381
382 pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
387 let digest = *checkpoint.digest();
388 let sequence_number = *checkpoint.sequence_number();
389
390 if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
392 if *existing_digest == digest {
393 return;
395 }
396 tracing::info!(
397 ?sequence_number,
398 ?existing_digest,
399 ?digest,
400 "received checkpoint with same sequence number but different digest, dropping new checkpoint"
401 );
402 return;
403 }
404
405 self.unprocessed_checkpoints.insert(digest, checkpoint);
406 self.sequence_number_to_digest
407 .insert(sequence_number, digest);
408 }
409
410 pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
411 if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
412 self.sequence_number_to_digest
413 .remove(checkpoint.sequence_number());
414 }
415 }
416
417 pub fn get_checkpoint_by_sequence_number(
418 &self,
419 sequence_number: CheckpointSequenceNumber,
420 ) -> Option<&Checkpoint> {
421 self.sequence_number_to_digest
422 .get(&sequence_number)
423 .and_then(|digest| self.get_checkpoint_by_digest(digest))
424 }
425
426 pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
427 self.unprocessed_checkpoints.get(digest)
428 }
429
430 #[cfg(test)]
431 pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
432 self.wait_interval_when_no_peer_to_sync_content = duration;
433 }
434
435 pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
436 self.wait_interval_when_no_peer_to_sync_content
437 }
438
439 pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
440 if !self.peers.contains_key(&peer_id) {
441 return;
442 }
443 self.scores
444 .entry(peer_id)
445 .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
446 .record_success(size, response_time);
447 }
448
449 pub fn record_failure(&mut self, peer_id: PeerId) {
450 if !self.peers.contains_key(&peer_id) {
451 return;
452 }
453 self.scores
454 .entry(peer_id)
455 .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
456 .record_failure();
457 }
458
459 pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
460 self.scores
461 .get(peer_id)
462 .and_then(|s| s.effective_throughput())
463 }
464
465 pub fn is_failing(&self, peer_id: &PeerId) -> bool {
466 self.scores
467 .get(peer_id)
468 .map(|s| s.is_failing())
469 .unwrap_or(false)
470 }
471
472 pub fn update_failing_states(&mut self) {
473 for score in self.scores.values_mut() {
474 score.update_failing_state();
475 }
476 }
477
478 pub fn find_peer_to_report_for_failure(&self, threshold: Duration) -> Option<PeerId> {
479 self.scores
480 .iter()
481 .filter(|(peer_id, score)| {
482 score.consistently_failing(threshold)
483 && self
484 .peers
485 .get(peer_id)
486 .is_some_and(|info| info.on_same_chain_as_us)
487 })
488 .max_by_key(|(_, score)| {
489 score
490 .failing_since
491 .map(|since| since.elapsed())
492 .unwrap_or(Duration::ZERO)
493 })
494 .map(|(peer_id, _)| *peer_id)
495 }
496}
497
498#[derive(Clone)]
503struct PeerBalancer {
504 known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
505 unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
506 failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
507 requested_checkpoint: Option<CheckpointSequenceNumber>,
508 request_type: PeerCheckpointRequestType,
509 exploration_probability: f64,
510}
511
512#[derive(Clone)]
513enum PeerCheckpointRequestType {
514 Summary,
515 Content,
516}
517
518impl PeerBalancer {
519 pub fn new(
520 network: &anemo::Network,
521 peer_heights: Arc<RwLock<PeerHeights>>,
522 request_type: PeerCheckpointRequestType,
523 ) -> Self {
524 let peer_heights_guard = peer_heights.read().unwrap();
525
526 let mut known_peers = Vec::new();
527 let mut unknown_peers = Vec::new();
528 let mut failing_peers = Vec::new();
529 let exploration_probability = peer_heights_guard.exploration_probability;
530
531 for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
532 let Some(peer) = network.peer(*peer_id) else {
533 continue;
534 };
535 let rtt_secs = peer.connection_rtt().as_secs_f64();
536
537 if peer_heights_guard.is_failing(peer_id) {
538 failing_peers.push((peer, *info));
539 } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
540 known_peers.push((peer, *info, throughput));
541 } else {
542 unknown_peers.push((peer, *info, rtt_secs));
543 }
544 }
545 drop(peer_heights_guard);
546
547 unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
548 rtt_a
549 .partial_cmp(rtt_b)
550 .unwrap_or(std::cmp::Ordering::Equal)
551 });
552
553 Self {
554 known_peers,
555 unknown_peers,
556 failing_peers,
557 requested_checkpoint: None,
558 request_type,
559 exploration_probability,
560 }
561 }
562
563 pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
564 self.requested_checkpoint = Some(checkpoint);
565 self
566 }
567
568 fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
569 let requested = self.requested_checkpoint.unwrap_or(0);
570 match &self.request_type {
571 PeerCheckpointRequestType::Summary => info.height >= requested,
572 PeerCheckpointRequestType::Content => {
573 info.height >= requested && info.lowest <= requested
574 }
575 }
576 }
577
578 fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
579 let eligible: Vec<_> = self
580 .known_peers
581 .iter()
582 .enumerate()
583 .filter(|(_, (_, info, _))| self.is_eligible(info))
584 .map(|(i, (_, _, t))| (i, *t))
585 .collect();
586
587 if eligible.is_empty() {
588 return None;
589 }
590
591 let total: f64 = eligible.iter().map(|(_, t)| t).sum();
592 let mut pick = rand::thread_rng().gen_range(0.0..total);
593
594 for (idx, throughput) in &eligible {
595 pick -= throughput;
596 if pick <= 0.0 {
597 let (peer, info, _) = self.known_peers.remove(*idx);
598 return Some((peer, info));
599 }
600 }
601
602 let (idx, _) = eligible.last().unwrap();
603 let (peer, info, _) = self.known_peers.remove(*idx);
604 Some((peer, info))
605 }
606
607 fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
608 let pos = self
609 .unknown_peers
610 .iter()
611 .position(|(_, info, _)| self.is_eligible(info))?;
612 let (peer, info, _) = self.unknown_peers.remove(pos);
613 Some((peer, info))
614 }
615
616 fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
617 let pos = self
618 .failing_peers
619 .iter()
620 .position(|(_, info)| self.is_eligible(info))?;
621 Some(self.failing_peers.remove(pos))
622 }
623}
624
625impl Iterator for PeerBalancer {
626 type Item = StateSyncClient<anemo::Peer>;
627
628 fn next(&mut self) -> Option<Self::Item> {
629 let has_eligible_known = self
630 .known_peers
631 .iter()
632 .any(|(_, info, _)| self.is_eligible(info));
633 let has_eligible_unknown = self
634 .unknown_peers
635 .iter()
636 .any(|(_, info, _)| self.is_eligible(info));
637
638 let explore = has_eligible_unknown
639 && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
640
641 if explore && let Some((peer, _)) = self.select_by_rtt() {
642 return Some(StateSyncClient::new(peer));
643 }
644
645 if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
646 return Some(StateSyncClient::new(peer));
647 }
648
649 if let Some((peer, _)) = self.select_failing() {
650 return Some(StateSyncClient::new(peer));
651 }
652
653 None
654 }
655}
656
/// Messages delivered to the state-sync event loop's mailbox.
#[derive(Clone, Debug)]
enum StateSyncMessage {
    /// Ask the loop to start a checkpoint-summary sync task if none is running.
    /// The summary task also sends this after a batch-limited sync succeeds.
    StartSyncJob,
    /// A checkpoint sent via `Handle::send_checkpoint`; handled as a
    /// checkpoint from consensus.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    /// A checkpoint that has been synced; the loop notifies lagging peers of
    /// it. NOTE(review): presumably sent by the contents-sync task, which is
    /// given `weak_sender` — confirm.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}

/// State owned by the state-sync event loop.
// NOTE(review): field order is also drop order (e.g. `tasks` is dropped
// before `store`); keep that in mind before reordering.
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    /// Incoming messages. The loop exits once this returns `None`.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak sender handed to spawned tasks so they can post back to the
    /// mailbox without keeping it open.
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// All spawned background tasks.
    tasks: JoinSet<()>,
    /// Handle of the running checkpoint-summary sync task, if any.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Handle of the long-lived checkpoint-contents sync task.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    /// Optional server-side download limiter, pruned on every tick.
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcasts checkpoints to `Handle::subscribe_to_synced_checkpoints` receivers.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    /// Handle of the long-lived archive sync task.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    archive_config: Option<ArchiveReaderConfig>,
    /// When set, failing peers are reported here instead of being disconnected directly.
    discovery_sender: Option<crate::discovery::Sender>,
}
691
impl<S> StateSyncEventLoop<S>
where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    /// Runs the state-sync event loop until the mailbox is closed.
    ///
    /// On startup it:
    /// - sorts the pinned checkpoints;
    /// - queries each currently connected peer for its latest checkpoint;
    /// - spawns the watermark-metrics, checkpoint-contents sync and archive
    ///   sync tasks.
    ///
    /// It then multiplexes interval ticks, mailbox messages, peer events and
    /// task completions. After each event it may start a summary sync task and
    /// bump the contents-sync target.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - subscribing to network events fails;
    /// - a spawned task panics or fails;
    /// - the contents-sync or archive-sync task terminates.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        // The contents-sync task watches this for the sequence number it should sync up to.
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // `_sender` is a named binding (not `_`), so it lives until this
        // function returns. Presumably its drop tells the metrics task to stop.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.use_get_checkpoint_contents_v2(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // `None` means every sender is gone: shut down.
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            // Cancelled tasks are expected; panics are propagated
                            // and any other failure is fatal.
                            if e.is_cancelled() {
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The contents and archive tasks must live for the whole loop.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // Summary tasks are one-shot; clear the slot so a new one can start.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }

    /// Dispatches one mailbox message.
    fn handle_message(&mut self, message: StateSyncMessage) {
        debug!("Received message: {:?}", message);
        match message {
            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
                self.handle_checkpoint_from_consensus(checkpoint)
            }
            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
                self.spawn_notify_peers_of_checkpoint(*checkpoint)
            }
        }
    }

    /// Applies a checkpoint received from consensus.
    ///
    /// The checkpoint must chain onto the checkpoint before it in the store.
    /// If it is newer than the highest verified checkpoint, this:
    /// - stores the next committee (at end of epoch);
    /// - advances the highest verified and highest synced watermarks;
    /// - broadcasts the checkpoint and notifies lagging peers.
    ///
    /// # Panics
    ///
    /// Panics if the previous checkpoint is missing from the store, if
    /// `previous_digest` does not match it, or if a store operation fails.
    /// NOTE(review): a checkpoint with sequence number 0 would underflow
    /// `sequence_number() - 1`; presumably consensus never sends genesis.
    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!(
                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
                checkpoint.sequence_number(),
                Some(prev_digest),
                checkpoint.previous_digest
            );
        }

        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        // Already at or past this checkpoint: nothing to do.
        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }

        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }

        // Debug builds only: the store must already hold every checkpoint
        // (and its full contents) from `next_sequence_number` up to this one.
        #[cfg(debug_assertions)]
        {
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }

        // At an epoch boundary, record the committee for the next epoch.
        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }

        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        // A send error only means there are currently no subscribers.
        let _ = self.checkpoint_event_sender.send(checkpoint.clone());

        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }

    /// Reacts to network peer events.
    ///
    /// A new peer is queried for its latest checkpoint. A lost peer has its
    /// info and score dropped. Lagging behind the event stream is only traced.
    ///
    /// # Panics
    ///
    /// Panics if the event channel is closed.
    fn handle_peer_event(
        &mut self,
        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
    ) {
        use tokio::sync::broadcast::error::RecvError;

        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                self.spawn_get_latest_from_peer(peer_id);
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                let mut heights = self.peer_heights.write().unwrap();
                heights.peers.remove(&peer_id);
                heights.scores.remove(&peer_id);
            }

            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }

            Err(RecvError::Lagged(_)) => {
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    /// Spawns `get_latest_from_peer` for a connected peer; no-op if the peer
    /// is no longer connected.
    ///
    /// # Panics
    ///
    /// Panics if the store has no genesis checkpoint.
    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
        if let Some(peer) = self.network.peer(peer_id) {
            let genesis_checkpoint_digest = *self
                .store
                .get_checkpoint_by_sequence_number(0)
                .expect("store should contain genesis checkpoint")
                .digest();
            let task = get_latest_from_peer(
                genesis_checkpoint_digest,
                peer,
                self.peer_heights.clone(),
                self.config.timeout(),
            );
            self.tasks.spawn(task);
        }
    }

    /// Periodic work. Each tick this:
    /// - re-polls peers for their latest checkpoint;
    /// - prunes the download limiter;
    /// - possibly reports a persistently failing peer.
    fn handle_tick(&mut self, _now: std::time::Instant) {
        let task = query_peers_for_their_latest_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.config.timeout(),
        );
        self.tasks.spawn(task);

        if let Some(layer) = self.download_limit_layer.as_ref() {
            layer.maybe_prune_map();
        }

        self.maybe_report_failing_peer();
    }

    /// Reports (or disconnects) at most one peer that has been failing for at
    /// least the configured threshold. A zero threshold disables this.
    fn maybe_report_failing_peer(&mut self) {
        let threshold = self.config.peer_disconnect_threshold();
        if threshold.is_zero() {
            return;
        }

        let mut peer_heights = self.peer_heights.write().unwrap();
        peer_heights.update_failing_states();

        let Some(peer_id) = peer_heights.find_peer_to_report_for_failure(threshold) else {
            return;
        };
        // Restart the peer's failing clock. It is reported again only if it
        // keeps failing for another full threshold.
        if let Some(score) = peer_heights.scores.get_mut(&peer_id) {
            score.reset_failing_since();
        }
        // Release the lock before calling out to discovery / the network.
        drop(peer_heights);

        info!("reporting peer {peer_id} for consistent state sync failures");
        if let Some(sender) = &self.discovery_sender {
            sender.report_peer_failure(peer_id);
        } else {
            let _ = self.network.disconnect(peer_id);
        }
        self.metrics.inc_peers_reported_for_failure();
    }

    /// Starts a checkpoint-summary sync task if none is running and some peer
    /// is ahead of our highest verified checkpoint.
    ///
    /// Each task syncs at most `max_checkpoint_sync_batch_size` checkpoints.
    /// When the target was capped, the task re-queues `StartSyncJob` on success.
    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
        if self.sync_checkpoint_summaries_task.is_some() {
            return;
        }

        let highest_processed_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        let highest_known_sequence_number = self
            .peer_heights
            .read()
            .unwrap()
            .highest_known_checkpoint_sequence_number();

        if let Some(target_seq) = highest_known_sequence_number
            && *highest_processed_checkpoint.sequence_number() < target_seq
        {
            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
            let limited_target = std::cmp::min(
                target_seq,
                highest_processed_checkpoint
                    .sequence_number()
                    .saturating_add(max_batch_size),
            );
            let was_limited = limited_target < target_seq;

            let weak_sender = self.weak_sender.clone();
            let task = sync_to_checkpoint(
                self.network.clone(),
                self.store.clone(),
                self.peer_heights.clone(),
                self.metrics.clone(),
                self.config.pinned_checkpoints.clone(),
                self.config.checkpoint_header_download_concurrency(),
                self.config.timeout(),
                limited_target,
            )
            .map(move |result| match result {
                Ok(()) => {
                    // Continue with the next batch if this one was capped.
                    if was_limited && let Some(sender) = weak_sender.upgrade() {
                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
                    }
                }
                Err(e) => {
                    debug!("error syncing checkpoint {e}");
                }
            });
            let task_handle = self.tasks.spawn(task);
            self.sync_checkpoint_summaries_task = Some(task_handle);
        }
    }

    /// Points the contents-sync task at our highest verified checkpoint.
    ///
    /// This happens only when verified is ahead of synced and some peer
    /// reports a height above our synced checkpoint. `None > Some(_)` is
    /// false, so nothing happens without peers.
    fn maybe_trigger_checkpoint_contents_sync_task(
        &mut self,
        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
    ) {
        let highest_verified_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        let highest_synced_checkpoint = self
            .store
            .get_highest_synced_checkpoint()
            .expect("store operation should not fail");

        if highest_verified_checkpoint.sequence_number()
            > highest_synced_checkpoint.sequence_number()
            && self
                .peer_heights
                .read()
                .unwrap()
                .highest_known_checkpoint_sequence_number()
                > Some(*highest_synced_checkpoint.sequence_number())
        {
            // Only notify the receiver when the target actually changes.
            let _ = target_sequence_channel.send_if_modified(|num| {
                let new_num = *highest_verified_checkpoint.sequence_number();
                if *num == new_num {
                    return false;
                }
                *num = new_num;
                true
            });
        }
    }

    /// Spawns a task pushing `checkpoint` to peers that are behind it.
    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
        let task = notify_peers_of_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            checkpoint,
            self.config.timeout(),
        );
        self.tasks.spawn(task);
    }
}
1112
1113async fn notify_peers_of_checkpoint(
1114 network: anemo::Network,
1115 peer_heights: Arc<RwLock<PeerHeights>>,
1116 checkpoint: VerifiedCheckpoint,
1117 timeout: Duration,
1118) {
1119 let futs = peer_heights
1120 .read()
1121 .unwrap()
1122 .peers_on_same_chain()
1123 .filter_map(|(peer_id, info)| {
1125 (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1126 })
1127 .flat_map(|peer_id| network.peer(*peer_id))
1129 .map(StateSyncClient::new)
1130 .map(|mut client| {
1131 let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1132 async move { client.push_checkpoint_summary(request).await }
1133 })
1134 .collect::<Vec<_>>();
1135 futures::future::join_all(futs).await;
1136}
1137
1138async fn get_latest_from_peer(
1139 our_genesis_checkpoint_digest: CheckpointDigest,
1140 peer: anemo::Peer,
1141 peer_heights: Arc<RwLock<PeerHeights>>,
1142 timeout: Duration,
1143) {
1144 let peer_id = peer.peer_id();
1145 let mut client = StateSyncClient::new(peer);
1146
1147 let info = {
1148 let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
1149
1150 if let Some(info) = maybe_info {
1151 info
1152 } else {
1153 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
1158 .with_timeout(timeout);
1159 let response = client
1160 .get_checkpoint_summary(request)
1161 .await
1162 .map(Response::into_inner);
1163
1164 let info = match response {
1165 Ok(Some(checkpoint)) => {
1166 let digest = *checkpoint.digest();
1167 PeerStateSyncInfo {
1168 genesis_checkpoint_digest: digest,
1169 on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
1170 height: *checkpoint.sequence_number(),
1171 lowest: CheckpointSequenceNumber::default(),
1172 }
1173 }
1174 Ok(None) => PeerStateSyncInfo {
1175 genesis_checkpoint_digest: CheckpointDigest::default(),
1176 on_same_chain_as_us: false,
1177 height: CheckpointSequenceNumber::default(),
1178 lowest: CheckpointSequenceNumber::default(),
1179 },
1180 Err(status) => {
1181 trace!("get_latest_checkpoint_summary request failed: {status:?}");
1182 return;
1183 }
1184 };
1185 peer_heights
1186 .write()
1187 .unwrap()
1188 .insert_peer_info(peer_id, info);
1189 info
1190 }
1191 };
1192
1193 if !info.on_same_chain_as_us {
1195 trace!(?info, "Peer {peer_id} not on same chain as us");
1196 return;
1197 }
1198 let Some((highest_checkpoint, low_watermark)) =
1199 query_peer_for_latest_info(&mut client, timeout).await
1200 else {
1201 return;
1202 };
1203 peer_heights
1204 .write()
1205 .unwrap()
1206 .update_peer_info(peer_id, highest_checkpoint, low_watermark);
1207}
1208
1209async fn query_peer_for_latest_info(
1211 client: &mut StateSyncClient<anemo::Peer>,
1212 timeout: Duration,
1213) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
1214 let request = Request::new(()).with_timeout(timeout);
1215 let response = client
1216 .get_checkpoint_availability(request)
1217 .await
1218 .map(Response::into_inner);
1219 match response {
1220 Ok(GetCheckpointAvailabilityResponse {
1221 highest_synced_checkpoint,
1222 lowest_available_checkpoint,
1223 }) => {
1224 return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
1225 }
1226 Err(status) => {
1227 if status.status() != anemo::types::response::StatusCode::NotFound {
1229 trace!("get_checkpoint_availability request failed: {status:?}");
1230 return None;
1231 }
1232 }
1233 };
1234
1235 let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1238 let response = client
1239 .get_checkpoint_summary(request)
1240 .await
1241 .map(Response::into_inner);
1242 match response {
1243 Ok(Some(checkpoint)) => Some((checkpoint, None)),
1244 Ok(None) => None,
1245 Err(status) => {
1246 trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1247 None
1248 }
1249 }
1250}
1251
1252#[instrument(level = "debug", skip_all)]
1253async fn query_peers_for_their_latest_checkpoint(
1254 network: anemo::Network,
1255 peer_heights: Arc<RwLock<PeerHeights>>,
1256 sender: mpsc::WeakSender<StateSyncMessage>,
1257 timeout: Duration,
1258) {
1259 let peer_heights = &peer_heights;
1260 let futs = peer_heights
1261 .read()
1262 .unwrap()
1263 .peers_on_same_chain()
1264 .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1266 .map(|peer| {
1267 let peer_id = peer.peer_id();
1268 let mut client = StateSyncClient::new(peer);
1269
1270 async move {
1271 let response = query_peer_for_latest_info(&mut client, timeout).await;
1272 match response {
1273 Some((highest_checkpoint, low_watermark)) => peer_heights
1274 .write()
1275 .unwrap()
1276 .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1277 .then_some(highest_checkpoint),
1278 None => None,
1279 }
1280 }
1281 })
1282 .collect::<Vec<_>>();
1283
1284 debug!("Query {} peers for latest checkpoint", futs.len());
1285
1286 let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1287
1288 let highest_checkpoint_seq = checkpoints
1289 .map(|checkpoint| *checkpoint.sequence_number())
1290 .max();
1291
1292 let our_highest_seq = peer_heights
1293 .read()
1294 .unwrap()
1295 .highest_known_checkpoint_sequence_number();
1296
1297 debug!(
1298 "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1299 );
1300
1301 let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1302 (Some(theirs), None) => theirs,
1303 (Some(theirs), Some(ours)) if theirs > ours => theirs,
1304 _ => return,
1305 };
1306
1307 if let Some(sender) = sender.upgrade() {
1308 let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1309 }
1310}
1311
1312async fn sync_to_checkpoint<S>(
1313 network: anemo::Network,
1314 store: S,
1315 peer_heights: Arc<RwLock<PeerHeights>>,
1316 metrics: Metrics,
1317 pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1318 checkpoint_header_download_concurrency: usize,
1319 timeout: Duration,
1320 target_sequence_number: CheckpointSequenceNumber,
1321) -> Result<()>
1322where
1323 S: WriteStore,
1324{
1325 metrics.set_highest_known_checkpoint(target_sequence_number);
1326
1327 let mut current = store
1328 .get_highest_verified_checkpoint()
1329 .expect("store operation should not fail");
1330 if *current.sequence_number() >= target_sequence_number {
1331 return Err(anyhow::anyhow!(
1332 "target checkpoint {} is older than highest verified checkpoint {}",
1333 target_sequence_number,
1334 current.sequence_number(),
1335 ));
1336 }
1337
1338 let peer_balancer = PeerBalancer::new(
1339 &network,
1340 peer_heights.clone(),
1341 PeerCheckpointRequestType::Summary,
1342 );
1343 let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1345 ..=target_sequence_number)
1346 .map(|next| {
1347 let peers = peer_balancer.clone().with_checkpoint(next);
1348 let peer_heights = peer_heights.clone();
1349 let pinned_checkpoints = &pinned_checkpoints;
1350 async move {
1351 if let Some(checkpoint) = peer_heights
1352 .read()
1353 .unwrap()
1354 .get_checkpoint_by_sequence_number(next)
1355 {
1356 return (Some(checkpoint.to_owned()), next, None);
1357 }
1358
1359 for mut peer in peers {
1362 let peer_id = peer.inner().peer_id();
1363 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1364 .with_timeout(timeout);
1365 let start = Instant::now();
1366 let result = peer.get_checkpoint_summary(request).await;
1367 let elapsed = start.elapsed();
1368
1369 let checkpoint = match result {
1370 Ok(response) => match response.into_inner() {
1371 Some(cp) => Some(cp),
1372 None => {
1373 trace!("peer unable to help sync");
1374 peer_heights.write().unwrap().record_failure(peer_id);
1375 None
1376 }
1377 },
1378 Err(e) => {
1379 trace!("{e:?}");
1380 peer_heights.write().unwrap().record_failure(peer_id);
1381 None
1382 }
1383 };
1384
1385 let Some(checkpoint) = checkpoint else {
1386 continue;
1387 };
1388
1389 let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
1390 peer_heights.write().unwrap().record_success(peer_id, size, elapsed);
1391
1392 if *checkpoint.sequence_number() != next {
1394 tracing::debug!(
1395 "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1396 checkpoint.sequence_number()
1397 );
1398 peer_heights
1399 .write()
1400 .unwrap()
1401 .mark_peer_as_not_on_same_chain(peer_id);
1402 continue;
1403 }
1404
1405 let checkpoint_digest = checkpoint.digest();
1407 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1408 checkpoint.sequence_number(),
1409 |(seq_num, _digest)| *seq_num
1410 )
1411 && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
1412 {
1413 tracing::debug!(
1414 "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1415 pinned_checkpoints[pinned_digest_index].1,
1416 checkpoint_digest
1417 );
1418 peer_heights
1419 .write()
1420 .unwrap()
1421 .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
1422 continue;
1423 }
1424
1425 peer_heights
1427 .write()
1428 .unwrap()
1429 .insert_checkpoint(checkpoint.clone());
1430 return (Some(checkpoint), next, Some(peer_id));
1431 }
1432 (None, next, None)
1433 }
1434 })
1435 .pipe(futures::stream::iter)
1436 .buffered(checkpoint_header_download_concurrency);
1437
1438 while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1439 assert_eq!(
1440 current
1441 .sequence_number()
1442 .checked_add(1)
1443 .expect("exhausted u64"),
1444 next
1445 );
1446
1447 let checkpoint = 'cp: {
1449 let checkpoint = maybe_checkpoint.ok_or_else(|| {
1450 anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1451 })?;
1452 if pinned_checkpoints
1454 .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1455 .is_ok()
1456 {
1457 break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1458 }
1459 match verify_checkpoint(¤t, &store, checkpoint) {
1460 Ok(verified_checkpoint) => verified_checkpoint,
1461 Err(checkpoint) => {
1462 let mut peer_heights = peer_heights.write().unwrap();
1463 peer_heights.remove_checkpoint(checkpoint.digest());
1466
1467 if let Some(peer_id) = maybe_peer_id {
1469 peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1470 }
1471
1472 return Err(anyhow::anyhow!(
1473 "unable to verify checkpoint {checkpoint:?}"
1474 ));
1475 }
1476 }
1477 };
1478
1479 debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1480 if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1481 metrics.checkpoint_summary_age_metrics()
1482 {
1483 checkpoint.report_checkpoint_age(
1484 checkpoint_summary_age_metric,
1485 checkpoint_summary_age_metric_deprecated,
1486 );
1487 }
1488
1489 store
1492 .insert_checkpoint(&checkpoint)
1493 .expect("store operation should not fail");
1494
1495 current = checkpoint;
1496 }
1497
1498 peer_heights
1499 .write()
1500 .unwrap()
1501 .cleanup_old_checkpoints(*current.sequence_number());
1502
1503 Ok(())
1504}
1505
1506async fn sync_checkpoint_contents_from_archive<S>(
1507 network: anemo::Network,
1508 archive_config: Option<ArchiveReaderConfig>,
1509 store: S,
1510 peer_heights: Arc<RwLock<PeerHeights>>,
1511 metrics: Metrics,
1512) where
1513 S: WriteStore + Clone + Send + Sync + 'static,
1514{
1515 loop {
1516 sync_checkpoint_contents_from_archive_iteration(
1517 &network,
1518 &archive_config,
1519 store.clone(),
1520 peer_heights.clone(),
1521 metrics.clone(),
1522 )
1523 .await;
1524 tokio::time::sleep(Duration::from_secs(5)).await;
1525 }
1526}
1527
1528async fn sync_checkpoint_contents_from_archive_iteration<S>(
1529 network: &anemo::Network,
1530 archive_config: &Option<ArchiveReaderConfig>,
1531 store: S,
1532 peer_heights: Arc<RwLock<PeerHeights>>,
1533 metrics: Metrics,
1534) where
1535 S: WriteStore + Clone + Send + Sync + 'static,
1536{
1537 let peers: Vec<_> = peer_heights
1538 .read()
1539 .unwrap()
1540 .peers_on_same_chain()
1541 .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1543 .collect();
1544 let lowest_checkpoint_on_peers = peers
1545 .iter()
1546 .map(|(_p, state_sync_info)| state_sync_info.lowest)
1547 .min();
1548 let highest_synced = store
1549 .get_highest_synced_checkpoint()
1550 .expect("store operation should not fail")
1551 .sequence_number;
1552 let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1553 highest_synced < lowest_checkpoint_on_peers
1554 } else {
1555 false
1556 };
1557 debug!(
1558 "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
1559 lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1560 );
1561 if sync_from_archive {
1562 let start = highest_synced
1563 .checked_add(1)
1564 .expect("Checkpoint seq num overflow");
1565 let end = lowest_checkpoint_on_peers.unwrap();
1566
1567 let Some(archive_config) = archive_config else {
1568 warn!("Failed to find an archive reader to complete the state sync request");
1569 return;
1570 };
1571 let Some(ingestion_url) = &archive_config.ingestion_url else {
1572 warn!("Archival ingestion url for state sync is not configured");
1573 return;
1574 };
1575 if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1576 warn!("{} can't be used as an archival fallback", ingestion_url);
1577 return;
1578 }
1579 let obj_store =
1580 build_object_store(ingestion_url, archive_config.remote_store_options.clone());
1581 let mut checkpoint_stream = futures::stream::iter(start..=end)
1582 .map(|seq| {
1583 let obj_store = obj_store.clone();
1584 async move { (seq, fetch_checkpoint(&obj_store, seq).await) }
1585 })
1586 .buffered(archive_config.download_concurrency.get());
1587
1588 while let Some((seq, result)) = checkpoint_stream.next().await {
1589 let checkpoint = match result {
1590 Ok(checkpoint) => checkpoint,
1591 Err(err) => {
1592 warn!("State sync from archive failed fetching checkpoint {seq}: {err}");
1593 return;
1594 }
1595 };
1596 if let Err(err) = process_archive_checkpoint(&store, &checkpoint, &metrics) {
1597 warn!("State sync from archive failed processing checkpoint {seq}: {err}");
1598 return;
1599 }
1600 }
1601 }
1602}
1603
/// Downloads checkpoint contents for every checkpoint after the store's highest synced
/// checkpoint, up to the latest target received on `target_sequence_channel`. As downloads
/// complete in sequence order, the store's highest synced checkpoint is advanced.
///
/// Two limits bound the in-flight work:
/// - at most `checkpoint_content_download_concurrency` download tasks, and
/// - at most `checkpoint_content_download_tx_concurrency` transactions across those tasks.
///   Each checkpoint's transaction count is the delta of `network_total_transactions` from the
///   previously started checkpoint.
///
/// Each completed checkpoint is broadcast on `checkpoint_event_sender`. A
/// `StateSyncMessage::SyncedCheckpoint` is sent through `sender`, while it can still be
/// upgraded, whenever no tasks are in flight or the highest synced sequence number is a
/// multiple of `checkpoint_content_download_concurrency`.
///
/// A failed download is re-queued at the front of the ordered task queue, so completions are
/// still observed in sequence order. The function returns once `target_sequence_channel.changed()`
/// reports an error.
///
/// # Panics
///
/// Panics in any of these cases:
/// - A store operation fails.
/// - A checkpoint between the highest synced checkpoint and the target is missing from the
///   store.
/// - A sequence number overflows.
async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    use_get_checkpoint_contents_v2: bool,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");

    // Next sequence number for which a download task has not been started yet.
    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    // Exclusive upper bound: one past the latest target received from the channel.
    let mut target_sequence_cursor = 0;
    // `network_total_transactions` of the last checkpoint whose task was started. The delta to
    // the next checkpoint's total is that checkpoint's transaction count.
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    let mut checkpoint_contents_tasks = FuturesOrdered::new();

    // Transaction budget still available for starting new tasks.
    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;

    loop {
        tokio::select! {
            result = target_sequence_channel.changed() => {
                match result {
                    Ok(()) => {
                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
                    }
                    Err(_) => {
                        return
                    }
                }
            },
            // This branch is disabled while the task queue is empty (`next()` yields `None`).
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint; store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        // Tasks complete in order, so this delta is exactly the completed
                        // checkpoint's own transaction count: return it to the budget.
                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;

                    }
                    Err(checkpoint) => {
                        // Only logged when the checkpoint is at or above the lowest checkpoint
                        // any peer reports (or when that cannot be determined). Presumably
                        // lower checkpoints are left to the archive fallback; confirm.
                        let _: &VerifiedCheckpoint = &checkpoint; if let Some(lowest_peer_checkpoint) =
                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
                            }
                        } else {
                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());

                        }
                        // Recompute this checkpoint's own transaction count for the retry's
                        // adaptive timeout. The budget reserved when it was first started was
                        // never returned, so it is not charged again here.
                        let retry_tx_count = if *checkpoint.sequence_number() == 0 {
                            checkpoint.network_total_transactions
                        } else {
                            let prev = store
                                .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
                                .expect("previous checkpoint must exist")
                                .network_total_transactions;
                            checkpoint.network_total_transactions - prev
                        };
                        // Retry at the front so it remains the next result in sequence order.
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            use_get_checkpoint_contents_v2,
                            retry_tx_count,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        // Start new tasks while below the target and within both concurrency limits.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .unwrap_or_else(|| panic!(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
                    current_sequence
                ));

            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            // NOTE(review): if a single checkpoint's tx_count exceeds the whole
            // `checkpoint_content_download_tx_concurrency` budget, this breaks even with no
            // tasks in flight. The task branch of `select!` is then disabled, and every target
            // update re-runs this same failing check, so syncing appears to stall. Presumably
            // the configured budget is expected to exceed any checkpoint's size; confirm.
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                use_get_checkpoint_contents_v2,
                tx_count,
                next_checkpoint,
            ));
        }

        // Report progress periodically (every `concurrency` checkpoints) and whenever idle.
        if highest_synced
            .sequence_number()
            .is_multiple_of(checkpoint_content_download_concurrency as u64)
            || checkpoint_contents_tasks.is_empty()
        {
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}
1734
1735#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1736async fn sync_one_checkpoint_contents<S>(
1737 network: anemo::Network,
1738 store: S,
1739 peer_heights: Arc<RwLock<PeerHeights>>,
1740 use_get_checkpoint_contents_v2: bool,
1741 tx_count: u64,
1742 checkpoint: VerifiedCheckpoint,
1743) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1744where
1745 S: WriteStore + Clone,
1746{
1747 let (timeout_min, timeout_max) = {
1748 let ph = peer_heights.read().unwrap();
1749 (
1750 ph.checkpoint_content_timeout_min,
1751 ph.checkpoint_content_timeout_max,
1752 )
1753 };
1754 let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1755 debug!(
1756 "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1757 timeout, tx_count
1758 );
1759
1760 if store
1763 .get_highest_synced_checkpoint()
1764 .expect("store operation should not fail")
1765 .sequence_number()
1766 >= checkpoint.sequence_number()
1767 {
1768 debug!("checkpoint was already created via consensus output");
1769 return Ok(checkpoint);
1770 }
1771
1772 let peers = PeerBalancer::new(
1774 &network,
1775 peer_heights.clone(),
1776 PeerCheckpointRequestType::Content,
1777 )
1778 .with_checkpoint(*checkpoint.sequence_number());
1779 let now = tokio::time::Instant::now();
1780 let Some(_contents) = get_full_checkpoint_contents(
1781 peers,
1782 &store,
1783 peer_heights.clone(),
1784 &checkpoint,
1785 use_get_checkpoint_contents_v2,
1786 timeout,
1787 )
1788 .await
1789 else {
1790 let duration = peer_heights
1792 .read()
1793 .unwrap()
1794 .wait_interval_when_no_peer_to_sync_content();
1795 if now.elapsed() < duration {
1796 let duration = duration - now.elapsed();
1797 info!("retrying checkpoint sync after {:?}", duration);
1798 tokio::time::sleep(duration).await;
1799 }
1800 return Err(checkpoint);
1801 };
1802 debug!("completed checkpoint contents sync");
1803 Ok(checkpoint)
1804}
1805
1806#[instrument(level = "debug", skip_all)]
1807async fn get_full_checkpoint_contents<S>(
1808 peers: PeerBalancer,
1809 store: S,
1810 peer_heights: Arc<RwLock<PeerHeights>>,
1811 checkpoint: &VerifiedCheckpoint,
1812 use_get_checkpoint_contents_v2: bool,
1813 timeout: Duration,
1814) -> Option<VersionedFullCheckpointContents>
1815where
1816 S: WriteStore,
1817{
1818 let sequence_number = checkpoint.sequence_number;
1819 let digest = checkpoint.content_digest;
1820 if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1821 debug!("store already contains checkpoint contents");
1822 return Some(contents);
1823 }
1824
1825 for mut peer in peers {
1828 let peer_id = peer.inner().peer_id();
1829 debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
1830 let request = Request::new(digest).with_timeout(timeout);
1831 let start = Instant::now();
1832 let result = if use_get_checkpoint_contents_v2 {
1833 peer.get_checkpoint_contents_v2(request).await
1834 } else {
1835 peer.get_checkpoint_contents(request)
1836 .await
1837 .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
1838 };
1839 let elapsed = start.elapsed();
1840
1841 let contents = match result {
1842 Ok(response) => match response.into_inner() {
1843 Some(c) => Some(c),
1844 None => {
1845 trace!("peer unable to help sync");
1846 peer_heights.write().unwrap().record_failure(peer_id);
1847 None
1848 }
1849 },
1850 Err(e) => {
1851 trace!("{e:?}");
1852 peer_heights.write().unwrap().record_failure(peer_id);
1853 None
1854 }
1855 };
1856
1857 let Some(contents) = contents else {
1858 continue;
1859 };
1860
1861 if contents.verify_digests(digest).is_ok() {
1862 let size =
1863 bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
1864 peer_heights
1865 .write()
1866 .unwrap()
1867 .record_success(peer_id, size, elapsed);
1868 let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1869 store
1870 .insert_checkpoint_contents(checkpoint, verified_contents)
1871 .expect("store operation should not fail");
1872 return Some(contents);
1873 }
1874 }
1875 debug!("no peers had checkpoint contents");
1876 None
1877}
1878
1879async fn update_checkpoint_watermark_metrics<S>(
1880 mut recv: oneshot::Receiver<()>,
1881 store: S,
1882 metrics: Metrics,
1883) -> Result<()>
1884where
1885 S: WriteStore + Clone + Send + Sync,
1886{
1887 let mut interval = tokio::time::interval(Duration::from_secs(5));
1888 loop {
1889 tokio::select! {
1890 _now = interval.tick() => {
1891 let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1892 .expect("store operation should not fail");
1893 metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1894 let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1895 .expect("store operation should not fail");
1896 metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1897 },
1898 _ = &mut recv => break,
1899 }
1900 }
1901 Ok(())
1902}