1use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54 collections::{HashMap, VecDeque},
55 sync::{Arc, RwLock},
56 time::Duration,
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60 committee::Committee,
61 digests::CheckpointDigest,
62 messages_checkpoint::{
63 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
65 },
66 storage::WriteStore,
67};
68use tap::{Pipe, TapFallible, TapOptional};
69use tokio::sync::oneshot;
70use tokio::{
71 sync::{broadcast, mpsc, watch},
72 task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
/// Code for the `StateSync` service produced at build time and pulled in from
/// `$OUT_DIR/sui.StateSync.rs`; the client and server types it defines are
/// re-exported from this module.
mod generated {
    include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::StateSyncWorker;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90 state_sync_client::StateSyncClient,
91 state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
97use sui_storage::verify_checkpoint;
98
/// A cloneable handle used to talk to the state-sync machinery.
///
/// It can submit verified checkpoints (see `send_checkpoint`) and hand out
/// subscriptions to the stream of checkpoint events.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Channel on which `StateSyncMessage`s are submitted.
    // NOTE(review): presumably the sending half of the event loop's mailbox —
    // the wiring lives in the builder, outside this view.
    sender: mpsc::Sender<StateSyncMessage>,
    /// Broadcast channel that new subscribers are attached to.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}
108
109impl Handle {
110 pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119 self.sender
120 .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121 .await
122 .unwrap()
123 }
124
125 pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127 self.checkpoint_event_sender.subscribe()
128 }
129}
130
/// Shared bookkeeping about what our peers claim to have, plus the checkpoint
/// summaries we have heard about but not yet processed.
struct PeerHeights {
    /// Sync information for each known peer, keyed by peer id.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    /// Checkpoint summaries received so far, keyed by digest. Entries are
    /// pruned by `cleanup_old_checkpoints` / `remove_checkpoint`.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    /// Index from sequence number to the digest most recently inserted for it.
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,

