1use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
51use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
52use rand::Rng;
53use std::{
54 collections::{HashMap, VecDeque},
55 sync::{Arc, RwLock},
56 time::Duration,
57};
58use sui_config::p2p::StateSyncConfig;
59use sui_types::{
60 committee::Committee,
61 digests::CheckpointDigest,
62 messages_checkpoint::{
63 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
64 VerifiedCheckpoint, VerifiedCheckpointContents, VersionedFullCheckpointContents,
65 },
66 storage::WriteStore,
67};
68use tap::{Pipe, TapFallible, TapOptional};
69use tokio::sync::oneshot;
70use tokio::{
71 sync::{broadcast, mpsc, watch},
72 task::{AbortHandle, JoinSet},
73};
74use tracing::{debug, info, instrument, trace, warn};
75
76mod generated {
77 include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
78}
79mod builder;
80mod metrics;
81mod server;
82#[cfg(test)]
83mod tests;
84mod worker;
85
86use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
87use crate::state_sync::worker::StateSyncWorker;
88pub use builder::{Builder, UnstartedStateSync};
89pub use generated::{
90 state_sync_client::StateSyncClient,
91 state_sync_server::{StateSync, StateSyncServer},
92};
93pub use server::GetCheckpointAvailabilityResponse;
94pub use server::GetCheckpointSummaryRequest;
95use sui_config::node::ArchiveReaderConfig;
96use sui_data_ingestion_core::{ReaderOptions, setup_single_workflow_with_options};
97use sui_storage::verify_checkpoint;
98
/// Cloneable handle used to interact with the state-sync event loop.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Sender for `StateSyncMessage`s; presumably paired with the event loop's mailbox.
    sender: mpsc::Sender<StateSyncMessage>,
    /// Broadcast sender on which synced checkpoints are published; `subscribe_to_synced_checkpoints`
    /// hands out receivers for it.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}
