use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
use rand::Rng;
use std::{
    collections::{HashMap, VecDeque},
    sync::{Arc, RwLock},
    time::Duration,
};
use sui_config::p2p::StateSyncConfig;
use sui_types::{
    committee::Committee,
    digests::CheckpointDigest,
    messages_checkpoint::{
        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
        VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
    },
    storage::WriteStore,
};
use tap::{Pipe, TapFallible, TapOptional};
use tokio::sync::oneshot;
use tokio::{
    sync::{broadcast, mpsc, watch},
    task::{AbortHandle, JoinSet},
};
use tracing::{debug, info, instrument, trace, warn};

mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
}
mod builder;
mod metrics;
mod server;
#[cfg(test)]
mod tests;
mod worker;

use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
use crate::state_sync::worker::StateSyncWorker;
pub use builder::{Builder, UnstartedStateSync};
pub use generated::{
    state_sync_client::StateSyncClient,
    state_sync_server::{StateSync, StateSyncServer},
};
pub use server::GetCheckpointAvailabilityResponse;
pub use server::GetCheckpointSummaryRequest;
use sui_config::node::ArchiveReaderConfig;
use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
use sui_storage::verify_checkpoint;

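/// Handle for interacting with the state-sync subsystem.
///
/// It can feed checkpoints that were certified locally into the event loop and exposes a
/// broadcast channel of checkpoints that have completed syncing.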
#[derive(Clone, Debug)]
pub struct Handle {
    sender: mpsc::Sender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}

impl Handle {
    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
        self.sender
            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
            .await
            .unwrap()
    }

    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
        self.checkpoint_event_sender.subscribe()
    }
}

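/// Tracks the checkpoint heights reported by each connected peer, along with checkpoint
/// summaries that have been received from the network but not yet processed.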
struct PeerHeights {
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,

    wait_interval_when_no_peer_to_sync_content: Duration,
}

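/// Per-peer state-sync information: the digest of the peer's genesis checkpoint, whether the
/// peer is on the same chain as us, the highest checkpoint it has synced, and the lowest
/// checkpoint for which it still serves contents.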
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    genesis_checkpoint_digest: CheckpointDigest,
    on_same_chain_as_us: bool,
    height: CheckpointSequenceNumber,
    lowest: CheckpointSequenceNumber,
}

impl PeerHeights {
    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
        self.peers
            .values()
            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
            .max()
    }

    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
        self.peers
            .iter()
            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
    }

    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
    pub fn update_peer_info(
        &mut self,
        peer_id: PeerId,
        checkpoint: Checkpoint,
        low_watermark: Option<CheckpointSequenceNumber>,
    ) -> bool {
        debug!("Update peer info");

        let info = match self.peers.get_mut(&peer_id) {
            Some(info) if info.on_same_chain_as_us => info,
            _ => return false,
        };

        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
        if let Some(low_watermark) = low_watermark {
            info.lowest = low_watermark;
        }
        self.insert_checkpoint(checkpoint);

        true
    }

    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
        use std::collections::hash_map::Entry;
        debug!("Insert peer info");

        match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => {
                let entry = entry.get_mut();
                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
                    entry.height = std::cmp::max(entry.height, info.height);
                } else {
                    *entry = info;
                }
            }
            Entry::Vacant(entry) => {
                entry.insert(info);
            }
        }
    }

    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
        if let Some(info) = self.peers.get_mut(&peer_id) {
            info.on_same_chain_as_us = false;
        }
    }

    pub fn update_peer_height(
        &mut self,
        peer_id: PeerId,
        height: CheckpointSequenceNumber,
        low_watermark: Option<CheckpointSequenceNumber>,
    ) -> bool {
        let info = match self.peers.get_mut(&peer_id) {
            Some(info) if info.on_same_chain_as_us => info,
            _ => return false,
        };

        info.height = std::cmp::max(height, info.height);
        if let Some(low_watermark) = low_watermark {
            info.lowest = low_watermark;
        }

        true
    }

    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
        self.unprocessed_checkpoints
            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
        self.sequence_number_to_digest
            .retain(|&s, _digest| s > sequence_number);
    }

    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
        let digest = *checkpoint.digest();
        let sequence_number = *checkpoint.sequence_number();

        if let Some(existing_digest) = self.sequence_number_to_digest.get(&sequence_number) {
            if *existing_digest == digest {
                return;
            }
            tracing::info!(
                ?sequence_number,
                ?existing_digest,
                ?digest,
                "received checkpoint with same sequence number but different digest, dropping new checkpoint"
            );
            return;
        }

        self.unprocessed_checkpoints.insert(digest, checkpoint);
        self.sequence_number_to_digest
            .insert(sequence_number, digest);
    }

    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
            self.sequence_number_to_digest
                .remove(checkpoint.sequence_number());
        }
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Option<&Checkpoint> {
        self.sequence_number_to_digest
            .get(&sequence_number)
            .and_then(|digest| self.get_checkpoint_by_digest(digest))
    }

    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
        self.unprocessed_checkpoints.get(digest)
    }

    #[cfg(test)]
    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
        self.wait_interval_when_no_peer_to_sync_content = duration;
    }

    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
        self.wait_interval_when_no_peer_to_sync_content
    }
}

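/// Iterates over peers that are on the same chain as us, ordered by connection RTT with a small
/// random selection window, yielding only peers able to serve the requested checkpoint summary
/// or contents.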
#[derive(Clone)]
struct PeerBalancer {
    peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    request_type: PeerCheckpointRequestType,
}

#[derive(Clone)]
enum PeerCheckpointRequestType {
    Summary,
    Content,
}

impl PeerBalancer {
    pub fn new(
        network: &anemo::Network,
        peer_heights: Arc<RwLock<PeerHeights>>,
        request_type: PeerCheckpointRequestType,
    ) -> Self {
        let mut peers: Vec<_> = peer_heights
            .read()
            .unwrap()
            .peers_on_same_chain()
            .filter_map(|(peer_id, info)| {
                network
                    .peer(*peer_id)
                    .map(|peer| (peer.connection_rtt(), peer, *info))
            })
            .collect();
        peers.sort_by(|(rtt_a, _, _), (rtt_b, _, _)| rtt_a.cmp(rtt_b));
        Self {
            peers: peers
                .into_iter()
                .map(|(_, peer, info)| (peer, info))
                .collect(),
            requested_checkpoint: None,
            request_type,
        }
    }

    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
        self.requested_checkpoint = Some(checkpoint);
        self
    }
}

impl Iterator for PeerBalancer {
    type Item = StateSyncClient<anemo::Peer>;

    fn next(&mut self) -> Option<Self::Item> {
        while !self.peers.is_empty() {
            const SELECTION_WINDOW: usize = 2;
            let idx =
                rand::thread_rng().gen_range(0..std::cmp::min(SELECTION_WINDOW, self.peers.len()));
            let (peer, info) = self.peers.remove(idx).unwrap();
            let requested_checkpoint = self.requested_checkpoint.unwrap_or(0);
            match &self.request_type {
                PeerCheckpointRequestType::Summary if info.height >= requested_checkpoint => {
                    return Some(StateSyncClient::new(peer));
                }
                PeerCheckpointRequestType::Content
                    if info.height >= requested_checkpoint
                        && info.lowest <= requested_checkpoint =>
                {
                    return Some(StateSyncClient::new(peer));
                }
                _ => {}
            }
        }
        None
    }
}

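/// Messages handled by the state-sync event loop: `StartSyncJob` kicks off a checkpoint-summary
/// sync, `VerifiedCheckpoint` carries a checkpoint certified locally via consensus, and
/// `SyncedCheckpoint` announces a checkpoint whose contents have finished syncing.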
#[derive(Clone, Debug)]
enum StateSyncMessage {
    StartSyncJob,
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}

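/// Core event loop of the state-sync subsystem. It reacts to messages from the [`Handle`], peer
/// events from the network, and a periodic tick, spawning tasks that sync checkpoint summaries
/// and contents into the store.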
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    mailbox: mpsc::Receiver<StateSyncMessage>,
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    tasks: JoinSet<()>,
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    sync_checkpoint_contents_task: Option<AbortHandle>,
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    archive_config: Option<ArchiveReaderConfig>,
}

impl<S> StateSyncEventLoop<S>
where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.use_get_checkpoint_contents_v2(),
            self.config.checkpoint_content_timeout(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancelled tasks are expected (e.g. during shutdown); nothing to do.
                            } else if e.is_panic() {
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }

    fn handle_message(&mut self, message: StateSyncMessage) {
        debug!("Received message: {:?}", message);
        match message {
            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
                self.handle_checkpoint_from_consensus(checkpoint)
            }
            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
                self.spawn_notify_peers_of_checkpoint(*checkpoint)
            }
        }
    }

    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!(
                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
                checkpoint.sequence_number(),
                Some(prev_digest),
                checkpoint.previous_digest
            );
        }

        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }

        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }

        #[cfg(debug_assertions)]
        {
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }

        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }

        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        let _ = self.checkpoint_event_sender.send(checkpoint.clone());

        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }

    fn handle_peer_event(
        &mut self,
        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
    ) {
        use tokio::sync::broadcast::error::RecvError;

        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                self.spawn_get_latest_from_peer(peer_id);
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                self.peer_heights.write().unwrap().peers.remove(&peer_id);
            }

            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }

            Err(RecvError::Lagged(_)) => {
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
        if let Some(peer) = self.network.peer(peer_id) {
            let genesis_checkpoint_digest = *self
                .store
                .get_checkpoint_by_sequence_number(0)
                .expect("store should contain genesis checkpoint")
                .digest();
            let task = get_latest_from_peer(
                genesis_checkpoint_digest,
                peer,
                self.peer_heights.clone(),
                self.config.timeout(),
            );
            self.tasks.spawn(task);
        }
    }

    fn handle_tick(&mut self, _now: std::time::Instant) {
        let task = query_peers_for_their_latest_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.config.timeout(),
        );
        self.tasks.spawn(task);

        if let Some(layer) = self.download_limit_layer.as_ref() {
            layer.maybe_prune_map();
        }
    }

    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
        if self.sync_checkpoint_summaries_task.is_some() {
            return;
        }

        let highest_processed_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        let highest_known_sequence_number = self
            .peer_heights
            .read()
            .unwrap()
            .highest_known_checkpoint_sequence_number();

        if let Some(target_seq) = highest_known_sequence_number
            && *highest_processed_checkpoint.sequence_number() < target_seq
        {
            let max_batch_size = self.config.max_checkpoint_sync_batch_size();
            let limited_target = std::cmp::min(
                target_seq,
                highest_processed_checkpoint
                    .sequence_number()
                    .saturating_add(max_batch_size),
            );
            let was_limited = limited_target < target_seq;

            let weak_sender = self.weak_sender.clone();
            let task = sync_to_checkpoint(
                self.network.clone(),
                self.store.clone(),
                self.peer_heights.clone(),
                self.metrics.clone(),
                self.config.pinned_checkpoints.clone(),
                self.config.checkpoint_header_download_concurrency(),
                self.config.timeout(),
                limited_target,
            )
            .map(move |result| match result {
                Ok(()) => {
                    if was_limited && let Some(sender) = weak_sender.upgrade() {
                        let _ = sender.try_send(StateSyncMessage::StartSyncJob);
                    }
                }
                Err(e) => {
                    debug!("error syncing checkpoint {e}");
                }
            });
            let task_handle = self.tasks.spawn(task);
            self.sync_checkpoint_summaries_task = Some(task_handle);
        }
    }

    fn maybe_trigger_checkpoint_contents_sync_task(
        &mut self,
        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
    ) {
        let highest_verified_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        let highest_synced_checkpoint = self
            .store
            .get_highest_synced_checkpoint()
            .expect("store operation should not fail");

        if highest_verified_checkpoint.sequence_number()
            > highest_synced_checkpoint.sequence_number()
            && self
                .peer_heights
                .read()
                .unwrap()
                .highest_known_checkpoint_sequence_number()
                > Some(*highest_synced_checkpoint.sequence_number())
        {
            let _ = target_sequence_channel.send_if_modified(|num| {
                let new_num = *highest_verified_checkpoint.sequence_number();
                if *num == new_num {
                    return false;
                }
                *num = new_num;
                true
            });
        }
    }

    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
        let task = notify_peers_of_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            checkpoint,
            self.config.timeout(),
        );
        self.tasks.spawn(task);
    }
}

async fn notify_peers_of_checkpoint(
    network: anemo::Network,
    peer_heights: Arc<RwLock<PeerHeights>>,
    checkpoint: VerifiedCheckpoint,
    timeout: Duration,
) {
    let futs = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        .filter_map(|(peer_id, info)| {
            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
        })
        .flat_map(|peer_id| network.peer(*peer_id))
        .map(StateSyncClient::new)
        .map(|mut client| {
            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
            async move { client.push_checkpoint_summary(request).await }
        })
        .collect::<Vec<_>>();
    futures::future::join_all(futs).await;
}

async fn get_latest_from_peer(
    our_genesis_checkpoint_digest: CheckpointDigest,
    peer: anemo::Peer,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
) {
    let peer_id = peer.peer_id();
    let mut client = StateSyncClient::new(peer);

    let info = {
        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();

        if let Some(info) = maybe_info {
            info
        } else {
            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
                .with_timeout(timeout);
            let response = client
                .get_checkpoint_summary(request)
                .await
                .map(Response::into_inner);

            let info = match response {
                Ok(Some(checkpoint)) => {
                    let digest = *checkpoint.digest();
                    PeerStateSyncInfo {
                        genesis_checkpoint_digest: digest,
                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
                        height: *checkpoint.sequence_number(),
                        lowest: CheckpointSequenceNumber::default(),
                    }
                }
                Ok(None) => PeerStateSyncInfo {
                    genesis_checkpoint_digest: CheckpointDigest::default(),
                    on_same_chain_as_us: false,
                    height: CheckpointSequenceNumber::default(),
                    lowest: CheckpointSequenceNumber::default(),
                },
                Err(status) => {
                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
                    return;
                }
            };
            peer_heights
                .write()
                .unwrap()
                .insert_peer_info(peer_id, info);
            info
        }
    };

    if !info.on_same_chain_as_us {
        trace!(?info, "Peer {peer_id} not on same chain as us");
        return;
    }
    let Some((highest_checkpoint, low_watermark)) =
        query_peer_for_latest_info(&mut client, timeout).await
    else {
        return;
    };
    peer_heights
        .write()
        .unwrap()
        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
}

async fn query_peer_for_latest_info(
    client: &mut StateSyncClient<anemo::Peer>,
    timeout: Duration,
) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
    let request = Request::new(()).with_timeout(timeout);
    let response = client
        .get_checkpoint_availability(request)
        .await
        .map(Response::into_inner);
    match response {
        Ok(GetCheckpointAvailabilityResponse {
            highest_synced_checkpoint,
            lowest_available_checkpoint,
        }) => {
            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
        }
        Err(status) => {
            if status.status() != anemo::types::response::StatusCode::NotFound {
                trace!("get_checkpoint_availability request failed: {status:?}");
                return None;
            }
        }
    };

    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
    let response = client
        .get_checkpoint_summary(request)
        .await
        .map(Response::into_inner);
    match response {
        Ok(Some(checkpoint)) => Some((checkpoint, None)),
        Ok(None) => None,
        Err(status) => {
            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
            None
        }
    }
}

#[instrument(level = "debug", skip_all)]
async fn query_peers_for_their_latest_checkpoint(
    network: anemo::Network,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    timeout: Duration,
) {
    let peer_heights = &peer_heights;
    let futs = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
        .map(|peer| {
            let peer_id = peer.peer_id();
            let mut client = StateSyncClient::new(peer);

            async move {
                let response = query_peer_for_latest_info(&mut client, timeout).await;
                match response {
                    Some((highest_checkpoint, low_watermark)) => peer_heights
                        .write()
                        .unwrap()
                        .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
                        .then_some(highest_checkpoint),
                    None => None,
                }
            }
        })
        .collect::<Vec<_>>();

    debug!("Query {} peers for latest checkpoint", futs.len());

    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();

    let highest_checkpoint_seq = checkpoints
        .map(|checkpoint| *checkpoint.sequence_number())
        .max();

    let our_highest_seq = peer_heights
        .read()
        .unwrap()
        .highest_known_checkpoint_sequence_number();

    debug!(
        "Our highest checkpoint {our_highest_seq:?}, peers' highest checkpoint {highest_checkpoint_seq:?}"
    );

    let _new_checkpoint = match (highest_checkpoint_seq, our_highest_seq) {
        (Some(theirs), None) => theirs,
        (Some(theirs), Some(ours)) if theirs > ours => theirs,
        _ => return,
    };

    if let Some(sender) = sender.upgrade() {
        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
    }
}

async fn sync_to_checkpoint<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
    checkpoint_header_download_concurrency: usize,
    timeout: Duration,
    target_sequence_number: CheckpointSequenceNumber,
) -> Result<()>
where
    S: WriteStore,
{
    metrics.set_highest_known_checkpoint(target_sequence_number);

    let mut current = store
        .get_highest_verified_checkpoint()
        .expect("store operation should not fail");
    if *current.sequence_number() >= target_sequence_number {
        return Err(anyhow::anyhow!(
            "target checkpoint {} is older than highest verified checkpoint {}",
            target_sequence_number,
            current.sequence_number(),
        ));
    }

    let peer_balancer = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Summary,
    );
    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
        ..=target_sequence_number)
        .map(|next| {
            let peers = peer_balancer.clone().with_checkpoint(next);
            let peer_heights = peer_heights.clone();
            let pinned_checkpoints = &pinned_checkpoints;
            async move {
                if let Some(checkpoint) = peer_heights
                    .read()
                    .unwrap()
                    .get_checkpoint_by_sequence_number(next)
                {
                    return (Some(checkpoint.to_owned()), next, None);
                }

                for mut peer in peers {
                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
                        .with_timeout(timeout);
                    if let Some(checkpoint) = peer
                        .get_checkpoint_summary(request)
                        .await
                        .tap_err(|e| trace!("{e:?}"))
                        .ok()
                        .and_then(Response::into_inner)
                        .tap_none(|| trace!("peer unable to help sync"))
                    {
                        if *checkpoint.sequence_number() != next {
                            tracing::debug!(
                                "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
                                checkpoint.sequence_number()
                            );
                            peer_heights.write().unwrap().mark_peer_as_not_on_same_chain(peer.inner().peer_id());
                            continue;
                        }

                        let checkpoint_digest = checkpoint.digest();
                        if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
                            checkpoint.sequence_number(),
                            |(seq_num, _digest)| *seq_num
                        )
                            && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest
                        {
                            tracing::debug!(
                                "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
                                pinned_checkpoints[pinned_digest_index].1,
                                checkpoint_digest
                            );
                            continue;
                        }

                        peer_heights
                            .write()
                            .unwrap()
                            .insert_checkpoint(checkpoint.clone());
                        return (Some(checkpoint), next, Some(peer.inner().peer_id()));
                    }
                }
                (None, next, None)
            }
        })
        .pipe(futures::stream::iter)
        .buffered(checkpoint_header_download_concurrency);

    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
        assert_eq!(
            current
                .sequence_number()
                .checked_add(1)
                .expect("exhausted u64"),
            next
        );

        let checkpoint = 'cp: {
            let checkpoint = maybe_checkpoint.ok_or_else(|| {
                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
            })?;
            if pinned_checkpoints
                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
                .is_ok()
            {
                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
            }
            match verify_checkpoint(&current, &store, checkpoint) {
                Ok(verified_checkpoint) => verified_checkpoint,
                Err(checkpoint) => {
                    let mut peer_heights = peer_heights.write().unwrap();
                    peer_heights.remove_checkpoint(checkpoint.digest());

                    if let Some(peer_id) = maybe_peer_id {
                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
                    }

                    return Err(anyhow::anyhow!(
                        "unable to verify checkpoint {checkpoint:?}"
                    ));
                }
            }
        };

        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
            metrics.checkpoint_summary_age_metrics()
        {
            checkpoint.report_checkpoint_age(
                checkpoint_summary_age_metric,
                checkpoint_summary_age_metric_deprecated,
            );
        }

        store
            .insert_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        current = checkpoint;
    }

    peer_heights
        .write()
        .unwrap()
        .cleanup_old_checkpoints(*current.sequence_number());

    Ok(())
}

async fn sync_checkpoint_contents_from_archive<S>(
    network: anemo::Network,
    archive_config: Option<ArchiveReaderConfig>,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
) where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    loop {
        sync_checkpoint_contents_from_archive_iteration(
            &network,
            &archive_config,
            store.clone(),
            peer_heights.clone(),
            metrics.clone(),
        )
        .await;
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

async fn sync_checkpoint_contents_from_archive_iteration<S>(
    network: &anemo::Network,
    archive_config: &Option<ArchiveReaderConfig>,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
) where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    let peers: Vec<_> = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
        .collect();
    let lowest_checkpoint_on_peers = peers
        .iter()
        .map(|(_p, state_sync_info)| state_sync_info.lowest)
        .min();
    let highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail")
        .sequence_number;
    let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
        highest_synced < lowest_checkpoint_on_peers
    } else {
        false
    };
    debug!(
        "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
        lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
    );
    if sync_from_archive {
        let start = highest_synced
            .checked_add(1)
            .expect("Checkpoint seq num overflow");
        let end = lowest_checkpoint_on_peers.unwrap();

        let Some(archive_config) = archive_config else {
            warn!("Failed to find an archive reader to complete the state sync request");
            return;
        };
        let Some(ingestion_url) = &archive_config.ingestion_url else {
            warn!("Archival ingestion url for state sync is not configured");
            return;
        };
        if ingestion_url.contains("checkpoints.mainnet.sui.io") {
            warn!("{} can't be used as an archival fallback", ingestion_url);
            return;
        }
        let reader_options = ReaderOptions {
            batch_size: archive_config.download_concurrency.into(),
            upper_limit: Some(end),
            ..Default::default()
        };
        let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
            StateSyncWorker(store, metrics),
            ingestion_url.clone(),
            archive_config.remote_store_options.clone(),
            start,
            1,
            Some(reader_options),
        )
        .await
        else {
            return;
        };
        match executor.await {
            Ok(_) => info!(
                "State sync from archive is complete. Checkpoints downloaded = {:?}",
                end - start
            ),
            Err(err) => warn!("State sync from archive failed with error: {:?}", err),
        }
    }
}

async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    use_get_checkpoint_contents_v2: bool,
    timeout: Duration,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");

    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    let mut target_sequence_cursor = 0;
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    let mut checkpoint_contents_tasks = FuturesOrdered::new();

    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;

    loop {
        tokio::select! {
            result = target_sequence_channel.changed() => {
                match result {
                    Ok(()) => {
                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
                    }
                    Err(_) => {
                        return
                    }
                }
            },
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint;
                        store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;
                    }
                    Err(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint;
                        if let Some(lowest_peer_checkpoint) =
                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
                            }
                        } else {
                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
                        }
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            use_get_checkpoint_contents_v2,
                            timeout,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .unwrap_or_else(|| panic!(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
                    current_sequence
                ));

            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                use_get_checkpoint_contents_v2,
                timeout,
                next_checkpoint,
            ));
        }

        if highest_synced
            .sequence_number()
            .is_multiple_of(checkpoint_content_download_concurrency as u64)
            || checkpoint_contents_tasks.is_empty()
        {
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}

#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
async fn sync_one_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    use_get_checkpoint_contents_v2: bool,
    timeout: Duration,
    checkpoint: VerifiedCheckpoint,
) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
where
    S: WriteStore + Clone,
{
    debug!("syncing checkpoint contents");

    if store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail")
        .sequence_number()
        >= checkpoint.sequence_number()
    {
        debug!("checkpoint was already created via consensus output");
        return Ok(checkpoint);
    }

    let peers = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Content,
    )
    .with_checkpoint(*checkpoint.sequence_number());
    let now = tokio::time::Instant::now();
    let Some(_contents) = get_full_checkpoint_contents(
        peers,
        &store,
        &checkpoint,
        use_get_checkpoint_contents_v2,
        timeout,
    )
    .await
    else {
        let duration = peer_heights
            .read()
            .unwrap()
            .wait_interval_when_no_peer_to_sync_content();
        if now.elapsed() < duration {
            let duration = duration - now.elapsed();
            info!("retrying checkpoint sync after {:?}", duration);
            tokio::time::sleep(duration).await;
        }
        return Err(checkpoint);
    };
    debug!("completed checkpoint contents sync");
    Ok(checkpoint)
}

#[instrument(level = "debug", skip_all)]
async fn get_full_checkpoint_contents<S>(
    peers: PeerBalancer,
    store: S,
    checkpoint: &VerifiedCheckpoint,
    use_get_checkpoint_contents_v2: bool,
    timeout: Duration,
) -> Option<VersionedFullCheckpointContents>
where
    S: WriteStore,
{
    let sequence_number = checkpoint.sequence_number;
    let digest = checkpoint.content_digest;
    if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
        debug!("store already contains checkpoint contents");
        return Some(contents);
    }

    for mut peer in peers {
        debug!(
            ?timeout,
            "requesting checkpoint contents from {}",
            peer.inner().peer_id(),
        );
        let request = Request::new(digest).with_timeout(timeout);
        let contents = if use_get_checkpoint_contents_v2 {
            peer.get_checkpoint_contents_v2(request)
                .await
                .tap_err(|e| trace!("{e:?}"))
                .ok()
                .and_then(Response::into_inner)
        } else {
            peer.get_checkpoint_contents(request)
                .await
                .tap_err(|e| trace!("{e:?}"))
                .ok()
                .and_then(Response::into_inner)
                .map(VersionedFullCheckpointContents::V1)
        };
        if let Some(contents) = contents.tap_none(|| trace!("peer unable to help sync"))
            && contents.verify_digests(digest).is_ok()
        {
            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
            store
                .insert_checkpoint_contents(checkpoint, verified_contents)
                .expect("store operation should not fail");
            return Some(contents);
        }
    }
    debug!("no peers had checkpoint contents");
    None
}

async fn update_checkpoint_watermark_metrics<S>(
    mut recv: oneshot::Receiver<()>,
    store: S,
    metrics: Metrics,
) -> Result<()>
where
    S: WriteStore + Clone + Send + Sync,
{
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _now = interval.tick() => {
                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
                    .expect("store operation should not fail");
                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
                    .expect("store operation should not fail");
                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
            },
            _ = &mut recv => break,
        }
    }
    Ok(())
}