    /// Configurable wait duration exposed through the getter of the same name.
    // NOTE(review): presumably the back-off used when no peer can serve
    // checkpoint contents — the consumer is outside this view; confirm.
    wait_interval_when_no_peer_to_sync_content: Duration,
}
140
/// What we know about a single peer's view of the checkpoint chain.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// Digest of the checkpoint the peer returned for sequence number 0.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Whether the peer's genesis digest matched ours. Also cleared when a
    /// checkpoint obtained from the peer fails verification.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number the peer has reported.
    height: CheckpointSequenceNumber,
    /// Lowest checkpoint sequence number the peer has reported as available
    /// (its low watermark); stays at the default when the peer never reports one.
    lowest: CheckpointSequenceNumber,
}
153
154impl PeerHeights {
155 pub fn highest_known_checkpoint(&self) -> Option<&Checkpoint> {
156 self.highest_known_checkpoint_sequence_number()
157 .and_then(|s| self.sequence_number_to_digest.get(&s))
158 .and_then(|digest| self.unprocessed_checkpoints.get(digest))
159 }
160
161 pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
162 self.peers
163 .values()
164 .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
165 .max()
166 }
167
168 pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
169 self.peers
170 .iter()
171 .filter(|(_peer_id, info)| info.on_same_chain_as_us)
172 }
173
174 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
179 pub fn update_peer_info(
180 &mut self,
181 peer_id: PeerId,
182 checkpoint: Checkpoint,
183 low_watermark: Option<CheckpointSequenceNumber>,
184 ) -> bool {
185 debug!("Update peer info");
186
187 let info = match self.peers.get_mut(&peer_id) {
188 Some(info) if info.on_same_chain_as_us => info,
189 _ => return false,
190 };
191
192 info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
193 if let Some(low_watermark) = low_watermark {
194 info.lowest = low_watermark;
195 }
196 self.insert_checkpoint(checkpoint);
197
198 true
199 }
200
201 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
202 pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
203 use std::collections::hash_map::Entry;
204 debug!("Insert peer info");
205
206 match self.peers.entry(peer_id) {
207 Entry::Occupied(mut entry) => {
208 let entry = entry.get_mut();
211 if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
212 entry.height = std::cmp::max(entry.height, info.height);
213 } else {
214 *entry = info;
215 }
216 }
217 Entry::Vacant(entry) => {
218 entry.insert(info);
219 }
220 }
221 }
222
223 pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
224 if let Some(info) = self.peers.get_mut(&peer_id) {
225 info.on_same_chain_as_us = false;
226 }
227 }
228
229 pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
230 self.unprocessed_checkpoints
231 .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
232 self.sequence_number_to_digest
233 .retain(|&s, _digest| s > sequence_number);
234 }
235
236 pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
238 let digest = *checkpoint.digest();
239 let sequence_number = *checkpoint.sequence_number();
240 self.unprocessed_checkpoints.insert(digest, checkpoint);
241 self.sequence_number_to_digest
242 .insert(sequence_number, digest);
243 }
244
245 pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
246 if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
247 self.sequence_number_to_digest
248 .remove(checkpoint.sequence_number());
249 }
250 }
251
252 pub fn get_checkpoint_by_sequence_number(
253 &self,
254 sequence_number: CheckpointSequenceNumber,
255 ) -> Option<&Checkpoint> {
256 self.sequence_number_to_digest
257 .get(&sequence_number)
258 .and_then(|digest| self.get_checkpoint_by_digest(digest))
259 }
260
261 pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
262 self.unprocessed_checkpoints.get(digest)
263 }
264
265 #[cfg(test)]
266 pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
267 self.wait_interval_when_no_peer_to_sync_content = duration;
268 }
269
270 pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
271 self.wait_interval_when_no_peer_to_sync_content
272 }
273}
274
/// An iterator over candidate peers for a checkpoint request.
///
/// Peers are snapshotted (together with their sync info) at construction time,
/// ordered by connection round-trip time, and consumed as the iterator advances.
#[derive(Clone)]
struct PeerBalancer {
    /// Remaining candidates, ordered by ascending round-trip time.
    peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
    /// Checkpoint the caller wants; treated as 0 when unset.
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    /// Kind of data being requested, which decides the eligibility rule.
    request_type: PeerCheckpointRequestType,
}
282
/// The kind of checkpoint data a `PeerBalancer` is selecting peers for.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// A peer qualifies when its height is at least the requested checkpoint.
    Summary,
    /// A peer qualifies when the requested checkpoint lies between its lowest
    /// available checkpoint and its height (inclusive).
    Content,
}
288
289impl PeerBalancer {
290 pub fn new(
291 network: &anemo::Network,
292 peer_heights: Arc<RwLock<PeerHeights>>,
293 request_type: PeerCheckpointRequestType,
294 ) -> Self {
295 let mut peers: Vec<_> = peer_heights
296 .read()
297 .unwrap()
298 .peers_on_same_chain()
299 .filter_map(|(peer_id, info)| {
301 network
302 .peer(*peer_id)
303 .map(|peer| (peer.connection_rtt(), peer, *info))
304 })
305 .collect();
306 peers.sort_by(|(rtt_a, _, _), (rtt_b, _, _)| rtt_a.cmp(rtt_b));
307 Self {
308 peers: peers
309 .into_iter()
310 .map(|(_, peer, info)| (peer, info))
311 .collect(),
312 requested_checkpoint: None,
313 request_type,
314 }
315 }
316
317 pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
318 self.requested_checkpoint = Some(checkpoint);
319 self
320 }
321}
322
323impl Iterator for PeerBalancer {
324 type Item = StateSyncClient<anemo::Peer>;
325
326 fn next(&mut self) -> Option<Self::Item> {
327 while !self.peers.is_empty() {
328 const SELECTION_WINDOW: usize = 2;
329 let idx =
330 rand::thread_rng().gen_range(0..std::cmp::min(SELECTION_WINDOW, self.peers.len()));
331 let (peer, info) = self.peers.remove(idx).unwrap();
332 let requested_checkpoint = self.requested_checkpoint.unwrap_or(0);
333 match &self.request_type {
334 PeerCheckpointRequestType::Summary if info.height >= requested_checkpoint => {
336 return Some(StateSyncClient::new(peer));
337 }
338 PeerCheckpointRequestType::Content
339 if info.height >= requested_checkpoint
340 && info.lowest <= requested_checkpoint =>
341 {
342 return Some(StateSyncClient::new(peer));
343 }
344 _ => {}
345 }
346 }
347 None
348 }
349}
350
/// Messages processed by `StateSyncEventLoop::handle_message`.
#[derive(Clone, Debug)]
enum StateSyncMessage {
    /// Ask the event loop to start a checkpoint-summary sync task if one is
    /// not already running.
    StartSyncJob,
    /// A verified checkpoint submitted through `Handle::send_checkpoint`; it is
    /// handled as a checkpoint coming from consensus.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    /// A checkpoint that has been synced; the event loop reacts by notifying
    /// peers about it.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
362
/// The state-sync event loop: owns the mailbox, the background tasks, and the
/// handles needed to sync checkpoint summaries and contents.
struct StateSyncEventLoop<S> {
    /// State-sync configuration (timeouts, concurrency limits, pinned checkpoints).
    config: StateSyncConfig,

