1use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54 collections::{HashMap, VecDeque},
55 sync::{Arc, RwLock},
56 time::{Duration, Instant},
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60 committee::Committee,
61 digests::CheckpointDigest,
62 messages_checkpoint::{
63 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64 VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65 },
66 storage::WriteStore,
67};
68use tap::Pipe;
69use tokio::sync::oneshot;
70use tokio::{
71 sync::{broadcast, mpsc, watch},
72 task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
// RPC client/server bindings for the `sui.StateSync` anemo service,
// generated by the build script into `OUT_DIR`.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::{build_object_store, fetch_checkpoint, process_archive_checkpoint};
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90 state_sync_client::StateSyncClient,
91 state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_storage::verify_checkpoint;
97
/// Cloneable handle other components use to interact with the state-sync
/// event loop.
#[derive(Clone, Debug)]
pub struct Handle {
    // Mailbox into the `StateSyncEventLoop`.
    sender: mpsc::Sender<StateSyncMessage>,
    // Broadcast channel on which newly synced checkpoints are announced.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    metrics: Metrics,
}
108
109impl Handle {
110 pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119 self.sender
120 .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121 .await
122 .unwrap()
123 }
124
125 pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127 self.checkpoint_event_sender.subscribe()
128 }
129
130 pub fn get_peers_reported_for_failure(&self) -> u64 {
132 self.metrics.get_peers_reported_for_failure()
133 }
134}
135
136pub(super) fn compute_adaptive_timeout(
137 tx_count: u64,
138 min_timeout: Duration,
139 max_timeout: Duration,
140) -> Duration {
141 const MAX_TRANSACTIONS_PER_CHECKPOINT: u64 = 10_000;
142 const JITTER_FRACTION: f64 = 0.1;
143
144 let max_timeout = max_timeout.max(min_timeout);
145 let ratio = (tx_count as f64 / MAX_TRANSACTIONS_PER_CHECKPOINT as f64).min(1.0);
146 let extra = Duration::from_secs_f64((max_timeout - min_timeout).as_secs_f64() * ratio);
147 let base = min_timeout + extra;
148
149 let jitter_range = base.as_secs_f64() * JITTER_FRACTION;
150 let jitter = rand::thread_rng().gen_range(-jitter_range..jitter_range);
151 Duration::from_secs_f64((base.as_secs_f64() + jitter).max(min_timeout.as_secs_f64()))
152}
153
/// Sliding-window success/failure statistics for a single peer.
#[cfg_attr(test, derive(Debug))]
pub(super) struct PeerScore {
    // (timestamp, bytes downloaded, request duration) per successful request.
    successes: VecDeque<(Instant, u64 , Duration)>,
    // Timestamps of failed requests.
    failures: VecDeque<Instant>,
    // Only samples younger than this window count toward rates/throughput.
    window: Duration,
    // Failure ratio at or above which the peer is considered failing.
    failure_rate: f64,
    // When the peer first entered the failing state, if currently failing.
    failing_since: Option<Instant>,
}
162
163impl PeerScore {
164 const MAX_SAMPLES: usize = 20;
165 const MIN_SAMPLES_FOR_FAILURE: usize = 10;
166
167 pub(super) fn new(window: Duration, failure_rate: f64) -> Self {
168 Self {
169 successes: VecDeque::new(),
170 failures: VecDeque::new(),
171 window,
172 failure_rate,
173 failing_since: None,
174 }
175 }
176
177 pub(super) fn record_success(&mut self, size: u64, response_time: Duration) {
178 let now = Instant::now();
179 self.successes.push_back((now, size, response_time));
180 while self.successes.len() > Self::MAX_SAMPLES {
181 self.successes.pop_front();
182 }
183 self.failing_since = None;
184 }
185
186 pub(super) fn record_failure(&mut self) {
187 let now = Instant::now();
188 self.failures.push_back(now);
189 while self.failures.len() > Self::MAX_SAMPLES {
190 self.failures.pop_front();
191 }
192 }
193
194 pub(super) fn is_failing(&self) -> bool {
195 let now = Instant::now();
196 let recent_failures = self
197 .failures
198 .iter()
199 .filter(|ts| now.duration_since(**ts) < self.window)
200 .count();
201 let recent_successes = self
202 .successes
203 .iter()
204 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
205 .count();
206
207 let total = recent_failures + recent_successes;
208 if total < Self::MIN_SAMPLES_FOR_FAILURE {
209 return false;
210 }
211
212 let rate = recent_failures as f64 / total as f64;
213 rate >= self.failure_rate
214 }
215
216 pub(super) fn update_failing_state(&mut self) {
217 if self.is_failing() && self.failing_since.is_none() {
218 self.failing_since = Some(Instant::now());
219 }
220 }
221
222 pub(super) fn reset_failing_since(&mut self) {
223 self.failing_since = None;
224 }
225
226 pub(super) fn consistently_failing(&self, threshold: Duration) -> bool {
227 match self.failing_since {
228 Some(since) => since.elapsed() >= threshold,
229 None => false,
230 }
231 }
232
233 pub(super) fn effective_throughput(&self) -> Option<f64> {
234 let now = Instant::now();
235 let (total_size, total_time) = self
236 .successes
237 .iter()
238 .filter(|(ts, _, _)| now.duration_since(*ts) < self.window)
239 .fold((0u64, Duration::ZERO), |(size, time), (_, s, d)| {
240 (size + s, time + *d)
241 });
242
243 if total_size == 0 {
244 return None;
245 }
246
247 if total_time.is_zero() {
248 return None;
249 }
250
251 Some(total_size as f64 / total_time.as_secs_f64())
252 }
253}
254
/// Shared view (behind `Arc<RwLock<_>>`) of peer sync state: per-peer
/// heights, per-peer scores, and checkpoint summaries received but not yet
/// processed.
struct PeerHeights {
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    // Checkpoint summaries awaiting processing, indexed by digest.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    // Secondary index into `unprocessed_checkpoints` by sequence number.
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
    // Success/failure statistics per peer, used for peer selection and
    // failure reporting.
    scores: HashMap<PeerId, PeerScore>,