108
109impl Handle {
110 pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
119 self.sender
120 .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
121 .await
122 .unwrap()
123 }
124
125 pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
127 self.checkpoint_event_sender.subscribe()
128 }
129}
130
131struct PeerHeights {
132 peers: HashMap<PeerId, PeerStateSyncInfo>,
134 unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
135 sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
136
137 wait_interval_when_no_peer_to_sync_content: Duration,
139}
140
/// What state sync knows about a single peer.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// Digest of the checkpoint the peer returned for sequence number 0 (its genesis), or the
    /// default digest if it returned none.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Whether the peer's genesis digest matched ours (cleared if the peer later serves a
    /// summary that fails verification).
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number the peer is known to have.
    height: CheckpointSequenceNumber,
    /// Lowest checkpoint sequence number the peer reports as still available (low watermark).
    lowest: CheckpointSequenceNumber,
}
153
154impl PeerHeights {
155 pub fn highest_known_checkpoint(&self) -> Option<&Checkpoint> {
156 self.highest_known_checkpoint_sequence_number()
157 .and_then(|s| self.sequence_number_to_digest.get(&s))
158 .and_then(|digest| self.unprocessed_checkpoints.get(digest))
159 }
160
161 pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
162 self.peers
163 .values()
164 .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
165 .max()
166 }
167
168 pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
169 self.peers
170 .iter()
171 .filter(|(_peer_id, info)| info.on_same_chain_as_us)
172 }
173
174 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
179 pub fn update_peer_info(
180 &mut self,
181 peer_id: PeerId,
182 checkpoint: Checkpoint,
183 low_watermark: Option<CheckpointSequenceNumber>,
184 ) -> bool {
185 debug!("Update peer info");
186
187 let info = match self.peers.get_mut(&peer_id) {
188 Some(info) if info.on_same_chain_as_us => info,
189 _ => return false,
190 };
191
192 info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
193 if let Some(low_watermark) = low_watermark {
194 info.lowest = low_watermark;
195 }
196 self.insert_checkpoint(checkpoint);
197
198 true
199 }
200
201 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
202 pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
203 use std::collections::hash_map::Entry;
204 debug!("Insert peer info");
205
206 match self.peers.entry(peer_id) {
207 Entry::Occupied(mut entry) => {
208 let entry = entry.get_mut();
211 if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
212 entry.height = std::cmp::max(entry.height, info.height);
213 } else {
214 *entry = info;
215 }
216 }
217 Entry::Vacant(entry) => {
218 entry.insert(info);
219 }
220 }
221 }
222
223 pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
224 if let Some(info) = self.peers.get_mut(&peer_id) {
225 info.on_same_chain_as_us = false;
226 }
227 }
228
229 pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
230 self.unprocessed_checkpoints
231 .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
232 self.sequence_number_to_digest
233 .retain(|&s, _digest| s > sequence_number);
234 }
235
236 pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
238 let digest = *checkpoint.digest();
239 let sequence_number = *checkpoint.sequence_number();
240 self.unprocessed_checkpoints.insert(digest, checkpoint);
241 self.sequence_number_to_digest
242 .insert(sequence_number, digest);
243 }
244
245 pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
246 if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
247 self.sequence_number_to_digest
248 .remove(checkpoint.sequence_number());
249 }
250 }
251
252 pub fn get_checkpoint_by_sequence_number(
253 &self,
254 sequence_number: CheckpointSequenceNumber,
255 ) -> Option<&Checkpoint> {
256 self.sequence_number_to_digest
257 .get(&sequence_number)
258 .and_then(|digest| self.get_checkpoint_by_digest(digest))
259 }
260
261 pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
262 self.unprocessed_checkpoints.get(digest)
263 }
264
265 #[cfg(test)]
266 pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
267 self.wait_interval_when_no_peer_to_sync_content = duration;
268 }
269
270 pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
271 self.wait_interval_when_no_peer_to_sync_content
272 }
273}
274
275#[derive(Clone)]
277struct PeerBalancer {
278 peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
279 requested_checkpoint: Option<CheckpointSequenceNumber>,
280 request_type: PeerCheckpointRequestType,
281}
282
/// Kind of data a `PeerBalancer` selects peers for.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// Checkpoint summaries: a peer qualifies if its height is at least the target.
    Summary,
    /// Checkpoint contents: a peer must also have a low watermark at or below the target.
    Content,
}
288
289impl PeerBalancer {
290 pub fn new(
291 network: &anemo::Network,
292 peer_heights: Arc<RwLock<PeerHeights>>,
293 request_type: PeerCheckpointRequestType,
294 ) -> Self {
295 let mut peers: Vec<_> = peer_heights
296 .read()
297 .unwrap()
298 .peers_on_same_chain()
299 .filter_map(|(peer_id, info)| {
301 network
302 .peer(*peer_id)
303 .map(|peer| (peer.connection_rtt(), peer, *info))
304 })
305 .collect();
306 peers.sort_by(|(rtt_a, _, _), (rtt_b, _, _)| rtt_a.cmp(rtt_b));
307 Self {
308 peers: peers
309 .into_iter()
310 .map(|(_, peer, info)| (peer, info))
311 .collect(),
312 requested_checkpoint: None,
313 request_type,
314 }
315 }
316
317 pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
318 self.requested_checkpoint = Some(checkpoint);
319 self
320 }
321}
322
323impl Iterator for PeerBalancer {
324 type Item = StateSyncClient<anemo::Peer>;
325
326 fn next(&mut self) -> Option<Self::Item> {
327 while !self.peers.is_empty() {
328 const SELECTION_WINDOW: usize = 2;
329 let idx =
330 rand::thread_rng().gen_range(0..std::cmp::min(SELECTION_WINDOW, self.peers.len()));
331 let (peer, info) = self.peers.remove(idx).unwrap();
332 let requested_checkpoint = self.requested_checkpoint.unwrap_or(0);
333 match &self.request_type {
334 PeerCheckpointRequestType::Summary if info.height >= requested_checkpoint => {
336 return Some(StateSyncClient::new(peer));
337 }
338 PeerCheckpointRequestType::Content
339 if info.height >= requested_checkpoint
340 && info.lowest <= requested_checkpoint =>
341 {
342 return Some(StateSyncClient::new(peer));
343 }
344 _ => {}
345 }
346 }
347 None
348 }
349}
350
/// Messages processed by the state-sync event loop.
#[derive(Clone, Debug)]
enum StateSyncMessage {
    /// Ask the loop to start a checkpoint-summary sync task if one is needed.
    StartSyncJob,
    /// A checkpoint handed over from consensus (see `Handle::send_checkpoint`).
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    /// A checkpoint that has been synced; the loop pushes it to peers that are behind it.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
362
/// State owned by the state-sync event loop task.
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    /// Incoming messages; the loop exits when this channel yields `None`.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak sender into the mailbox, cloned into background tasks so they can post messages
    /// without keeping the channel open.
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// All background tasks spawned by the loop.
    tasks: JoinSet<()>,
    /// In-flight checkpoint-summary sync task, if any (at most one at a time).
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Long-lived checkpoint-contents sync task.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    /// Download-limit layer, if configured; its map is pruned on every tick.
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    store: S,
    /// Peer heights and cached summaries, shared with background tasks.
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Publishes synced checkpoints to subscribers.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    /// Long-lived archive fallback task.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
    /// Archive reader configuration; without it archive fallback does nothing.
    archive_config: Option<ArchiveReaderConfig>,
}
384
385impl<S> StateSyncEventLoop<S>
386where
387 S: WriteStore + Clone + Send + Sync + 'static,
388{
389 pub async fn start(mut self) {
394 info!("State-Synchronizer started");
395
396 self.config.pinned_checkpoints.sort();
397
398 let mut interval = tokio::time::interval(self.config.interval_period());
399 let mut peer_events = {
400 let (subscriber, peers) = self.network.subscribe().unwrap();
401 for peer_id in peers {
402 self.spawn_get_latest_from_peer(peer_id);
403 }
404 subscriber
405 };
406 let (
407 target_checkpoint_contents_sequence_sender,
408 target_checkpoint_contents_sequence_receiver,
409 ) = watch::channel(0);
410
411 let (_sender, receiver) = oneshot::channel();
413 tokio::spawn(update_checkpoint_watermark_metrics(
414 receiver,
415 self.store.clone(),
416 self.metrics.clone(),
417 ));
418
419 let task = sync_checkpoint_contents(
421 self.network.clone(),
422 self.store.clone(),
423 self.peer_heights.clone(),
424 self.weak_sender.clone(),
425 self.checkpoint_event_sender.clone(),
426 self.config.checkpoint_content_download_concurrency(),
427 self.config.checkpoint_content_download_tx_concurrency(),
428 self.config.use_get_checkpoint_contents_v2(),
429 self.config.checkpoint_content_timeout(),
430 target_checkpoint_contents_sequence_receiver,
431 );
432 let task_handle = self.tasks.spawn(task);
433 self.sync_checkpoint_contents_task = Some(task_handle);
434
435 let task = sync_checkpoint_contents_from_archive(
443 self.network.clone(),
444 self.archive_config.clone(),
445 self.store.clone(),
446 self.peer_heights.clone(),
447 self.metrics.clone(),
448 );
449 let task_handle = self.tasks.spawn(task);
450 self.sync_checkpoint_from_archive_task = Some(task_handle);
451
452 loop {
454 tokio::select! {
455 now = interval.tick() => {
456 self.handle_tick(now.into_std());
457 },
458 maybe_message = self.mailbox.recv() => {
459 if let Some(message) = maybe_message {
462 self.handle_message(message);
463 } else {
464 break;
465 }
466 },
467 peer_event = peer_events.recv() => {
468 self.handle_peer_event(peer_event);
469 },
470 Some(task_result) = self.tasks.join_next() => {
471 match task_result {
472 Ok(()) => {},
473 Err(e) => {
474 if e.is_cancelled() {
475 } else if e.is_panic() {
477 std::panic::resume_unwind(e.into_panic());
479 } else {
480 panic!("task failed: {e}");
481 }
482 },
483 };
484
485 if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
486 panic!("sync_checkpoint_contents task unexpectedly terminated")
487 }
488
489 if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
490 self.sync_checkpoint_summaries_task = None;
491 }
492
493 if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
494 panic!("sync_checkpoint_from_archive task unexpectedly terminated")
495 }
496 },
497 }
498
499 self.maybe_start_checkpoint_summary_sync_task();
500 self.maybe_trigger_checkpoint_contents_sync_task(
501 &target_checkpoint_contents_sequence_sender,
502 );
503 }
504
505 info!("State-Synchronizer ended");
506 }
507
508 fn handle_message(&mut self, message: StateSyncMessage) {
509 debug!("Received message: {:?}", message);
510 match message {
511 StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
512 StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
513 self.handle_checkpoint_from_consensus(checkpoint)
514 }
515 StateSyncMessage::SyncedCheckpoint(checkpoint) => {
517 self.spawn_notify_peers_of_checkpoint(*checkpoint)
518 }
519 }
520 }
521
522 #[instrument(level = "debug", skip_all)]
524 fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
525 let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
528 .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
529 .digest();
530 if checkpoint.previous_digest != Some(prev_digest) {
531 panic!(
532 "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
533 checkpoint.sequence_number(),
534 Some(prev_digest),
535 checkpoint.previous_digest
536 );
537 }
538
539 let latest_checkpoint = self
540 .store
541 .get_highest_verified_checkpoint()
542 .expect("store operation should not fail");
543
544 if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
546 return;
547 }
548
549 let checkpoint = *checkpoint;
550 let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
551 if *checkpoint.sequence_number() > next_sequence_number {
552 debug!(
553 "consensus sent too new of a checkpoint, expecting: {}, got: {}",
554 next_sequence_number,
555 checkpoint.sequence_number()
556 );
557 }
558
559 #[cfg(debug_assertions)]
562 {
563 let _ = (next_sequence_number..=*checkpoint.sequence_number())
564 .map(|n| {
565 let checkpoint = self
566 .store
567 .get_checkpoint_by_sequence_number(n)
568 .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
569 self.store
570 .get_full_checkpoint_contents(Some(n), &checkpoint.content_digest)
571 .unwrap_or_else(|| {
572 panic!(
573 "store should contain checkpoint contents for {:?}",
574 checkpoint.content_digest
575 )
576 });
577 })
578 .collect::<Vec<_>>();
579 }
580
581 if let Some(EndOfEpochData {
588 next_epoch_committee,
589 ..
590 }) = checkpoint.end_of_epoch_data.as_ref()
591 {
592 let next_committee = next_epoch_committee.iter().cloned().collect();
593 let committee =
594 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
595 self.store
596 .insert_committee(committee)
597 .expect("store operation should not fail");
598 }
599
600 self.store
601 .update_highest_verified_checkpoint(&checkpoint)
602 .expect("store operation should not fail");
603 self.store
604 .update_highest_synced_checkpoint(&checkpoint)
605 .expect("store operation should not fail");
606
607 let _ = self.checkpoint_event_sender.send(checkpoint.clone());
609
610 self.spawn_notify_peers_of_checkpoint(checkpoint);
611 }
612
613 fn handle_peer_event(
614 &mut self,
615 peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
616 ) {
617 use tokio::sync::broadcast::error::RecvError;
618
619 match peer_event {
620 Ok(PeerEvent::NewPeer(peer_id)) => {
621 self.spawn_get_latest_from_peer(peer_id);
622 }
623 Ok(PeerEvent::LostPeer(peer_id, _)) => {
624 self.peer_heights.write().unwrap().peers.remove(&peer_id);
625 }
626
627 Err(RecvError::Closed) => {
628 panic!("PeerEvent channel shouldn't be able to be closed");
629 }
630
631 Err(RecvError::Lagged(_)) => {
632 trace!("State-Sync fell behind processing PeerEvents");
633 }
634 }
635 }
636
637 fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
638 if let Some(peer) = self.network.peer(peer_id) {
639 let genesis_checkpoint_digest = *self
640 .store
641 .get_checkpoint_by_sequence_number(0)
642 .expect("store should contain genesis checkpoint")
643 .digest();
644 let task = get_latest_from_peer(
645 genesis_checkpoint_digest,
646 peer,
647 self.peer_heights.clone(),
648 self.config.timeout(),
649 );
650 self.tasks.spawn(task);
651 }
652 }
653
654 fn handle_tick(&mut self, _now: std::time::Instant) {
655 let task = query_peers_for_their_latest_checkpoint(
656 self.network.clone(),
657 self.peer_heights.clone(),
658 self.weak_sender.clone(),
659 self.config.timeout(),
660 );
661 self.tasks.spawn(task);
662
663 if let Some(layer) = self.download_limit_layer.as_ref() {
664 layer.maybe_prune_map();
665 }
666 }
667
668 fn maybe_start_checkpoint_summary_sync_task(&mut self) {
669 if self.sync_checkpoint_summaries_task.is_some() {
671 return;
672 }
673
674 let highest_processed_checkpoint = self
675 .store
676 .get_highest_verified_checkpoint()
677 .expect("store operation should not fail");
678
679 let highest_known_checkpoint = self
680 .peer_heights
681 .read()
682 .unwrap()
683 .highest_known_checkpoint()
684 .cloned();
685
686 if Some(highest_processed_checkpoint.sequence_number())
687 < highest_known_checkpoint
688 .as_ref()
689 .map(|x| x.sequence_number())
690 {
691 let task = sync_to_checkpoint(
693 self.network.clone(),
694 self.store.clone(),
695 self.peer_heights.clone(),
696 self.metrics.clone(),
697 self.config.pinned_checkpoints.clone(),
698 self.config.checkpoint_header_download_concurrency(),
699 self.config.timeout(),
700 highest_known_checkpoint.unwrap(),
702 )
703 .map(|result| match result {
704 Ok(()) => {}
705 Err(e) => {
706 debug!("error syncing checkpoint {e}");
707 }
708 });
709 let task_handle = self.tasks.spawn(task);
710 self.sync_checkpoint_summaries_task = Some(task_handle);
711 }
712 }
713
714 fn maybe_trigger_checkpoint_contents_sync_task(
715 &mut self,
716 target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
717 ) {
718 let highest_verified_checkpoint = self
719 .store
720 .get_highest_verified_checkpoint()
721 .expect("store operation should not fail");
722 let highest_synced_checkpoint = self
723 .store
724 .get_highest_synced_checkpoint()
725 .expect("store operation should not fail");
726
727 if highest_verified_checkpoint.sequence_number()
728 > highest_synced_checkpoint.sequence_number()
729 && self
731 .peer_heights
732 .read()
733 .unwrap()
734 .highest_known_checkpoint_sequence_number()
735 > Some(*highest_synced_checkpoint.sequence_number())
736 {
737 let _ = target_sequence_channel.send_if_modified(|num| {
738 let new_num = *highest_verified_checkpoint.sequence_number();
739 if *num == new_num {
740 return false;
741 }
742 *num = new_num;
743 true
744 });
745 }
746 }
747
748 fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
749 let task = notify_peers_of_checkpoint(
750 self.network.clone(),
751 self.peer_heights.clone(),
752 checkpoint,
753 self.config.timeout(),
754 );
755 self.tasks.spawn(task);
756 }
757}
758
759async fn notify_peers_of_checkpoint(
760 network: anemo::Network,
761 peer_heights: Arc<RwLock<PeerHeights>>,
762 checkpoint: VerifiedCheckpoint,
763 timeout: Duration,
764) {
765 let futs = peer_heights
766 .read()
767 .unwrap()
768 .peers_on_same_chain()
769 .filter_map(|(peer_id, info)| {
771 (*checkpoint.sequence_number() > info.height).then_some(peer_id)
772 })
773 .flat_map(|peer_id| network.peer(*peer_id))
775 .map(StateSyncClient::new)
776 .map(|mut client| {
777 let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
778 async move { client.push_checkpoint_summary(request).await }
779 })
780 .collect::<Vec<_>>();
781 futures::future::join_all(futs).await;
782}
783
784async fn get_latest_from_peer(
785 our_genesis_checkpoint_digest: CheckpointDigest,
786 peer: anemo::Peer,
787 peer_heights: Arc<RwLock<PeerHeights>>,
788 timeout: Duration,
789) {
790 let peer_id = peer.peer_id();
791 let mut client = StateSyncClient::new(peer);
792
793 let info = {
794 let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
795
796 if let Some(info) = maybe_info {
797 info
798 } else {
799 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
804 .with_timeout(timeout);
805 let response = client
806 .get_checkpoint_summary(request)
807 .await
808 .map(Response::into_inner);
809
810 let info = match response {
811 Ok(Some(checkpoint)) => {
812 let digest = *checkpoint.digest();
813 PeerStateSyncInfo {
814 genesis_checkpoint_digest: digest,
815 on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
816 height: *checkpoint.sequence_number(),
817 lowest: CheckpointSequenceNumber::default(),
818 }
819 }
820 Ok(None) => PeerStateSyncInfo {
821 genesis_checkpoint_digest: CheckpointDigest::default(),
822 on_same_chain_as_us: false,
823 height: CheckpointSequenceNumber::default(),
824 lowest: CheckpointSequenceNumber::default(),
825 },
826 Err(status) => {
827 trace!("get_latest_checkpoint_summary request failed: {status:?}");
828 return;
829 }
830 };
831 peer_heights
832 .write()
833 .unwrap()
834 .insert_peer_info(peer_id, info);
835 info
836 }
837 };
838
839 if !info.on_same_chain_as_us {
841 trace!(?info, "Peer {peer_id} not on same chain as us");
842 return;
843 }
844 let Some((highest_checkpoint, low_watermark)) =
845 query_peer_for_latest_info(&mut client, timeout).await
846 else {
847 return;
848 };
849 peer_heights
850 .write()
851 .unwrap()
852 .update_peer_info(peer_id, highest_checkpoint, low_watermark);
853}
854
855async fn query_peer_for_latest_info(
857 client: &mut StateSyncClient<anemo::Peer>,
858 timeout: Duration,
859) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
860 let request = Request::new(()).with_timeout(timeout);
861 let response = client
862 .get_checkpoint_availability(request)
863 .await
864 .map(Response::into_inner);
865 match response {
866 Ok(GetCheckpointAvailabilityResponse {
867 highest_synced_checkpoint,
868 lowest_available_checkpoint,
869 }) => {
870 return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
871 }
872 Err(status) => {
873 if status.status() != anemo::types::response::StatusCode::NotFound {
875 trace!("get_checkpoint_availability request failed: {status:?}");
876 return None;
877 }
878 }
879 };
880
881 let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
884 let response = client
885 .get_checkpoint_summary(request)
886 .await
887 .map(Response::into_inner);
888 match response {
889 Ok(Some(checkpoint)) => Some((checkpoint, None)),
890 Ok(None) => None,
891 Err(status) => {
892 trace!("get_checkpoint_summary (latest) request failed: {status:?}");
893 None
894 }
895 }
896}
897
898#[instrument(level = "debug", skip_all)]
899async fn query_peers_for_their_latest_checkpoint(
900 network: anemo::Network,
901 peer_heights: Arc<RwLock<PeerHeights>>,
902 sender: mpsc::WeakSender<StateSyncMessage>,
903 timeout: Duration,
904) {
905 let peer_heights = &peer_heights;
906 let futs = peer_heights
907 .read()
908 .unwrap()
909 .peers_on_same_chain()
910 .flat_map(|(peer_id, _info)| network.peer(*peer_id))
912 .map(|peer| {
913 let peer_id = peer.peer_id();
914 let mut client = StateSyncClient::new(peer);
915
916 async move {
917 let response = query_peer_for_latest_info(&mut client, timeout).await;
918 match response {
919 Some((highest_checkpoint, low_watermark)) => peer_heights
920 .write()
921 .unwrap()
922 .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
923 .then_some(highest_checkpoint),
924 None => None,
925 }
926 }
927 })
928 .collect::<Vec<_>>();
929
930 debug!("Query {} peers for latest checkpoint", futs.len());
931
932 let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
933
934 let highest_checkpoint = checkpoints.max_by_key(|checkpoint| *checkpoint.sequence_number());
935
936 let our_highest_checkpoint = peer_heights
937 .read()
938 .unwrap()
939 .highest_known_checkpoint()
940 .cloned();
941
942 debug!(
943 "Our highest checkpoint {:?}, peers highest checkpoint {:?}",
944 our_highest_checkpoint.as_ref().map(|c| c.sequence_number()),
945 highest_checkpoint.as_ref().map(|c| c.sequence_number())
946 );
947
948 let _new_checkpoint = match (highest_checkpoint, our_highest_checkpoint) {
949 (Some(theirs), None) => theirs,
950 (Some(theirs), Some(ours)) if theirs.sequence_number() > ours.sequence_number() => theirs,
951 _ => return,
952 };
953
954 if let Some(sender) = sender.upgrade() {
955 let _ = sender.send(StateSyncMessage::StartSyncJob).await;
956 }
957}
958
959async fn sync_to_checkpoint<S>(
960 network: anemo::Network,
961 store: S,
962 peer_heights: Arc<RwLock<PeerHeights>>,
963 metrics: Metrics,
964 pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
965 checkpoint_header_download_concurrency: usize,
966 timeout: Duration,
967 checkpoint: Checkpoint,
968) -> Result<()>
969where
970 S: WriteStore,
971{
972 metrics.set_highest_known_checkpoint(*checkpoint.sequence_number());
973
974 let mut current = store
975 .get_highest_verified_checkpoint()
976 .expect("store operation should not fail");
977 if current.sequence_number() >= checkpoint.sequence_number() {
978 return Err(anyhow::anyhow!(
979 "target checkpoint {} is older than highest verified checkpoint {}",
980 checkpoint.sequence_number(),
981 current.sequence_number(),
982 ));
983 }
984
985 let peer_balancer = PeerBalancer::new(
986 &network,
987 peer_heights.clone(),
988 PeerCheckpointRequestType::Summary,
989 );
990 let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
992 ..=*checkpoint.sequence_number())
993 .map(|next| {
994 let peers = peer_balancer.clone().with_checkpoint(next);
995 let peer_heights = peer_heights.clone();
996 let pinned_checkpoints = &pinned_checkpoints;
997 async move {
998 if let Some(checkpoint) = peer_heights
999 .read()
1000 .unwrap()
1001 .get_checkpoint_by_sequence_number(next)
1002 {
1003 return (Some(checkpoint.to_owned()), next, None);
1004 }
1005
1006 for mut peer in peers {
1009 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1010 .with_timeout(timeout);
1011 if let Some(checkpoint) = peer
1012 .get_checkpoint_summary(request)
1013 .await
1014 .tap_err(|e| trace!("{e:?}"))
1015 .ok()
1016 .and_then(Response::into_inner)
1017 .tap_none(|| trace!("peer unable to help sync"))
1018 {
1019 if *checkpoint.sequence_number() != next {
1021 tracing::debug!(
1022 "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1023 checkpoint.sequence_number()
1024 );
1025 continue;
1026 }
1027
1028 let checkpoint_digest = checkpoint.digest();
1030 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1031 checkpoint.sequence_number(),
1032 |(seq_num, _digest)| *seq_num
1033 )
1034 && pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest {
1035 tracing::debug!(
1036 "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1037 pinned_checkpoints[pinned_digest_index].1,
1038 checkpoint_digest
1039 );
1040 continue;
1041 }
1042
1043 peer_heights
1045 .write()
1046 .unwrap()
1047 .insert_checkpoint(checkpoint.clone());
1048 return (Some(checkpoint), next, Some(peer.inner().peer_id()));
1049 }
1050 }
1051 (None, next, None)
1052 }
1053 })
1054 .pipe(futures::stream::iter)
1055 .buffered(checkpoint_header_download_concurrency);
1056
1057 while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1058 assert_eq!(
1059 current
1060 .sequence_number()
1061 .checked_add(1)
1062 .expect("exhausted u64"),
1063 next
1064 );
1065
1066 let checkpoint = 'cp: {
1068 let checkpoint = maybe_checkpoint.ok_or_else(|| {
1069 anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1070 })?;
1071 if pinned_checkpoints
1073 .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1074 .is_ok()
1075 {
1076 break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1077 }
1078 match verify_checkpoint(¤t, &store, checkpoint) {
1079 Ok(verified_checkpoint) => verified_checkpoint,
1080 Err(checkpoint) => {
1081 let mut peer_heights = peer_heights.write().unwrap();
1082 peer_heights.remove_checkpoint(checkpoint.digest());
1085
1086 if let Some(peer_id) = maybe_peer_id {
1088 peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1089 }
1090
1091 return Err(anyhow::anyhow!(
1092 "unable to verify checkpoint {checkpoint:?}"
1093 ));
1094 }
1095 }
1096 };
1097
1098 debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1099 if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
1100 metrics.checkpoint_summary_age_metrics()
1101 {
1102 checkpoint.report_checkpoint_age(
1103 checkpoint_summary_age_metric,
1104 checkpoint_summary_age_metric_deprecated,
1105 );
1106 }
1107
1108 current = checkpoint.clone();
1109 store
1112 .insert_checkpoint(&checkpoint)
1113 .expect("store operation should not fail");
1114 }
1115
1116 peer_heights
1117 .write()
1118 .unwrap()
1119 .cleanup_old_checkpoints(*checkpoint.sequence_number());
1120
1121 Ok(())
1122}
1123
1124async fn sync_checkpoint_contents_from_archive<S>(
1125 network: anemo::Network,
1126 archive_config: Option<ArchiveReaderConfig>,
1127 store: S,
1128 peer_heights: Arc<RwLock<PeerHeights>>,
1129 metrics: Metrics,
1130) where
1131 S: WriteStore + Clone + Send + Sync + 'static,
1132{
1133 loop {
1134 sync_checkpoint_contents_from_archive_iteration(
1135 &network,
1136 &archive_config,
1137 store.clone(),
1138 peer_heights.clone(),
1139 metrics.clone(),
1140 )
1141 .await;
1142 tokio::time::sleep(Duration::from_secs(5)).await;
1143 }
1144}
1145
1146async fn sync_checkpoint_contents_from_archive_iteration<S>(
1147 network: &anemo::Network,
1148 archive_config: &Option<ArchiveReaderConfig>,
1149 store: S,
1150 peer_heights: Arc<RwLock<PeerHeights>>,
1151 metrics: Metrics,
1152) where
1153 S: WriteStore + Clone + Send + Sync + 'static,
1154{
1155 let peers: Vec<_> = peer_heights
1156 .read()
1157 .unwrap()
1158 .peers_on_same_chain()
1159 .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1161 .collect();
1162 let lowest_checkpoint_on_peers = peers
1163 .iter()
1164 .map(|(_p, state_sync_info)| state_sync_info.lowest)
1165 .min();
1166 let highest_synced = store
1167 .get_highest_synced_checkpoint()
1168 .expect("store operation should not fail")
1169 .sequence_number;
1170 let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers {
1171 highest_synced < lowest_checkpoint_on_peers
1172 } else {
1173 false
1174 };
1175 debug!(
1176 "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
1177 lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1178 );
1179 if sync_from_archive {
1180 let start = highest_synced
1181 .checked_add(1)
1182 .expect("Checkpoint seq num overflow");
1183 let end = lowest_checkpoint_on_peers.unwrap();
1184
1185 let Some(archive_config) = archive_config else {
1186 warn!("Failed to find an archive reader to complete the state sync request");
1187 return;
1188 };
1189 let Some(ingestion_url) = &archive_config.ingestion_url else {
1190 warn!("Archival ingestion url for state sync is not configured");
1191 return;
1192 };
1193 if ingestion_url.contains("checkpoints.mainnet.sui.io") {
1194 warn!("{} can't be used as an archival fallback", ingestion_url);
1195 return;
1196 }
1197 let reader_options = ReaderOptions {
1198 batch_size: archive_config.download_concurrency.into(),
1199 upper_limit: Some(end),
1200 ..Default::default()
1201 };
1202 let Ok((executor, _exit_sender)) = setup_single_workflow_with_options(
1203 StateSyncWorker(store, metrics),
1204 ingestion_url.clone(),
1205 archive_config.remote_store_options.clone(),
1206 start,
1207 1,
1208 Some(reader_options),
1209 )
1210 .await
1211 else {
1212 return;
1213 };
1214 match executor.await {
1215 Ok(_) => info!(
1216 "State sync from archive is complete. Checkpoints downloaded = {:?}",
1217 end - start
1218 ),
1219 Err(err) => warn!("State sync from archive failed with error: {:?}", err),
1220 }
1221 }
1222}
1223
1224async fn sync_checkpoint_contents<S>(
1225 network: anemo::Network,
1226 store: S,
1227 peer_heights: Arc<RwLock<PeerHeights>>,
1228 sender: mpsc::WeakSender<StateSyncMessage>,
1229 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1230 checkpoint_content_download_concurrency: usize,
1231 checkpoint_content_download_tx_concurrency: u64,
1232 use_get_checkpoint_contents_v2: bool,
1233 timeout: Duration,
1234 mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1235) where
1236 S: WriteStore + Clone,
1237{
1238 let mut highest_synced = store
1239 .get_highest_synced_checkpoint()
1240 .expect("store operation should not fail");
1241
1242 let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1243 let mut target_sequence_cursor = 0;
1244 let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1245 let mut checkpoint_contents_tasks = FuturesOrdered::new();
1246
1247 let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1248
1249 loop {
1250 tokio::select! {
1251 result = target_sequence_channel.changed() => {
1252 match result {
1253 Ok(()) => {
1254 target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1255 }
1256 Err(_) => {
1257 return
1259 }
1260 }
1261 },
1262 Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1263 match maybe_checkpoint {
1264 Ok(checkpoint) => {
1265 let _: &VerifiedCheckpoint = &checkpoint; store
1268 .update_highest_synced_checkpoint(&checkpoint)
1269 .expect("store operation should not fail");
1270 let _ = checkpoint_event_sender.send(checkpoint.clone());
1272 tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1273 highest_synced = checkpoint;
1274
1275 }
1276 Err(checkpoint) => {
1277 let _: &VerifiedCheckpoint = &checkpoint; if let Some(lowest_peer_checkpoint) =
1279 peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
1280 if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
1281 info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
1282 }
1283 } else {
1284 info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
1285
1286 }
1287 checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
1289 network.clone(),
1290 &store,
1291 peer_heights.clone(),
1292 use_get_checkpoint_contents_v2,
1293 timeout,
1294 checkpoint,
1295 ));
1296 }
1297 }
1298 },
1299 }
1300
1301 while current_sequence < target_sequence_cursor
1303 && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
1304 {
1305 let next_checkpoint = store
1306 .get_checkpoint_by_sequence_number(current_sequence)
1307 .unwrap_or_else(|| panic!(
1308 "BUG: store should have all checkpoints older than highest_verified_checkpoint (checkpoint {})",
1309 current_sequence
1310 ));
1311
1312 let tx_count = next_checkpoint.network_total_transactions
1314 - highest_started_network_total_transactions;
1315 if tx_count > tx_concurrency_remaining {
1316 break;
1317 }
1318 tx_concurrency_remaining -= tx_count;
1319
1320 highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
1321 current_sequence += 1;
1322 checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
1323 network.clone(),
1324 &store,
1325 peer_heights.clone(),
1326 use_get_checkpoint_contents_v2,
1327 timeout,
1328 next_checkpoint,
1329 ));
1330 }
1331
1332 if highest_synced
1333 .sequence_number()
1334 .is_multiple_of(checkpoint_content_download_concurrency as u64)
1335 || checkpoint_contents_tasks.is_empty()
1336 {
1337 if let Some(sender) = sender.upgrade() {
1339 let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
1340 let _ = sender.send(message).await;
1341 }
1342 }
1343 }
1344}
1345
1346#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1347async fn sync_one_checkpoint_contents<S>(
1348 network: anemo::Network,
1349 store: S,
1350 peer_heights: Arc<RwLock<PeerHeights>>,
1351 use_get_checkpoint_contents_v2: bool,
1352 timeout: Duration,
1353 checkpoint: VerifiedCheckpoint,
1354) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1355where
1356 S: WriteStore + Clone,
1357{
1358 debug!("syncing checkpoint contents");
1359
1360 if store
1363 .get_highest_synced_checkpoint()
1364 .expect("store operation should not fail")
1365 .sequence_number()
1366 >= checkpoint.sequence_number()
1367 {
1368 debug!("checkpoint was already created via consensus output");
1369 return Ok(checkpoint);
1370 }
1371
1372 let peers = PeerBalancer::new(
1374 &network,
1375 peer_heights.clone(),
1376 PeerCheckpointRequestType::Content,
1377 )
1378 .with_checkpoint(*checkpoint.sequence_number());
1379 let now = tokio::time::Instant::now();
1380 let Some(_contents) = get_full_checkpoint_contents(
1381 peers,
1382 &store,
1383 &checkpoint,
1384 use_get_checkpoint_contents_v2,
1385 timeout,
1386 )
1387 .await
1388 else {
1389 let duration = peer_heights
1391 .read()
1392 .unwrap()
1393 .wait_interval_when_no_peer_to_sync_content();
1394 if now.elapsed() < duration {
1395 let duration = duration - now.elapsed();
1396 info!("retrying checkpoint sync after {:?}", duration);
1397 tokio::time::sleep(duration).await;
1398 }
1399 return Err(checkpoint);
1400 };
1401 debug!("completed checkpoint contents sync");
1402 Ok(checkpoint)
1403}
1404
1405#[instrument(level = "debug", skip_all)]
1406async fn get_full_checkpoint_contents<S>(
1407 peers: PeerBalancer,
1408 store: S,
1409 checkpoint: &VerifiedCheckpoint,
1410 use_get_checkpoint_contents_v2: bool,
1411 timeout: Duration,
1412) -> Option<VersionedFullCheckpointContents>
1413where
1414 S: WriteStore,
1415{
1416 let sequence_number = checkpoint.sequence_number;
1417 let digest = checkpoint.content_digest;
1418 if let Some(contents) = store.get_full_checkpoint_contents(Some(sequence_number), &digest) {
1419 debug!("store already contains checkpoint contents");
1420 return Some(contents);
1421 }
1422
1423 for mut peer in peers {
1426 debug!(
1427 ?timeout,
1428 "requesting checkpoint contents from {}",
1429 peer.inner().peer_id(),
1430 );
1431 let request = Request::new(digest).with_timeout(timeout);
1432 let contents = if use_get_checkpoint_contents_v2 {
1433 peer.get_checkpoint_contents_v2(request)
1434 .await
1435 .tap_err(|e| trace!("{e:?}"))
1436 .ok()
1437 .and_then(Response::into_inner)
1438 } else {
1439 peer.get_checkpoint_contents(request)
1440 .await
1441 .tap_err(|e| trace!("{e:?}"))
1442 .ok()
1443 .and_then(Response::into_inner)
1444 .map(VersionedFullCheckpointContents::V1)
1445 };
1446 if let Some(contents) = contents.tap_none(|| trace!("peer unable to help sync"))
1447 && contents.verify_digests(digest).is_ok()
1448 {
1449 let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1450 store
1451 .insert_checkpoint_contents(checkpoint, verified_contents)
1452 .expect("store operation should not fail");
1453 return Some(contents);
1454 }
1455 }
1456 debug!("no peers had checkpoint contents");
1457 None
1458}
1459
1460async fn update_checkpoint_watermark_metrics<S>(
1461 mut recv: oneshot::Receiver<()>,
1462 store: S,
1463 metrics: Metrics,
1464) -> Result<()>
1465where
1466 S: WriteStore + Clone + Send + Sync,
1467{
1468 let mut interval = tokio::time::interval(Duration::from_secs(5));
1469 loop {
1470 tokio::select! {
1471 _now = interval.tick() => {
1472 let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1473 .expect("store operation should not fail");
1474 metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1475 let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1476 .expect("store operation should not fail");
1477 metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1478 },
1479 _ = &mut recv => break,
1480 }
1481 }
1482 Ok(())
1483}