    /// Incoming `StateSyncMessage`s; the loop ends when this channel closes.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak sender handed to spawned tasks so they can message the loop.
    // NOTE(review): presumably weak so that tasks do not keep the mailbox open
    // on their own — confirm against the builder.
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// All tasks spawned by the loop.
    tasks: JoinSet<()>,
    /// Handle of the running summary-sync task, if any.
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Handle of the long-running contents-sync task.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    /// Optional download-limit layer whose map is pruned on every tick.
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    /// Checkpoint store.
    store: S,
    /// Shared peer bookkeeping.
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcast channel on which checkpoints are published to subscribers.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Network handle used to reach peers.
    network: anemo::Network,
    /// Metrics sink.
    metrics: Metrics,

    /// Handle of the long-running archive-sync task.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    /// Archive configuration passed to the archive-sync task, if configured.
    archive_config: Option<ArchiveReaderConfig>,
}
384
385impl<S> StateSyncEventLoop<S>
386where
387 S: WriteStore + Clone + Send + Sync + 'static,
388{
    /// Runs the event loop until the mailbox channel is closed.
    ///
    /// On startup it sorts the pinned checkpoints (they are binary-searched
    /// later), queries every already-connected peer, and spawns the
    /// watermark-metrics, contents-sync and archive-sync tasks. It then
    /// multiplexes timer ticks, mailbox messages, peer events and task
    /// completions.
    ///
    /// # Panics
    ///
    /// Panics if subscribing to network peer events fails, if a spawned task
    /// panics or fails for a reason other than cancellation, or if the
    /// contents-sync or archive-sync task terminates.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        // Watch channel carrying the sequence number the contents-sync task
        // should sync up to.
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // `_sender` is held until this function returns; it is dropped when
        // the loop ends.
        // NOTE(review): presumably the metrics task treats the drop as its
        // stop signal — confirm in `update_checkpoint_watermark_metrics`.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Long-running task that downloads checkpoint contents from peers.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.checkpoint_content_timeout(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Long-running task that falls back to the archive when peers have
        // pruned the checkpoints we still need.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_config.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.metrics.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        // Mailbox closed: shut the loop down.
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancelled tasks need no handling.
                            } else if e.is_panic() {
                                // Propagate the task's panic to this task.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // A finished summary-sync task is cleared so a new one can start.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            // After every event, re-evaluate whether more syncing is needed.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }
506
507 fn handle_message(&mut self, message: StateSyncMessage) {
508 debug!("Received message: {:?}", message);
509 match message {
510 StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
511 StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
512 self.handle_checkpoint_from_consensus(checkpoint)
513 }
514 StateSyncMessage::SyncedCheckpoint(checkpoint) => {
516 self.spawn_notify_peers_of_checkpoint(*checkpoint)
517 }
518 }
519 }
520
    /// Handles a checkpoint handed to us through the
    /// `StateSyncMessage::VerifiedCheckpoint` message.
    ///
    /// Checks that the checkpoint chains onto the stored predecessor, ignores
    /// it if we have already verified a checkpoint at least that high, records
    /// the next epoch's committee when it is an end-of-epoch checkpoint, bumps
    /// both the highest-verified and highest-synced watermarks, publishes it to
    /// subscribers and notifies peers.
    ///
    /// # Panics
    ///
    /// Panics if the predecessor checkpoint is missing from the store, if
    /// `previous_digest` does not match it, or if a store operation fails.
    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        // Always check previous_digest matches in case there is a gap between
        // state sync and consensus.
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!(
                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
                checkpoint.sequence_number(),
                Some(prev_digest),
                checkpoint.previous_digest
            );
        }

        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        // Nothing to do if we have already verified this checkpoint or a later one.
        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }

        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        // A gap is only logged; processing continues.
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }

        // Debug builds only: assert that every checkpoint up to and including
        // this one, and its full contents, is already in the store.
        #[cfg(debug_assertions)]
        {
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }

        // End-of-epoch checkpoints carry the next committee; persist it under
        // the following epoch number.
        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }

        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        // A send error only means nobody is subscribed, so it is ignored.
        let _ = self.checkpoint_event_sender.send(checkpoint.clone());

        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }
611
612 fn handle_peer_event(
613 &mut self,
614 peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
615 ) {
616 use tokio::sync::broadcast::error::RecvError;
617
618 match peer_event {
619 Ok(PeerEvent::NewPeer(peer_id)) => {
620 self.spawn_get_latest_from_peer(peer_id);
621 }
622 Ok(PeerEvent::LostPeer(peer_id, _)) => {
623 self.peer_heights.write().unwrap().peers.remove(&peer_id);
624 }
625
626 Err(RecvError::Closed) => {
627 panic!("PeerEvent channel shouldn't be able to be closed");
628 }
629
630 Err(RecvError::Lagged(_)) => {
631 trace!("State-Sync fell behind processing PeerEvents");
632 }
633 }
634 }
635
636 fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
637 if let Some(peer) = self.network.peer(peer_id) {
638 let genesis_checkpoint_digest = *self
639 .store
640 .get_checkpoint_by_sequence_number(0)
641 .expect("store should contain genesis checkpoint")
642 .digest();
643 let task = get_latest_from_peer(
644 genesis_checkpoint_digest,
645 peer,
646 self.peer_heights.clone(),
647 self.config.timeout(),
648 );
649 self.tasks.spawn(task);
650 }
651 }
652
653 fn handle_tick(&mut self, _now: std::time::Instant) {
654 let task = query_peers_for_their_latest_checkpoint(
655 self.network.clone(),
656 self.peer_heights.clone(),
657 self.weak_sender.clone(),
658 self.config.timeout(),
659 );
660 self.tasks.spawn(task);
661
662 if let Some(layer) = self.download_limit_layer.as_ref() {
663 layer.maybe_prune_map();
664 }
665 }
666
667 fn maybe_start_checkpoint_summary_sync_task(&mut self) {
668 if self.sync_checkpoint_summaries_task.is_some() {
670 return;
671 }
672
673 let highest_processed_checkpoint = self
674 .store
675 .get_highest_verified_checkpoint()
676 .expect("store operation should not fail");
677
678 let highest_known_checkpoint = self
679 .peer_heights
680 .read()
681 .unwrap()
682 .highest_known_checkpoint()
683 .cloned();
684
685 if Some(highest_processed_checkpoint.sequence_number())
686 < highest_known_checkpoint
687 .as_ref()
688 .map(|x| x.sequence_number())
689 {
690 let task = sync_to_checkpoint(
692 self.network.clone(),
693 self.store.clone(),
694 self.peer_heights.clone(),
695 self.metrics.clone(),
696 self.config.pinned_checkpoints.clone(),
697 self.config.checkpoint_header_download_concurrency(),
698 self.config.timeout(),
699 highest_known_checkpoint.unwrap(),
701 )
702 .map(|result| match result {
703 Ok(()) => {}
704 Err(e) => {
705 debug!("error syncing checkpoint {e}");
706 }
707 });
708 let task_handle = self.tasks.spawn(task);
709 self.sync_checkpoint_summaries_task = Some(task_handle);
710 }
711 }
712
713 fn maybe_trigger_checkpoint_contents_sync_task(
714 &mut self,
715 target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
716 ) {
717 let highest_verified_checkpoint = self
718 .store
719 .get_highest_verified_checkpoint()
720 .expect("store operation should not fail");
721 let highest_synced_checkpoint = self
722 .store
723 .get_highest_synced_checkpoint()
724 .expect("store operation should not fail");
725
726 if highest_verified_checkpoint.sequence_number()
727 > highest_synced_checkpoint.sequence_number()
728 && self
730 .peer_heights
731 .read()
732 .unwrap()
733 .highest_known_checkpoint_sequence_number()
734 > Some(*highest_synced_checkpoint.sequence_number())
735 {
736 let _ = target_sequence_channel.send_if_modified(|num| {
737 let new_num = *highest_verified_checkpoint.sequence_number();
738 if *num == new_num {
739 return false;
740 }
741 *num = new_num;
742 true
743 });
744 }
745 }
746
747 fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
748 let task = notify_peers_of_checkpoint(
749 self.network.clone(),
750 self.peer_heights.clone(),
751 checkpoint,
752 self.config.timeout(),
753 );
754 self.tasks.spawn(task);
755 }
756}
757
758async fn notify_peers_of_checkpoint(
759 network: anemo::Network,
760 peer_heights: Arc<RwLock<PeerHeights>>,
761 checkpoint: VerifiedCheckpoint,
762 timeout: Duration,
763) {
764 let futs = peer_heights
765 .read()
766 .unwrap()
767 .peers_on_same_chain()
768 .filter_map(|(peer_id, info)| {
770 (*checkpoint.sequence_number() > info.height).then_some(peer_id)
771 })
772 .flat_map(|peer_id| network.peer(*peer_id))
774 .map(StateSyncClient::new)
775 .map(|mut client| {
776 let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
777 async move { client.push_checkpoint_summary(request).await }
778 })
779 .collect::<Vec<_>>();
780 futures::future::join_all(futs).await;
781}
782
783async fn get_latest_from_peer(
784 our_genesis_checkpoint_digest: CheckpointDigest,
785 peer: anemo::Peer,
786 peer_heights: Arc<RwLock<PeerHeights>>,
787 timeout: Duration,
788) {
789 let peer_id = peer.peer_id();
790 let mut client = StateSyncClient::new(peer);
791
792 let info = {
793 let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
794
795 if let Some(info) = maybe_info {
796 info
797 } else {
798 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
803 .with_timeout(timeout);
804 let response = client
805 .get_checkpoint_summary(request)
806 .await
807 .map(Response::into_inner);
808
809 let info = match response {
810 Ok(Some(checkpoint)) => {
811 let digest = *checkpoint.digest();
812 PeerStateSyncInfo {
813 genesis_checkpoint_digest: digest,
814 on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
815 height: *checkpoint.sequence_number(),
816 lowest: CheckpointSequenceNumber::default(),
817 }
818 }
819 Ok(None) => PeerStateSyncInfo {
820 genesis_checkpoint_digest: CheckpointDigest::default(),
821 on_same_chain_as_us: false,
822 height: CheckpointSequenceNumber::default(),
823 lowest: CheckpointSequenceNumber::default(),
824 },
825 Err(status) => {
826 trace!("get_latest_checkpoint_summary request failed: {status:?}");
827 return;
828 }
829 };
830 peer_heights
831 .write()
832 .unwrap()
833 .insert_peer_info(peer_id, info);
834 info
835 }
836 };
837
838 if !info.on_same_chain_as_us {
840 trace!(?info, "Peer {peer_id} not on same chain as us");
841 return;
842 }
843 let Some((highest_checkpoint, low_watermark)) =
844 query_peer_for_latest_info(&mut client, timeout).await
845 else {
846 return;
847 };
848 peer_heights
849 .write()
850 .unwrap()
851 .update_peer_info(peer_id, highest_checkpoint, low_watermark);
852}
853
854async fn query_peer_for_latest_info(
856 client: &mut StateSyncClient<anemo::Peer>,
857 timeout: Duration,
858) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
859 let request = Request::new(()).with_timeout(timeout);
860 let response = client
861 .get_checkpoint_availability(request)
862 .await
863 .map(Response::into_inner);
864 match response {
865 Ok(GetCheckpointAvailabilityResponse {
866 highest_synced_checkpoint,
867 lowest_available_checkpoint,
868 }) => {
869 return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
870 }
871 Err(status) => {
872 if status.status() != anemo::types::response::StatusCode::NotFound {
874 trace!("get_checkpoint_availability request failed: {status:?}");
875 return None;
876 }
877 }
878 };
879
880 let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
883 let response = client
884 .get_checkpoint_summary(request)
885 .await
886 .map(Response::into_inner);
887 match response {
888 Ok(Some(checkpoint)) => Some((checkpoint, None)),
889 Ok(None) => None,
890 Err(status) => {
891 trace!("get_checkpoint_summary (latest) request failed: {status:?}");
892 None
893 }
894 }
895}
896
897#[instrument(level = "debug", skip_all)]
898async fn query_peers_for_their_latest_checkpoint(
899 network: anemo::Network,
900 peer_heights: Arc<RwLock<PeerHeights>>,
901 sender: mpsc::WeakSender<StateSyncMessage>,
902 timeout: Duration,
903) {
904 let peer_heights = &peer_heights;
905 let futs = peer_heights
906 .read()
907 .unwrap()
908 .peers_on_same_chain()
909 .flat_map(|(peer_id, _info)| network.peer(*peer_id))
911 .map(|peer| {
912 let peer_id = peer.peer_id();
913 let mut client = StateSyncClient::new(peer);
914
915 async move {
916 let response = query_peer_for_latest_info(&mut client, timeout).await;
917 match response {
918 Some((highest_checkpoint, low_watermark)) => peer_heights
919 .write()
920 .unwrap()
921 .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
922 .then_some(highest_checkpoint),
923 None => None,
924 }
925 }
926 })
927 .collect::<Vec<_>>();
928
929 debug!("Query {} peers for latest checkpoint", futs.len());
930
931 let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
932
933 let highest_checkpoint = checkpoints.max_by_key(|checkpoint| *checkpoint.sequence_number());
934
935 let our_highest_checkpoint = peer_heights
936 .read()
937 .unwrap()
938 .highest_known_checkpoint()
939 .cloned();
940
941 debug!(
942 "Our highest checkpoint {:?}, peers highest checkpoint {:?}",
943 our_highest_checkpoint.as_ref().map(|c| c.sequence_number()),
944 highest_checkpoint.as_ref().map(|c| c.sequence_number())
945 );
946
947 let _new_checkpoint = match (highest_checkpoint, our_highest_checkpoint) {
948 (Some(theirs), None) => theirs,
949 (Some(theirs), Some(ours)) if theirs.sequence_number() > ours.sequence_number() => theirs,
950 _ => return,
951 };
952
953 if let Some(sender) = sender.upgrade() {
954 let _ = sender.send(StateSyncMessage::StartSyncJob).await;
955 }
956}
957
958async fn sync_to_checkpoint<S>(
959 network: anemo::Network,
960 store: S,
961 peer_heights: Arc<RwLock<PeerHeights>>,
962 metrics: Metrics,
963 pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
964 checkpoint_header_download_concurrency: usize,
965 timeout: Duration,
966 checkpoint: Checkpoint,
967) -> Result<()>
968where
969 S: WriteStore,
970{
971 metrics.set_highest_known_checkpoint(*checkpoint.sequence_number());
972
973 let mut current = store
974 .get_highest_verified_checkpoint()
975 .expect("store operation should not fail");
976 if current.sequence_number() >= checkpoint.sequence_number() {
977 return Err(anyhow::anyhow!(
978 "target checkpoint {} is older than highest verified checkpoint {}",
979 checkpoint.sequence_number(),
980 current.sequence_number(),
981 ));
982 }
983
984 let peer_balancer = PeerBalancer::new(
985 &network,
986 peer_heights.clone(),
987 PeerCheckpointRequestType::Summary,
988 );
989 let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
991 ..=*checkpoint.sequence_number())
992 .map(|next| {
993 let peers = peer_balancer.clone().with_checkpoint(next);
994 let peer_heights = peer_heights.clone();
995 let pinned_checkpoints = &pinned_checkpoints;
996 async move {
997 if let Some(checkpoint) = peer_heights
998 .read()
999 .unwrap()
1000 .get_checkpoint_by_sequence_number(next)
1001 {
1002 return (Some(checkpoint.to_owned()), next, None);
1003 }
1004
1005 for mut peer in peers {
1008 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1009 .with_timeout(timeout);
1010 if let Some(checkpoint) = peer
1011 .get_checkpoint_summary(request)
1012 .await
1013 .tap_err(|e| trace!("{e:?}"))
1014 .ok()
1015 .and_then(Response::into_inner)
1016 .tap_none(|| trace!("peer unable to help sync"))
1017 {
1018 if *checkpoint.sequence_number() != next {
1020 tracing::debug!(
1021 "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1022 checkpoint.sequence_number()
1023 );
1024 continue;
1025 }
1026
1027 let checkpoint_digest = checkpoint.digest();
1029 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1030 checkpoint.sequence_number(),
1031 |(seq_num, _digest)| *seq_num
1032 )
1033 && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest {
1034 tracing::debug!(
1035 "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1036 pinned_checkpoints[pinned_digest_index].1,
1037 checkpoint_digest
1038 );
1039 continue;
1040 }
1041
1042 peer_heights
1044 .write()
1045 .unwrap()
1046 .insert_checkpoint(checkpoint.clone());
1047 return (Some(checkpoint), next, Some(peer.inner().peer_id()));
1048 }
1049 }
1050 (None, next, None)
1051 }
1052 })
1053 .pipe(futures::stream::iter)
1054 .buffered(checkpoint_header_download_concurrency);
1055
1056 while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1057 assert_eq!(
1058 current
1059 .sequence_number()
1060 .checked_add(1)
1061 .expect("exhausted u64"),
1062 next
1063 );
1064
1065 let checkpoint = 'cp: {
1067 let checkpoint = maybe_checkpoint.ok_or_else(|| {
1068 anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1069 })?;
1070 if pinned_checkpoints
1072 .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1073 .is_ok()
1074 {
1075 break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1076 }
1077 match verify_checkpoint(¤t, &store, checkpoint) {
1078 Ok(verified_checkpoint) => verified_checkpoint,
1079 Err(checkpoint) => {
1080 let mut peer_heights = peer_heights.write().unwrap();
1081 peer_heights.remove_checkpoint(checkpoint.digest());
1084
1085 if let Some(peer_id) = maybe_peer_id {
1087 peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1088 }
1089
1090 return Err(anyhow::anyhow!(
1091 "unable to verify checkpoint {checkpoint:?}"
1092 ));
1093 }
1094 }
1095 };
1096
1097 debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1098 if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1099 metrics.checkpoint_summary_age_metrics()
1100 {
1101 checkpoint.report_checkpoint_age(
1102 checkpoint_summary_age_metric,
1103 checkpoint_summary_age_metric_deprecated,
1104 );
1105 }
1106
1107 current = checkpoint.clone();
1108 store
1111 .insert_checkpoint(&checkpoint)
1112 .expect("store operation should not fail");
1113 }
1114
1115 peer_heights
1116 .write()
1117 .unwrap()
1118 .cleanup_old_checkpoints(*checkpoint.sequence_number());
1119
1120 Ok(())
1121}
1122
1123async fn sync_checkpoint_contents_from_archive<S>(
1124 network: anemo::Network,
1125 archive_config: Option<ArchiveReaderConfig>,
1126 store: S,
1127 peer_heights: Arc<RwLock<PeerHeights>>,
1128 metrics: Metrics,
1129) where
1130 S: WriteStore + Clone + Send + Sync + 'static,
1131{
1132 loop {
1133 sync_checkpoint_contents_from_archive_iteration(
1134 &network,
1135 &archive_config,
1136 store.clone(),
1137 peer_heights.clone(),
1138 metrics.clone(),
1139 )
1140 .await;
1141 tokio::time::sleep(Duration::from_secs(5)).await;
1142 }
1143}
1144
1145async fn sync_checkpoint_contents_from_archive_iteration<S>(
1146 network: &anemo::Network,
1147 archive_config: &Option<ArchiveReaderConfig>,
1148 store: S,
1149 peer_heights: Arc<RwLock<PeerHeights>>,
1150 metrics: Metrics,
1151) where
1152 S: WriteStore + Clone + Send + Sync + 'static,
1153{
1154 let peers: Vec<_> = peer_heights
1155 .read()
1156 .unwrap()
1157 .peers_on_same_chain()
1158 .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1160 .collect();
1161 let lowest_checkpoint_on_peers = peers
1162 .iter()
1163 .map(|(_p, state_sync_info)| state_sync_info.lowest)
1164 .min();
1165 let highest_synced = store
1166 .get_highest_synced_checkpoint()
1167 .expect("store operation should not fail")
1168 .sequence_number;
1169 let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1170 highest_synced < lowest_checkpoint_on_peers
1171 } else {
1172 false
1173 };
1174 debug!(
1175 "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
1176 lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1177 );
1178 if sync_from_archive {
1179 let start = highest_synced
1180 .checked_add(1)
1181 .expect("Checkpoint seq num overflow");
1182 let end = lowest_checkpoint_on_peers.unwrap();
1183
1184 let Some(archive_config) = archive_config else {
1185 warn!("Failed to find an archive reader to complete the state sync request");
1186 return;
1187 };
1188 let Some(ingestion_url) = &archive_config.ingestion_url else {
1189 warn!("Archival ingestion url for state sync is not configured");
1190 return;
1191 };
1192 if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1193 warn!("{} can't be used as an archival fallback", ingestion_url);
1194 return;
1195 }
1196 let reader_options = ReaderOptions {
1197 batch_size: archive_config.download_concurrency.into(),
1198 upper_limit: Some(end),
1199 ..Default::default()
1200 };
1201 let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
1202 StateSyncWorker(store, metrics),
1203 ingestion_url.clone(),
1204 archive_config.remote_store_options.clone(),
1205 start,
1206 1,
1207 Some(reader_options),
1208 )
1209 .await
1210 else {
1211 return;
1212 };
1213 match executor.await {
1214 Ok(_) => info!(
1215 "State sync from archive is complete. Checkpoints downloaded = {:?}",
1216 end - start
1217 ),
1218 Err(err) => warn!("State sync from archive failed with error: {:?}", err),
1219 }
1220 }
1221}
1222
1223async fn sync_checkpoint_contents<S>(
1224 network: anemo::Network,
1225 store: S,
1226 peer_heights: Arc<RwLock<PeerHeights>>,
1227 sender: mpsc::WeakSender<StateSyncMessage>,
1228 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1229 checkpoint_content_download_concurrency: usize,
1230 checkpoint_content_download_tx_concurrency: u64,
1231 timeout: Duration,
1232 mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1233) where
1234 S: WriteStore + Clone,
1235{
1236 let mut highest_synced = store
1237 .get_highest_synced_checkpoint()
1238 .expect("store operation should not fail");
1239
1240 let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1241 let mut target_sequence_cursor = 0;
1242 let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1243 let mut checkpoint_contents_tasks = FuturesOrdered::new();
1244
1245 let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1246
1247 loop {
1248 tokio::select! {
1249 result = target_sequence_channel.changed() => {
1250 match result {
1251 Ok(()) => {
1252 target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1253 }
1254 Err(_) => {
1255 return
1257 }
1258 }
1259 },
1260 Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1261 match maybe_checkpoint {
1262 Ok(checkpoint) => {
1263 let _: &VerifiedCheckpoint = &checkpoint; store
1266 .update_highest_synced_checkpoint(&checkpoint)
1267 .expect("store operation should not fail");
1268 let _ = checkpoint_event_sender.send(checkpoint.clone());
1270 tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1271 highest_synced = checkpoint;
1272
1273 }
1274 Err(checkpoint) => {
1275 let _: &VerifiedCheckpoint = &checkpoint; if let Some(lowest_peer_checkpoint) =
1277 peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
1278 if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
1279 info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
1280 }
1281 } else {
1282 info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
1283
1284 }
1285 checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
1287 network.clone(),
1288 &store,
1289 peer_heights.clone(),
1290 timeout,
1291 checkpoint,
1292 ));
1293 }
1294 }
1295 },
1296 }
1297
1298 while current_sequence < target_sequence_cursor
1300 && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
1301 {
1302 let next_checkpoint = store
1303 .get_checkpoint_by_sequence_number(current_sequence)
1304 .unwrap_or_else(|| panic!(
1305 "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
1306 current_sequence
1307 ));
1308
1309 let tx_count = next_checkpoint.network_total_transactions
1311 - highest_started_network_total_transactions;
1312 if tx_count > tx_concurrency_remaining {
1313 break;
1314 }
1315 tx_concurrency_remaining -= tx_count;
1316
1317 highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
1318 current_sequence += 1;
1319 checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
1320 network.clone(),
1321 &store,
1322 peer_heights.clone(),
1323 timeout,
1324 next_checkpoint,
1325 ));
1326 }
1327
1328 if highest_synced
1329 .sequence_number()
1330 .is_multiple_of(checkpoint_content_download_concurrency as u64)
1331 || checkpoint_contents_tasks.is_empty()
1332 {
1333 if let Some(sender) = sender.upgrade() {
1335 let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
1336 let _ = sender.send(message).await;
1337 }
1338 }
1339 }
1340}
1341
1342#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1343async fn sync_one_checkpoint_contents<S>(
1344 network: anemo::Network,
1345 store: S,
1346 peer_heights: Arc<RwLock<PeerHeights>>,
1347 timeout: Duration,
1348 checkpoint: VerifiedCheckpoint,
1349) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1350where
1351 S: WriteStore + Clone,
1352{
1353 debug!("syncing checkpoint contents");
1354
1355 if store
1358 .get_highest_synced_checkpoint()
1359 .expect("store operation should not fail")
1360 .sequence_number()
1361 >= checkpoint.sequence_number()
1362 {
1363 debug!("checkpoint was already created via consensus output");
1364 return Ok(checkpoint);
1365 }
1366
1367 let peers = PeerBalancer::new(
1369 &network,
1370 peer_heights.clone(),
1371 PeerCheckpointRequestType::Content,
1372 )
1373 .with_checkpoint(*checkpoint.sequence_number());
1374 let now = tokio::time::Instant::now();
1375 let Some(_contents) = get_full_checkpoint_contents(peers, &store, &checkpoint, timeout).await
1376 else {
1377 let duration = peer_heights
1379 .read()
1380 .unwrap()
1381 .wait_interval_when_no_peer_to_sync_content();
1382 if now.elapsed() < duration {
1383 let duration = duration - now.elapsed();
1384 info!("retrying checkpoint sync after {:?}", duration);
1385 tokio::time::sleep(duration).await;
1386 }
1387 return Err(checkpoint);
1388 };
1389 debug!("completed checkpoint contents sync");
1390 Ok(checkpoint)
1391}
1392
1393#[instrument(level = "debug", skip_all)]
1394async fn get_full_checkpoint_contents<S>(
1395 peers: PeerBalancer,
1396 store: S,
1397 checkpoint: &VerifiedCheckpoint,
1398 timeout: Duration,
1399) -> Option<FullCheckpointContents>
1400where
1401 S: WriteStore,
1402{
1403 let sequence_number = checkpoint.sequence_number;
1404 let digest = checkpoint.content_digest;
1405 if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1406 debug!("store already contains checkpoint contents");
1407 return Some(contents);
1408 }
1409
1410 for mut peer in peers {
1413 debug!(
1414 ?timeout,
1415 "requesting checkpoint contents from {}",
1416 peer.inner().peer_id(),
1417 );
1418 let request = Request::new(digest).with_timeout(timeout);
1419 if let Some(contents) = peer
1420 .get_checkpoint_contents(request)
1421 .await
1422 .tap_err(|e| trace!("{e:?}"))
1423 .ok()
1424 .and_then(Response::into_inner)
1425 .tap_none(|| trace!("peer unable to help sync"))
1426 && contents.verify_digests(digest).is_ok()
1427 {
1428 let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1429 store
1430 .insert_checkpoint_contents(checkpoint, verified_contents)
1431 .expect("store operation should not fail");
1432 return Some(contents);
1433 }
1434 }
1435 debug!("no peers had checkpoint contents");
1436 None
1437}
1438
1439async fn update_checkpoint_watermark_metrics<S>(
1440 mut recv: oneshot::Receiver<()>,
1441 store: S,
1442 metrics: Metrics,
1443) -> Result<()>
1444where
1445 S: WriteStore + Clone + Send + Sync,
1446{
1447 let mut interval = tokio::time::interval(Duration::from_secs(5));
1448 loop {
1449 tokio::select! {
1450 _now = interval.tick() => {
1451 let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1452 .expect("store operation should not fail");
1453 metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1454 let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1455 .expect("store operation should not fail");
1456 metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1457 },
1458 _ = &mut recv => break,
1459 }
1460 }
1461 Ok(())
1462}