    wait_interval_when_no_peer_to_sync_content: Duration,
    // Window/threshold parameters handed to each new `PeerScore`.
    peer_scoring_window: Duration,
    peer_failure_rate: f64,
    // Bounds presumably fed to `compute_adaptive_timeout` by content-sync
    // tasks — the call site is outside this view; confirm there.
    checkpoint_content_timeout_min: Duration,
    checkpoint_content_timeout_max: Duration,
    // Probability the balancer tries an unmeasured peer over a known one.
    exploration_probability: f64,
}
269
/// Sync-related facts tracked for each connected peer.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    // Digest of the peer's genesis checkpoint; compared against ours to
    // decide chain membership.
    genesis_checkpoint_digest: CheckpointDigest,
    on_same_chain_as_us: bool,
    // Highest checkpoint sequence number the peer claims to have.
    height: CheckpointSequenceNumber,
    // Lowest checkpoint for which the peer still serves full contents
    // (its pruning low watermark).
    lowest: CheckpointSequenceNumber,
}
282
impl PeerHeights {
    /// Highest checkpoint sequence number reported by any peer on our chain.
    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
        self.peers
            .values()
            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
            .max()
    }

    /// Iterates over peers whose genesis digest matched ours.
    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
        self.peers
            .iter()
            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
    }

    /// Records a checkpoint advertised by `peer_id`, raising the peer's
    /// height (heights never decrease) and, when provided, updating its low
    /// watermark. Returns `false` — and changes nothing — for untracked
    /// peers or peers not on our chain.
    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
    pub fn update_peer_info(
        &mut self,
        peer_id: PeerId,
        checkpoint: Checkpoint,
        low_watermark: Option<CheckpointSequenceNumber>,
    ) -> bool {
        debug!("Update peer info");

        let info = match self.peers.get_mut(&peer_id) {
            Some(info) if info.on_same_chain_as_us => info,
            _ => return false,
        };

        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
        if let Some(low_watermark) = low_watermark {
            info.lowest = low_watermark;
        }
        // Stash the summary so sync tasks can use it without re-fetching.
        self.insert_checkpoint(checkpoint);

        true
    }

    /// Inserts (or refreshes) the record for `peer_id`. If the peer was
    /// already known with the same genesis digest only its height is raised;
    /// a different genesis digest replaces the record wholesale.
    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
        use std::collections::hash_map::Entry;
        debug!("Insert peer info");

        match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => {
                let entry = entry.get_mut();
                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
                    entry.height = std::cmp::max(entry.height, info.height);
                } else {
                    // Different genesis: start over with the new record.
                    *entry = info;
                }
            }
            Entry::Vacant(entry) => {
                entry.insert(info);
            }
        }
    }

    /// Flags a peer as being on another chain and drops its score so it is
    /// no longer considered for selection or failure reporting.
    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
        if let Some(info) = self.peers.get_mut(&peer_id) {
            info.on_same_chain_as_us = false;
        }
        self.scores.remove(&peer_id);
    }

    /// Like `update_peer_info` but with only a height — no checkpoint to
    /// stash. Returns `false` for untracked/off-chain peers.
    pub fn update_peer_height(
        &mut self,
        peer_id: PeerId,
        height: CheckpointSequenceNumber,
        low_watermark: Option<CheckpointSequenceNumber>,
    ) -> bool {
        let info = match self.peers.get_mut(&peer_id) {
            Some(info) if info.on_same_chain_as_us => info,
            _ => return false,
        };

        info.height = std::cmp::max(height, info.height);
        if let Some(low_watermark) = low_watermark {
            info.lowest = low_watermark;
        }

        true
    }

    /// Drops stashed checkpoints at or below `sequence_number`.
    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
        self.unprocessed_checkpoints
            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
        self.sequence_number_to_digest
            .retain(|&s, _digest| s > sequence_number);
    }

    /// Stashes a checkpoint summary for later processing. The first digest
    /// seen for a sequence number wins; a conflicting digest for the same
    /// sequence number is logged and dropped.
    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
        let digest = *checkpoint.digest();
        let sequence_number = *checkpoint.sequence_number();

        if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
            if *existing_digest == digest {
                // Already stored.
                return;
            }
            tracing::info!(
                ?sequence_number,
                ?existing_digest,
                ?digest,
                "received checkpoint with same sequence number but different digest, dropping new checkpoint"
            );
            return;
        }

        self.unprocessed_checkpoints.insert(digest, checkpoint);
        self.sequence_number_to_digest
            .insert(sequence_number, digest);
    }

    /// Removes a stashed checkpoint and its sequence-number index entry.
    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
            self.sequence_number_to_digest
                .remove(checkpoint.sequence_number());
        }
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Option<&Checkpoint> {
        self.sequence_number_to_digest
            .get(&sequence_number)
            .and_then(|digest| self.get_checkpoint_by_digest(digest))
    }

    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
        self.unprocessed_checkpoints.get(digest)
    }

    // Test hook for shrinking the idle wait so tests run quickly.
    #[cfg(test)]
    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
        self.wait_interval_when_no_peer_to_sync_content = duration;
    }

    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
        self.wait_interval_when_no_peer_to_sync_content
    }

    /// Credits a successful download of `size` bytes to `peer_id`'s score,
    /// creating the score lazily. Ignored for peers we no longer track.
    pub fn record_success(&mut self, peer_id: PeerId, size: u64, response_time: Duration) {
        if !self.peers.contains_key(&peer_id) {
            return;
        }
        self.scores
            .entry(peer_id)
            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
            .record_success(size, response_time);
    }

    /// Records a failed request against `peer_id`'s score, creating the
    /// score lazily. Ignored for peers we no longer track.
    pub fn record_failure(&mut self, peer_id: PeerId) {
        if !self.peers.contains_key(&peer_id) {
            return;
        }
        self.scores
            .entry(peer_id)
            .or_insert_with(|| PeerScore::new(self.peer_scoring_window, self.peer_failure_rate))
            .record_failure();
    }

    /// Recent effective throughput (bytes/sec) for the peer, if measurable.
    pub fn get_throughput(&self, peer_id: &PeerId) -> Option<f64> {
        self.scores
            .get(peer_id)
            .and_then(|s| s.effective_throughput())
    }

    /// Whether the peer's recent failure ratio crosses the threshold.
    /// Peers without a score are not failing.
    pub fn is_failing(&self, peer_id: &PeerId) -> bool {
        self.scores
            .get(peer_id)
            .map(|s| s.is_failing())
            .unwrap_or(false)
    }

    /// Timestamps the start of the failing state for peers that just began
    /// failing.
    pub fn update_failing_states(&mut self) {
        for score in self.scores.values_mut() {
            score.update_failing_state();
        }
    }

    /// Picks the on-chain peer that has been failing the longest past
    /// `threshold`, if any, as the candidate to report/disconnect.
    pub fn find_peer_to_report_for_failure(&self, threshold: Duration) -> Option<PeerId> {
        self.scores
            .iter()
            .filter(|(peer_id, score)| {
                score.consistently_failing(threshold)
                    && self
                        .peers
                        .get(peer_id)
                        .is_some_and(|info| info.on_same_chain_as_us)
            })
            .max_by_key(|(_, score)| {
                score
                    .failing_since
                    .map(|since| since.elapsed())
                    .unwrap_or(Duration::ZERO)
            })
            .map(|(peer_id, _)| *peer_id)
    }
}
496
/// Iterator-style peer selector for checkpoint requests: prefers peers with
/// measured throughput, explores unmeasured peers by RTT, and only falls
/// back to failing peers when nothing else is eligible.
#[derive(Clone)]
struct PeerBalancer {
    // Peers with a measured throughput (bytes/sec) in the scoring window.
    known_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    // Peers without throughput data, tagged with connection RTT in seconds
    // and kept sorted by it (fastest first).
    unknown_peers: Vec<(anemo::Peer, PeerStateSyncInfo, f64)>,
    // Peers currently classified as failing; last resort.
    failing_peers: Vec<(anemo::Peer, PeerStateSyncInfo)>,
    // When set, restricts selection to peers able to serve this checkpoint.
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    request_type: PeerCheckpointRequestType,
    // Probability of trying an unknown peer even when known peers exist.
    exploration_probability: f64,
}
510
/// What will be requested from the selected peer. Content requests also
/// require the peer not to have pruned the checkpoint (low-watermark check
/// in `PeerBalancer::is_eligible`).
#[derive(Clone)]
enum PeerCheckpointRequestType {
    Summary,
    Content,
}
516
impl PeerBalancer {
    /// Snapshots the current on-chain peer set from `peer_heights` and
    /// buckets it into known-throughput, unknown (sorted by RTT, fastest
    /// first), and failing peers.
    pub fn new(
        network: &anemo::Network,
        peer_heights: Arc<RwLock<PeerHeights>>,
        request_type: PeerCheckpointRequestType,
    ) -> Self {
        let peer_heights_guard = peer_heights.read().unwrap();

        let mut known_peers = Vec::new();
        let mut unknown_peers = Vec::new();
        let mut failing_peers = Vec::new();
        let exploration_probability = peer_heights_guard.exploration_probability;

        for (peer_id, info) in peer_heights_guard.peers_on_same_chain() {
            // Skip peers we are no longer connected to.
            let Some(peer) = network.peer(*peer_id) else {
                continue;
            };
            let rtt_secs = peer.connection_rtt().as_secs_f64();

            if peer_heights_guard.is_failing(peer_id) {
                failing_peers.push((peer, *info));
            } else if let Some(throughput) = peer_heights_guard.get_throughput(peer_id) {
                known_peers.push((peer, *info, throughput));
            } else {
                unknown_peers.push((peer, *info, rtt_secs));
            }
        }
        // Release the read lock before any further work.
        drop(peer_heights_guard);

        unknown_peers.sort_by(|(_, _, rtt_a), (_, _, rtt_b)| {
            rtt_a
                .partial_cmp(rtt_b)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        Self {
            known_peers,
            unknown_peers,
            failing_peers,
            requested_checkpoint: None,
            request_type,
            exploration_probability,
        }
    }

    /// Restricts subsequent selections to peers able to serve `checkpoint`.
    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
        self.requested_checkpoint = Some(checkpoint);
        self
    }

    /// Summary requests only need the peer's height to reach the requested
    /// checkpoint; content requests also need the checkpoint to be at or
    /// above the peer's pruning low watermark.
    fn is_eligible(&self, info: &PeerStateSyncInfo) -> bool {
        let requested = self.requested_checkpoint.unwrap_or(0);
        match &self.request_type {
            PeerCheckpointRequestType::Summary => info.height >= requested,
            PeerCheckpointRequestType::Content => {
                info.height >= requested && info.lowest <= requested
            }
        }
    }

    /// Removes and returns an eligible known peer, sampled with probability
    /// proportional to its measured throughput.
    fn select_by_throughput(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        // (index into known_peers, throughput) for each eligible peer.
        let eligible: Vec<_> = self
            .known_peers
            .iter()
            .enumerate()
            .filter(|(_, (_, info, _))| self.is_eligible(info))
            .map(|(i, (_, _, t))| (i, *t))
            .collect();

        if eligible.is_empty() {
            return None;
        }

        // Weighted sampling: walk the weights until `pick` is used up.
        let total: f64 = eligible.iter().map(|(_, t)| t).sum();
        let mut pick = rand::thread_rng().gen_range(0.0..total);

        for (idx, throughput) in &eligible {
            pick -= throughput;
            if pick <= 0.0 {
                let (peer, info, _) = self.known_peers.remove(*idx);
                return Some((peer, info));
            }
        }

        // Floating-point residue can leave `pick` positive after the loop;
        // fall back to the last eligible peer.
        let (idx, _) = eligible.last().unwrap();
        let (peer, info, _) = self.known_peers.remove(*idx);
        Some((peer, info))
    }

    /// Removes and returns the lowest-RTT eligible unknown peer (the list
    /// is pre-sorted by RTT in `new`).
    fn select_by_rtt(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        let pos = self
            .unknown_peers
            .iter()
            .position(|(_, info, _)| self.is_eligible(info))?;
        let (peer, info, _) = self.unknown_peers.remove(pos);
        Some((peer, info))
    }

    /// Removes and returns the first eligible failing peer, if any.
    fn select_failing(&mut self) -> Option<(anemo::Peer, PeerStateSyncInfo)> {
        let pos = self
            .failing_peers
            .iter()
            .position(|(_, info)| self.is_eligible(info))?;
        Some(self.failing_peers.remove(pos))
    }
}
623
624impl Iterator for PeerBalancer {
625 type Item = StateSyncClient<anemo::Peer>;
626
627 fn next(&mut self) -> Option<Self::Item> {
628 let has_eligible_known = self
629 .known_peers
630 .iter()
631 .any(|(_, info, _)| self.is_eligible(info));
632 let has_eligible_unknown = self
633 .unknown_peers
634 .iter()
635 .any(|(_, info, _)| self.is_eligible(info));
636
637 let explore = has_eligible_unknown
638 && (!has_eligible_known || rand::thread_rng().gen_bool(self.exploration_probability));
639
640 if explore && let Some((peer, _)) = self.select_by_rtt() {
641 return Some(StateSyncClient::new(peer));
642 }
643
644 if has_eligible_known && let Some((peer, _)) = self.select_by_throughput() {
645 return Some(StateSyncClient::new(peer));
646 }
647
648 if let Some((peer, _)) = self.select_failing() {
649 return Some(StateSyncClient::new(peer));
650 }
651
652 None
653 }
654}
655
#[derive(Clone, Debug)]
enum StateSyncMessage {
    // Kick off a checkpoint-summary sync task if one isn't already running.
    StartSyncJob,
    // A checkpoint certified by this node's consensus; boxed to keep the
    // message small.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    // A checkpoint whose contents finished syncing; triggers notification
    // of peers.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
667
/// The state-sync driver: owns the background task set and reacts to ticks,
/// mailbox messages, and peer connect/disconnect events.
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    // Inbox of `StateSyncMessage`s from `Handle`s and internal tasks.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    // Weak sender handed to spawned tasks so they don't keep the loop alive.
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    tasks: JoinSet<()>,
    // Handles used to track the lifecycle of the long-running sync tasks.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    sync_checkpoint_contents_task: Option<AbortHandle>,
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    // Broadcasts checkpoints as they become verified/synced.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    archive_config: Option<ArchiveReaderConfig>,
    // When present, failing peers are reported to discovery instead of
    // being disconnected directly.
    discovery_sender: Option<crate::discovery::Sender>,
}
690
impl<S> StateSyncEventLoop<S>
where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    /// Runs the event loop until every mailbox sender (`Handle`) is dropped.
    ///
    /// Spawns the long-lived contents-sync and archive-sync tasks up front,
    /// then multiplexes over the periodic tick, the mailbox, peer events,
    /// and completed background tasks.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        // Sorted so sync tasks can binary-search pinned checkpoints.
        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            // Query peers that connected before we subscribed.
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // `_sender` stays alive for the duration of this function;
        // presumably its drop is the shutdown signal for the metrics task —
        // confirm against `update_checkpoint_watermark_metrics`.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Long-lived task driving checkpoint-contents sync toward the
        // target published on the watch channel.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.use_get_checkpoint_contents_v2(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Long-lived task backfilling contents from an archive when peers
        // have already pruned them.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // `None` means every sender is dropped: shut down.
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Aborted tasks are expected; nothing to do.
                            } else if e.is_panic() {
                                // Propagate panics from background tasks.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The two long-lived tasks must never exit on their own.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // Summary sync is a bounded job: clear the handle so a
                    // new job may be started below.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            // After each event, (re)start summary sync and bump the
            // contents-sync target if verification got ahead of syncing.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }

    fn handle_message(&mut self, message: StateSyncMessage) {
        debug!("Received message: {:?}", message);
        match message {
            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
                self.handle_checkpoint_from_consensus(checkpoint)
            }
            // Contents already persisted; only peers need notifying.
            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
                self.spawn_notify_peers_of_checkpoint(*checkpoint)
            }
        }
    }

    /// Ingests a checkpoint certified by this node's own consensus: checks
    /// linkage to its predecessor, persists any next-epoch committee, bumps
    /// both store watermarks, and fans out notifications. Panics on missing
    /// predecessor or digest mismatch, which would indicate local
    /// inconsistency.
    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        // The predecessor must exist locally and match `previous_digest`.
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!(
                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
                checkpoint.sequence_number(),
                Some(prev_digest),
                checkpoint.previous_digest
            );
        }

        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        // Already at or past this checkpoint: nothing to do.
        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }

        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }

        // Debug builds only: assert the store already holds summary and
        // full contents for every checkpoint up to this one.
        #[cfg(debug_assertions)]
        {
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }

        // An end-of-epoch checkpoint carries the next committee; persist it
        // so later checkpoints from that epoch can be verified.
        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }

        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        // Subscribers may have gone away; a send error is fine.
        let _ = self.checkpoint_event_sender.send(checkpoint.clone());

        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }

    fn handle_peer_event(
        &mut self,
        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
    ) {
        use tokio::sync::broadcast::error::RecvError;

        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                self.spawn_get_latest_from_peer(peer_id);
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                // Forget both the peer's sync info and its score.
                let mut heights = self.peer_heights.write().unwrap();
                heights.peers.remove(&peer_id);
                heights.scores.remove(&peer_id);
            }

            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }

            Err(RecvError::Lagged(_)) => {
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    /// Spawns a task to learn a newly connected peer's chain identity and
    /// latest checkpoint.
    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
        if let Some(peer) = self.network.peer(peer_id) {
            let genesis_checkpoint_digest = *self
                .store
                .get_checkpoint_by_sequence_number(0)
                .expect("store should contain genesis checkpoint")
                .digest();
            let task = get_latest_from_peer(
                genesis_checkpoint_digest,
                peer,
                self.peer_heights.clone(),
                self.config.timeout(),
            );
            self.tasks.spawn(task);
        }
    }

    /// Periodic maintenance: poll peers for their latest checkpoint, prune
    /// the download-limit map, and maybe report a failing peer.
    fn handle_tick(&mut self, _now: std::time::Instant) {
        let task = query_peers_for_their_latest_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.config.timeout(),
        );
        self.tasks.spawn(task);

        if let Some(layer) = self.download_limit_layer.as_ref() {
            layer.maybe_prune_map();
        }

        self.maybe_report_failing_peer();
    }

    /// Reports (or disconnects) at most one peer per tick that has been
    /// failing longer than the configured threshold. A zero threshold
    /// disables the feature.
    fn maybe_report_failing_peer(&mut self) {
        let threshold = self.config.peer_disconnect_threshold();
        if threshold.is_zero() {
            return;
        }

        let mut peer_heights = self.peer_heights.write().unwrap();
        peer_heights.update_failing_states();

        let Some(peer_id) = peer_heights.find_peer_to_report_for_failure(threshold) else {
            return;
        };
        // Reset the failing clock so the same peer isn't re-reported every
        // tick.
        if let Some(score) = peer_heights.scores.get_mut(&peer_id) {
            score.reset_failing_since();
        }
        // Release the lock before touching the network / discovery.
        drop(peer_heights);

        info!("reporting peer {peer_id} for consistent state sync failures");
        if let Some(sender) = &self.discovery_sender {
            sender.report_peer_failure(peer_id);
        } else {
            let _ = self.network.disconnect(peer_id);
        }
        self.metrics.inc_peers_reported_for_failure();
    }

    /// Starts a summary-sync job when none is running and a peer claims a
    /// higher checkpoint than our highest verified one. The target is capped
    /// at `max_checkpoint_sync_batch_size` ahead; a capped job re-queues
    /// itself via `StartSyncJob` on success.
    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
        // Only run one sync task at a time.
        if self.sync_checkpoint_summaries_task.is_some() {
            return;
        }

        let highest_processed_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        let highest_known_sequence_number = self
            .peer_heights
            .read()
            .unwrap()
            .highest_known_checkpoint_sequence_number();

        if let Some(target_seq) = highest_known_sequence_number
            && *highest_processed_checkpoint.sequence_number() < target_seq
        {
            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
            let limited_target = std::cmp::min(
                target_seq,
                highest_processed_checkpoint
                    .sequence_number()
                    .saturating_add(max_batch_size),
            );
            let was_limited = limited_target < target_seq;

            let weak_sender = self.weak_sender.clone();
            let task = sync_to_checkpoint(
                self.network.clone(),
                self.store.clone(),
                self.peer_heights.clone(),
                self.metrics.clone(),
                self.config.pinned_checkpoints.clone(),
                self.config.checkpoint_header_download_concurrency(),
                self.config.timeout(),
                limited_target,
            )
            .map(move |result| match result {
                Ok(()) => {
                    // More remains beyond the batch cap: schedule the next
                    // batch.
                    if was_limited && let Some(sender) = weak_sender.upgrade() {
                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
                    }
                }
                Err(e) => {
                    debug!("error syncing checkpoint {e}");
                }
            });
            let task_handle = self.tasks.spawn(task);
            self.sync_checkpoint_summaries_task = Some(task_handle);
        }
    }

    /// Publishes a new contents-sync target when verification is ahead of
    /// contents sync and some peer is known to be past our synced point.
    fn maybe_trigger_checkpoint_contents_sync_task(
        &mut self,
        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
    ) {
        let highest_verified_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        let highest_synced_checkpoint = self
            .store
            .get_highest_synced_checkpoint()
            .expect("store operation should not fail");

        if highest_verified_checkpoint.sequence_number()
            > highest_synced_checkpoint.sequence_number()
            // `None < Some(_)`, so this also skips when no peer is known.
            && self
                .peer_heights
                .read()
                .unwrap()
                .highest_known_checkpoint_sequence_number()
                > Some(*highest_synced_checkpoint.sequence_number())
        {
            // Only wake the contents-sync task when the target changed.
            let _ = target_sequence_channel.send_if_modified(|num| {
                let new_num = *highest_verified_checkpoint.sequence_number();
                if *num == new_num {
                    return false;
                }
                *num = new_num;
                true
            });
        }
    }

    /// Spawns a fire-and-forget task announcing `checkpoint` to peers that
    /// are behind it.
    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
        let task = notify_peers_of_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            checkpoint,
            self.config.timeout(),
        );
        self.tasks.spawn(task);
    }
}
1111
1112async fn notify_peers_of_checkpoint(
1113 network: anemo::Network,
1114 peer_heights: Arc<RwLock<PeerHeights>>,
1115 checkpoint: VerifiedCheckpoint,
1116 timeout: Duration,
1117) {
1118 let futs = peer_heights
1119 .read()
1120 .unwrap()
1121 .peers_on_same_chain()
1122 .filter_map(|(peer_id, info)| {
1124 (*checkpoint.sequence_number() > info.height).then_some(peer_id)
1125 })
1126 .flat_map(|peer_id| network.peer(*peer_id))
1128 .map(StateSyncClient::new)
1129 .map(|mut client| {
1130 let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
1131 async move { client.push_checkpoint_summary(request).await }
1132 })
1133 .collect::<Vec<_>>();
1134 futures::future::join_all(futs).await;
1135}
1136
/// Learns a peer's chain identity (via its genesis checkpoint) on first
/// contact, then records its latest checkpoint and content low watermark in
/// `peer_heights`.
async fn get_latest_from_peer(
    our_genesis_checkpoint_digest: CheckpointDigest,
    peer: anemo::Peer,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
) {
    let peer_id = peer.peer_id();
    let mut client = StateSyncClient::new(peer);

    let info = {
        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();

        if let Some(info) = maybe_info {
            info
        } else {
            // First contact: fetch the peer's genesis checkpoint (sequence
            // 0) to determine whether it is on our chain.
            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
                .with_timeout(timeout);
            let response = client
                .get_checkpoint_summary(request)
                .await
                .map(Response::into_inner);

            let info = match response {
                Ok(Some(checkpoint)) => {
                    let digest = *checkpoint.digest();
                    PeerStateSyncInfo {
                        genesis_checkpoint_digest: digest,
                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
                        height: *checkpoint.sequence_number(),
                        lowest: CheckpointSequenceNumber::default(),
                    }
                }
                // No genesis checkpoint: record the peer as off-chain so we
                // don't keep querying it.
                Ok(None) => PeerStateSyncInfo {
                    genesis_checkpoint_digest: CheckpointDigest::default(),
                    on_same_chain_as_us: false,
                    height: CheckpointSequenceNumber::default(),
                    lowest: CheckpointSequenceNumber::default(),
                },
                // Transient failure: record nothing and retry on a later
                // event/tick.
                Err(status) => {
                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
                    return;
                }
            };
            peer_heights
                .write()
                .unwrap()
                .insert_peer_info(peer_id, info);
            info
        }
    };

    if !info.on_same_chain_as_us {
        trace!(?info, "Peer {peer_id} not on same chain as us");
        return;
    }
    let Some((highest_checkpoint, low_watermark)) =
        query_peer_for_latest_info(&mut client, timeout).await
    else {
        return;
    };
    peer_heights
        .write()
        .unwrap()
        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
}
1207
1208async fn query_peer_for_latest_info(
1210 client: &mut StateSyncClient<anemo::Peer>,
1211 timeout: Duration,
1212) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
1213 let request = Request::new(()).with_timeout(timeout);
1214 let response = client
1215 .get_checkpoint_availability(request)
1216 .await
1217 .map(Response::into_inner);
1218 match response {
1219 Ok(GetCheckpointAvailabilityResponse {
1220 highest_synced_checkpoint,
1221 lowest_available_checkpoint,
1222 }) => {
1223 return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
1224 }
1225 Err(status) => {
1226 if status.status() != anemo::types::response::StatusCode::NotFound {
1228 trace!("get_checkpoint_availability request failed: {status:?}");
1229 return None;
1230 }
1231 }
1232 };
1233
1234 let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
1237 let response = client
1238 .get_checkpoint_summary(request)
1239 .await
1240 .map(Response::into_inner);
1241 match response {
1242 Ok(Some(checkpoint)) => Some((checkpoint, None)),
1243 Ok(None) => None,
1244 Err(status) => {
1245 trace!("get_checkpoint_summary (latest) request failed: {status:?}");
1246 None
1247 }
1248 }
1249}
1250
1251#[instrument(level = "debug", skip_all)]
1252async fn query_peers_for_their_latest_checkpoint(
1253 network: anemo::Network,
1254 peer_heights: Arc<RwLock<PeerHeights>>,
1255 sender: mpsc::WeakSender<StateSyncMessage>,
1256 timeout: Duration,
1257) {
1258 let peer_heights = &peer_heights;
1259 let futs = peer_heights
1260 .read()
1261 .unwrap()
1262 .peers_on_same_chain()
1263 .flat_map(|(peer_id, _info)| network.peer(*peer_id))
1265 .map(|peer| {
1266 let peer_id = peer.peer_id();
1267 let mut client = StateSyncClient::new(peer);
1268
1269 async move {
1270 let response = query_peer_for_latest_info(&mut client, timeout).await;
1271 match response {
1272 Some((highest_checkpoint, low_watermark)) => peer_heights
1273 .write()
1274 .unwrap()
1275 .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
1276 .then_some(highest_checkpoint),
1277 None => None,
1278 }
1279 }
1280 })
1281 .collect::<Vec<_>>();
1282
1283 debug!("Query {} peers for latest checkpoint", futs.len());
1284
1285 let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
1286
1287 let highest_checkpoint_seq = checkpoints
1288 .map(|checkpoint| *checkpoint.sequence_number())
1289 .max();
1290
1291 let our_highest_seq = peer_heights
1292 .read()
1293 .unwrap()
1294 .highest_known_checkpoint_sequence_number();
1295
1296 debug!(
1297 "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
1298 );
1299
1300 let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
1301 (Some(theirs), None) => theirs,
1302 (Some(theirs), Some(ours)) if theirs > ours => theirs,
1303 _ => return,
1304 };
1305
1306 if let Some(sender) = sender.upgrade() {
1307 let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1308 }
1309}
1310
1311async fn sync_to_checkpoint<S>(
1312 network: anemo::Network,
1313 store: S,
1314 peer_heights: Arc<RwLock<PeerHeights>>,
1315 metrics: Metrics,
1316 pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1317 checkpoint_header_download_concurrency: usize,
1318 timeout: Duration,
1319 target_sequence_number: CheckpointSequenceNumber,
1320) -> Result<()>
1321where
1322 S: WriteStore,
1323{
1324 metrics.set_highest_known_checkpoint(target_sequence_number);
1325
1326 let mut current = store
1327 .get_highest_verified_checkpoint()
1328 .expect("store operation should not fail");
1329 if *current.sequence_number() >= target_sequence_number {
1330 return Err(anyhow::anyhow!(
1331 "target checkpoint {} is older than highest verified checkpoint {}",
1332 target_sequence_number,
1333 current.sequence_number(),
1334 ));
1335 }
1336
1337 let peer_balancer = PeerBalancer::new(
1338 &network,
1339 peer_heights.clone(),
1340 PeerCheckpointRequestType::Summary,
1341 );
1342 let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1344 ..=target_sequence_number)
1345 .map(|next| {
1346 let peers = peer_balancer.clone().with_checkpoint(next);
1347 let peer_heights = peer_heights.clone();
1348 let pinned_checkpoints = &pinned_checkpoints;
1349 async move {
1350 if let Some(checkpoint) = peer_heights
1351 .read()
1352 .unwrap()
1353 .get_checkpoint_by_sequence_number(next)
1354 {
1355 return (Some(checkpoint.to_owned()), next, None);
1356 }
1357
1358 for mut peer in peers {
1361 let peer_id = peer.inner().peer_id();
1362 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1363 .with_timeout(timeout);
1364 let start = Instant::now();
1365 let result = peer.get_checkpoint_summary(request).await;
1366 let elapsed = start.elapsed();
1367
1368 let checkpoint = match result {
1369 Ok(response) => match response.into_inner() {
1370 Some(cp) => Some(cp),
1371 None => {
1372 trace!("peer unable to help sync");
1373 peer_heights.write().unwrap().record_failure(peer_id);
1374 None
1375 }
1376 },
1377 Err(e) => {
1378 trace!("{e:?}");
1379 peer_heights.write().unwrap().record_failure(peer_id);
1380 None
1381 }
1382 };
1383
1384 let Some(checkpoint) = checkpoint else {
1385 continue;
1386 };
1387
1388 let size = bcs::serialized_size(&checkpoint).expect("serialization should not fail") as u64;
1389 peer_heights.write().unwrap().record_success(peer_id, size, elapsed);
1390
1391 if *checkpoint.sequence_number() != next {
1393 tracing::debug!(
1394 "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1395 checkpoint.sequence_number()
1396 );
1397 peer_heights
1398 .write()
1399 .unwrap()
1400 .mark_peer_as_not_on_same_chain(peer_id);
1401 continue;
1402 }
1403
1404 let checkpoint_digest = checkpoint.digest();
1406 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1407 checkpoint.sequence_number(),
1408 |(seq_num, _digest)| *seq_num
1409 )
1410 && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
1411 {
1412 tracing::debug!(
1413 "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1414 pinned_checkpoints[pinned_digest_index].1,
1415 checkpoint_digest
1416 );
1417 peer_heights
1418 .write()
1419 .unwrap()
1420 .mark_peer_as_not_on_same_chain(peer.inner().peer_id());
1421 continue;
1422 }
1423
1424 peer_heights
1426 .write()
1427 .unwrap()
1428 .insert_checkpoint(checkpoint.clone());
1429 return (Some(checkpoint), next, Some(peer_id));
1430 }
1431 (None, next, None)
1432 }
1433 })
1434 .pipe(futures::stream::iter)
1435 .buffered(checkpoint_header_download_concurrency);
1436
1437 while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1438 assert_eq!(
1439 current
1440 .sequence_number()
1441 .checked_add(1)
1442 .expect("exhausted u64"),
1443 next
1444 );
1445
1446 let checkpoint = 'cp: {
1448 let checkpoint = maybe_checkpoint.ok_or_else(|| {
1449 anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1450 })?;
1451 if pinned_checkpoints
1453 .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1454 .is_ok()
1455 {
1456 break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1457 }
1458 match verify_checkpoint(¤t, &store, checkpoint) {
1459 Ok(verified_checkpoint) => verified_checkpoint,
1460 Err(checkpoint) => {
1461 let mut peer_heights = peer_heights.write().unwrap();
1462 peer_heights.remove_checkpoint(checkpoint.digest());
1465
1466 if let Some(peer_id) = maybe_peer_id {
1468 peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1469 }
1470
1471 return Err(anyhow::anyhow!(
1472 "unable to verify checkpoint {checkpoint:?}"
1473 ));
1474 }
1475 }
1476 };
1477
1478 debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1479 if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1480 metrics.checkpoint_summary_age_metrics()
1481 {
1482 checkpoint.report_checkpoint_age(
1483 checkpoint_summary_age_metric,
1484 checkpoint_summary_age_metric_deprecated,
1485 );
1486 }
1487
1488 store
1491 .insert_checkpoint(&checkpoint)
1492 .expect("store operation should not fail");
1493
1494 current = checkpoint;
1495 }
1496
1497 peer_heights
1498 .write()
1499 .unwrap()
1500 .cleanup_old_checkpoints(*current.sequence_number());
1501
1502 Ok(())
1503}
1504
1505async fn sync_checkpoint_contents_from_archive<S>(
1506 network: anemo::Network,
1507 archive_config: Option<ArchiveReaderConfig>,
1508 store: S,
1509 peer_heights: Arc<RwLock<PeerHeights>>,
1510 metrics: Metrics,
1511) where
1512 S: WriteStore + Clone + Send + Sync + 'static,
1513{
1514 loop {
1515 sync_checkpoint_contents_from_archive_iteration(
1516 &network,
1517 &archive_config,
1518 store.clone(),
1519 peer_heights.clone(),
1520 metrics.clone(),
1521 )
1522 .await;
1523 tokio::time::sleep(Duration::from_secs(5)).await;
1524 }
1525}
1526
1527async fn sync_checkpoint_contents_from_archive_iteration<S>(
1528 network: &anemo::Network,
1529 archive_config: &Option<ArchiveReaderConfig>,
1530 store: S,
1531 peer_heights: Arc<RwLock<PeerHeights>>,
1532 metrics: Metrics,
1533) where
1534 S: WriteStore + Clone + Send + Sync + 'static,
1535{
1536 let peers: Vec<_> = peer_heights
1537 .read()
1538 .unwrap()
1539 .peers_on_same_chain()
1540 .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1542 .collect();
1543 let lowest_checkpoint_on_peers = peers
1544 .iter()
1545 .map(|(_p, state_sync_info)| state_sync_info.lowest)
1546 .min();
1547 let highest_synced = store
1548 .get_highest_synced_checkpoint()
1549 .expect("store operation should not fail")
1550 .sequence_number;
1551 let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1552 highest_synced < lowest_checkpoint_on_peers
1553 } else {
1554 false
1555 };
1556 debug!(
1557 "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
1558 lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1559 );
1560 if sync_from_archive {
1561 let start = highest_synced
1562 .checked_add(1)
1563 .expect("Checkpoint seq num overflow");
1564 let end = lowest_checkpoint_on_peers.unwrap();
1565
1566 let Some(archive_config) = archive_config else {
1567 warn!("Failed to find an archive reader to complete the state sync request");
1568 return;
1569 };
1570 let Some(ingestion_url) = &archive_config.ingestion_url else {
1571 warn!("Archival ingestion url for state sync is not configured");
1572 return;
1573 };
1574 if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1575 warn!("{} can't be used as an archival fallback", ingestion_url);
1576 return;
1577 }
1578 let obj_store =
1579 build_object_store(ingestion_url, archive_config.remote_store_options.clone());
1580 let mut checkpoint_stream = futures::stream::iter(start..=end)
1581 .map(|seq| {
1582 let obj_store = obj_store.clone();
1583 async move { (seq, fetch_checkpoint(&obj_store, seq).await) }
1584 })
1585 .buffered(archive_config.download_concurrency.get());
1586
1587 while let Some((seq, result)) = checkpoint_stream.next().await {
1588 let checkpoint = match result {
1589 Ok(checkpoint) => checkpoint,
1590 Err(err) => {
1591 warn!("State sync from archive failed fetching checkpoint {seq}: {err}");
1592 return;
1593 }
1594 };
1595 if let Err(err) = process_archive_checkpoint(&store, &checkpoint, &metrics) {
1596 warn!("State sync from archive failed processing checkpoint {seq}: {err}");
1597 return;
1598 }
1599 }
1600 }
1601}
1602
/// Drive checkpoint-content downloads from the highest synced checkpoint
/// toward the target sequence number published on `target_sequence_channel`.
///
/// Downloads run concurrently — bounded both by a task count
/// (`checkpoint_content_download_concurrency`) and by a total in-flight
/// transaction budget (`checkpoint_content_download_tx_concurrency`) — but
/// results are consumed strictly in order via `FuturesOrdered`. A failed
/// download is pushed back to the *front* of the queue so ordering is
/// preserved while it retries. Each newly synced checkpoint is persisted,
/// broadcast on `checkpoint_event_sender`, and periodically reported back to
/// the event loop through `sender`.
///
/// Runs until the target channel closes (sender dropped).
async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    use_get_checkpoint_contents_v2: bool,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");

    // Next sequence number to schedule for download.
    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    // Exclusive upper bound for scheduling (target + 1); updated from the
    // watch channel below.
    let mut target_sequence_cursor = 0;
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    let mut checkpoint_contents_tasks = FuturesOrdered::new();

    // Remaining transaction budget for tasks currently in flight.
    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;

    loop {
        tokio::select! {
            // A new sync target was published.
            result = target_sequence_channel.changed() => {
                match result {
                    Ok(()) => {
                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
                    }
                    // Channel closed: the event loop is shutting down.
                    Err(_) => {
                        return
                    }
                }
            },
            // The oldest in-flight download completed (in order, thanks to
            // FuturesOrdered).
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        // `let _: &VerifiedCheckpoint` is a compile-time type
                        // assertion on the task's success value.
                        let _: &VerifiedCheckpoint = &checkpoint; store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        // Best-effort broadcast; send fails only when there
                        // are no subscribers.
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        // Return this checkpoint's transactions to the budget.
                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;

                    }
                    Err(checkpoint) => {
                        // Log whether any peer should still have the contents.
                        // NOTE(review): `read().ok()` silently treats a
                        // poisoned lock like "no peer info".
                        let _: &VerifiedCheckpoint = &checkpoint; if let Some(lowest_peer_checkpoint) =
                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint)
                            }
                        } else {
                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());

                        }
                        // Recompute this checkpoint's own transaction count
                        // (delta from its predecessor) for the retry's
                        // adaptive timeout.
                        let retry_tx_count = if *checkpoint.sequence_number() == 0 {
                            checkpoint.network_total_transactions
                        } else {
                            let prev = store
                                .get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
                                .expect("previous checkpoint must exist")
                                .network_total_transactions;
                            checkpoint.network_total_transactions - prev
                        };
                        // Retry at the front so in-order consumption holds.
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            use_get_checkpoint_contents_v2,
                            retry_tx_count,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        // Schedule more downloads while below both the task-count limit and
        // the transaction budget.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .unwrap_or_else(|| panic!(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
                    current_sequence
                ));

            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                use_get_checkpoint_contents_v2,
                tx_count,
                next_checkpoint,
            ));
        }

        // Report progress to the event loop periodically (every N checkpoints)
        // and whenever the pipeline drains.
        if highest_synced
            .sequence_number()
            .is_multiple_of(checkpoint_content_download_concurrency as u64)
            || checkpoint_contents_tasks.is_empty()
        {
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}
1733
1734#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1735async fn sync_one_checkpoint_contents<S>(
1736 network: anemo::Network,
1737 store: S,
1738 peer_heights: Arc<RwLock<PeerHeights>>,
1739 use_get_checkpoint_contents_v2: bool,
1740 tx_count: u64,
1741 checkpoint: VerifiedCheckpoint,
1742) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1743where
1744 S: WriteStore + Clone,
1745{
1746 let (timeout_min, timeout_max) = {
1747 let ph = peer_heights.read().unwrap();
1748 (
1749 ph.checkpoint_content_timeout_min,
1750 ph.checkpoint_content_timeout_max,
1751 )
1752 };
1753 let timeout = compute_adaptive_timeout(tx_count, timeout_min, timeout_max);
1754 debug!(
1755 "syncing checkpoint contents with adaptive timeout {:?} for {} txns",
1756 timeout, tx_count
1757 );
1758
1759 if store
1762 .get_highest_synced_checkpoint()
1763 .expect("store operation should not fail")
1764 .sequence_number()
1765 >= checkpoint.sequence_number()
1766 {
1767 debug!("checkpoint was already created via consensus output");
1768 return Ok(checkpoint);
1769 }
1770
1771 let peers = PeerBalancer::new(
1773 &network,
1774 peer_heights.clone(),
1775 PeerCheckpointRequestType::Content,
1776 )
1777 .with_checkpoint(*checkpoint.sequence_number());
1778 let now = tokio::time::Instant::now();
1779 let Some(_contents) = get_full_checkpoint_contents(
1780 peers,
1781 &store,
1782 peer_heights.clone(),
1783 &checkpoint,
1784 use_get_checkpoint_contents_v2,
1785 timeout,
1786 )
1787 .await
1788 else {
1789 let duration = peer_heights
1791 .read()
1792 .unwrap()
1793 .wait_interval_when_no_peer_to_sync_content();
1794 if now.elapsed() < duration {
1795 let duration = duration - now.elapsed();
1796 info!("retrying checkpoint sync after {:?}", duration);
1797 tokio::time::sleep(duration).await;
1798 }
1799 return Err(checkpoint);
1800 };
1801 debug!("completed checkpoint contents sync");
1802 Ok(checkpoint)
1803}
1804
1805#[instrument(level = "debug", skip_all)]
1806async fn get_full_checkpoint_contents<S>(
1807 peers: PeerBalancer,
1808 store: S,
1809 peer_heights: Arc<RwLock<PeerHeights>>,
1810 checkpoint: &VerifiedCheckpoint,
1811 use_get_checkpoint_contents_v2: bool,
1812 timeout: Duration,
1813) -> Option<VersionedFullCheckpointContents>
1814where
1815 S: WriteStore,
1816{
1817 let sequence_number = checkpoint.sequence_number;
1818 let digest = checkpoint.content_digest;
1819 if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1820 debug!("store already contains checkpoint contents");
1821 return Some(contents);
1822 }
1823
1824 for mut peer in peers {
1827 let peer_id = peer.inner().peer_id();
1828 debug!(?timeout, "requesting checkpoint contents from {}", peer_id);
1829 let request = Request::new(digest).with_timeout(timeout);
1830 let start = Instant::now();
1831 let result = if use_get_checkpoint_contents_v2 {
1832 peer.get_checkpoint_contents_v2(request).await
1833 } else {
1834 peer.get_checkpoint_contents(request)
1835 .await
1836 .map(|r| r.map(|c| c.map(VersionedFullCheckpointContents::V1)))
1837 };
1838 let elapsed = start.elapsed();
1839
1840 let contents = match result {
1841 Ok(response) => match response.into_inner() {
1842 Some(c) => Some(c),
1843 None => {
1844 trace!("peer unable to help sync");
1845 peer_heights.write().unwrap().record_failure(peer_id);
1846 None
1847 }
1848 },
1849 Err(e) => {
1850 trace!("{e:?}");
1851 peer_heights.write().unwrap().record_failure(peer_id);
1852 None
1853 }
1854 };
1855
1856 let Some(contents) = contents else {
1857 continue;
1858 };
1859
1860 if contents.verify_digests(digest).is_ok() {
1861 let size =
1862 bcs::serialized_size(&contents).expect("serialization should not fail") as u64;
1863 peer_heights
1864 .write()
1865 .unwrap()
1866 .record_success(peer_id, size, elapsed);
1867 let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1868 store
1869 .insert_checkpoint_contents(checkpoint, verified_contents)
1870 .expect("store operation should not fail");
1871 return Some(contents);
1872 }
1873 }
1874 debug!("no peers had checkpoint contents");
1875 None
1876}
1877
1878async fn update_checkpoint_watermark_metrics<S>(
1879 mut recv: oneshot::Receiver<()>,
1880 store: S,
1881 metrics: Metrics,
1882) -> Result<()>
1883where
1884 S: WriteStore + Clone + Send + Sync,
1885{
1886 let mut interval = tokio::time::interval(Duration::from_secs(5));
1887 loop {
1888 tokio::select! {
1889 _now = interval.tick() => {
1890 let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1891 .expect("store operation should not fail");
1892 metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1893 let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1894 .expect("store operation should not fail");
1895 metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1896 },
1897 _ = &mut recv => break,
1898 }
1899 }
1900 Ok(())
1901}