use anemo::{types::PeerEvent, PeerId, Request, Response, Result};
use futures::{stream::FuturesOrdered, FutureExt, StreamExt};
use rand::Rng;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, RwLock},
time::Duration,
};
use sui_config::p2p::StateSyncConfig;
use sui_types::{
committee::Committee,
digests::CheckpointDigest,
messages_checkpoint::{
CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
},
storage::WriteStore,
};
use tap::{Pipe, TapFallible, TapOptional};
use tokio::sync::oneshot;
use tokio::{
sync::{broadcast, mpsc, watch},
task::{AbortHandle, JoinSet},
};
use tracing::{debug, info, instrument, trace, warn};
mod generated {
include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
}
mod builder;
mod metrics;
mod server;
#[cfg(test)]
mod tests;
pub use builder::{Builder, UnstartedStateSync};
pub use generated::{
state_sync_client::StateSyncClient,
state_sync_server::{StateSync, StateSyncServer},
};
pub use server::GetCheckpointAvailabilityResponse;
pub use server::GetCheckpointSummaryRequest;
use sui_archival::reader::ArchiveReaderBalancer;
use sui_storage::verify_checkpoint;
use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
/// Cloneable handle for interacting with the state-sync event loop from the
/// rest of the node.
#[derive(Clone, Debug)]
pub struct Handle {
    /// Mailbox into the state-sync event loop.
    sender: mpsc::Sender<StateSyncMessage>,
    /// Broadcast channel on which synced checkpoints are published; cloned
    /// receivers are handed out by `subscribe_to_synced_checkpoints`.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}
impl Handle {
    /// Forwards a checkpoint that was certified via consensus to the
    /// state-sync event loop.
    ///
    /// Panics if the event loop has shut down and its mailbox is closed.
    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
        let message = StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint));
        self.sender.send(message).await.unwrap()
    }

    /// Returns a receiver that yields each checkpoint broadcast by the event
    /// loop once it has been verified and synced.
    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
        self.checkpoint_event_sender.subscribe()
    }
}
/// Shared record of what every peer has told us about its checkpoint state,
/// plus a cache of checkpoint summaries heard about but not yet processed.
struct PeerHeights {
    /// Per-peer sync info, keyed by peer id.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    /// Checkpoint summaries received from peers, keyed by digest, that have
    /// not yet been verified and inserted into the store.
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    /// Secondary index into `unprocessed_checkpoints` by sequence number.
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
    /// How long content sync backs off before retrying when no peer can
    /// serve a checkpoint's contents.
    wait_interval_when_no_peer_to_sync_content: Duration,
}
/// Snapshot of a single peer's reported checkpoint state.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// Digest of the peer's genesis checkpoint; compared against ours to
    /// decide whether the peer is on the same chain.
    genesis_checkpoint_digest: CheckpointDigest,
    /// True when the peer's genesis digest matches ours.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number the peer has reported.
    height: CheckpointSequenceNumber,
    /// Lowest checkpoint sequence number the peer reports as available
    /// (its low watermark for serving full contents).
    lowest: CheckpointSequenceNumber,
}
impl PeerHeights {
    /// Returns the highest unprocessed checkpoint summary reported by any
    /// peer that is on the same chain as us.
    pub fn highest_known_checkpoint(&self) -> Option<&Checkpoint> {
        let sequence = self.highest_known_checkpoint_sequence_number()?;
        let digest = self.sequence_number_to_digest.get(&sequence)?;
        self.unprocessed_checkpoints.get(digest)
    }

    /// Highest checkpoint sequence number reported by any same-chain peer.
    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
        self.peers
            .values()
            .filter(|info| info.on_same_chain_as_us)
            .map(|info| info.height)
            .max()
    }

    /// Iterates over peers whose genesis checkpoint digest matches ours.
    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
        self.peers.iter().filter(|(_, info)| info.on_same_chain_as_us)
    }

    /// Records a checkpoint (and optional content low watermark) reported by
    /// `peer_id`. Returns false — without storing anything — when the peer
    /// is unknown or on a different chain.
    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
    pub fn update_peer_info(
        &mut self,
        peer_id: PeerId,
        checkpoint: Checkpoint,
        low_watermark: Option<CheckpointSequenceNumber>,
    ) -> bool {
        debug!("Update peer info");
        let Some(info) = self.peers.get_mut(&peer_id) else {
            return false;
        };
        if !info.on_same_chain_as_us {
            return false;
        }
        // Heights only move forward; ignore stale reports.
        info.height = info.height.max(*checkpoint.sequence_number());
        if let Some(watermark) = low_watermark {
            info.lowest = watermark;
        }
        self.insert_checkpoint(checkpoint);
        true
    }

    /// Inserts or refreshes the stored info for `peer_id`.
    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
        use std::collections::hash_map::Entry;
        debug!("Insert peer info");
        match self.peers.entry(peer_id) {
            Entry::Occupied(mut occupied) => {
                let existing = occupied.get_mut();
                if existing.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
                    // Same chain as before: keep the larger height rather
                    // than blindly overwriting with a possibly stale report.
                    existing.height = existing.height.max(info.height);
                } else {
                    *existing = info;
                }
            }
            Entry::Vacant(vacant) => {
                vacant.insert(info);
            }
        }
    }

    /// Flags `peer_id` as being on a different chain, excluding it from
    /// future sync candidate selection.
    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
        if let Some(info) = self.peers.get_mut(&peer_id) {
            info.on_same_chain_as_us = false;
        }
    }

    /// Drops cached summaries at or below `sequence_number` from both maps.
    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
        self.unprocessed_checkpoints
            .retain(|_, checkpoint| *checkpoint.sequence_number() > sequence_number);
        self.sequence_number_to_digest
            .retain(|&seq, _| seq > sequence_number);
    }

    /// Caches a checkpoint summary and indexes it by sequence number.
    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
        let digest = *checkpoint.digest();
        let sequence_number = *checkpoint.sequence_number();
        self.sequence_number_to_digest
            .insert(sequence_number, digest);
        self.unprocessed_checkpoints.insert(digest, checkpoint);
    }

    /// Removes a cached summary (and its sequence-number index entry).
    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
            self.sequence_number_to_digest
                .remove(checkpoint.sequence_number());
        }
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Option<&Checkpoint> {
        let digest = self.sequence_number_to_digest.get(&sequence_number)?;
        self.get_checkpoint_by_digest(digest)
    }

    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
        self.unprocessed_checkpoints.get(digest)
    }

    #[cfg(test)]
    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
        self.wait_interval_when_no_peer_to_sync_content = duration;
    }

    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
        self.wait_interval_when_no_peer_to_sync_content
    }
}
/// Iterator over candidate peers for a checkpoint request, preferring
/// low-RTT peers with a small amount of randomization (see the `Iterator`
/// impl below).
#[derive(Clone)]
struct PeerBalancer {
    /// Remaining candidates, sorted by ascending connection RTT.
    peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
    /// When set, only peers able to serve this sequence number are yielded.
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    /// Whether the caller needs just the summary or the full contents.
    request_type: PeerCheckpointRequestType,
}
/// Kind of checkpoint data being requested from peers.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// Checkpoint summary only; any peer at or above the height qualifies.
    Summary,
    /// Full checkpoint contents; the peer's low watermark must also cover
    /// the requested sequence number.
    Content,
}
impl PeerBalancer {
    /// Builds a balancer over all currently connected peers that are on our
    /// chain, ordered by ascending connection RTT.
    pub fn new(
        network: &anemo::Network,
        peer_heights: Arc<RwLock<PeerHeights>>,
        request_type: PeerCheckpointRequestType,
    ) -> Self {
        let mut candidates: Vec<_> = peer_heights
            .read()
            .unwrap()
            .peers_on_same_chain()
            .filter_map(|(peer_id, info)| {
                let peer = network.peer(*peer_id)?;
                Some((peer.connection_rtt(), peer, *info))
            })
            .collect();
        // Stable sort: peers with equal RTT keep their original order.
        candidates.sort_by_key(|(rtt, _, _)| *rtt);
        Self {
            peers: candidates
                .into_iter()
                .map(|(_rtt, peer, info)| (peer, info))
                .collect(),
            requested_checkpoint: None,
            request_type,
        }
    }

    /// Restricts iteration to peers able to serve `checkpoint`.
    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
        self.requested_checkpoint = Some(checkpoint);
        self
    }
}
impl Iterator for PeerBalancer {
    type Item = StateSyncClient<anemo::Peer>;

    /// Pops the next eligible peer, chosen at random among the (up to) two
    /// lowest-latency candidates remaining.
    fn next(&mut self) -> Option<Self::Item> {
        const SELECTION_WINDOW: usize = 2;
        while !self.peers.is_empty() {
            let window = std::cmp::min(SELECTION_WINDOW, self.peers.len());
            let idx = rand::thread_rng().gen_range(0..window);
            let (peer, info) = self.peers.remove(idx).unwrap();
            // With no checkpoint requested, any same-chain peer qualifies.
            let target = self.requested_checkpoint.unwrap_or(0);
            let can_serve = match &self.request_type {
                PeerCheckpointRequestType::Summary => info.height >= target,
                PeerCheckpointRequestType::Content => {
                    info.height >= target && info.lowest <= target
                }
            };
            if can_serve {
                return Some(StateSyncClient::new(peer));
            }
        }
        None
    }
}
/// Messages processed by the state-sync event loop.
#[derive(Clone, Debug)]
enum StateSyncMessage {
    /// Start a checkpoint-summary sync job if peers are ahead of us.
    StartSyncJob,
    /// A checkpoint certified locally via consensus (boxed to keep the
    /// message small).
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    /// A checkpoint whose contents finished syncing; triggers notifying
    /// peers of our new height.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}
/// Event loop driving state sync: tracks peer heights, spawns sync tasks,
/// and ingests checkpoints produced via consensus.
struct StateSyncEventLoop<S> {
    config: StateSyncConfig,
    /// Incoming messages from `Handle`s and internal tasks.
    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak sender handed to spawned tasks so they can message the loop
    /// without keeping the mailbox alive.
    weak_sender: mpsc::WeakSender<StateSyncMessage>,
    /// All tasks spawned by this loop; drained in `start`.
    tasks: JoinSet<()>,
    /// Handle to the in-flight summary sync job, if any (at most one runs).
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    /// Handle to the long-lived contents sync task; must never finish.
    sync_checkpoint_contents_task: Option<AbortHandle>,
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
    store: S,
    /// Shared record of peers' reported checkpoint state.
    peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcasts checkpoints once verified/synced.
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,
    /// Fallback content source for checkpoints all peers have pruned.
    archive_readers: ArchiveReaderBalancer,
    /// Handle to the long-lived archive sync task; must never finish.
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
}
impl<S> StateSyncEventLoop<S>
where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    /// Runs the event loop until the mailbox closes (i.e. every `Handle`
    /// has been dropped).
    pub async fn start(mut self) {
        info!("State-Synchronizer started");
        // Pinned checkpoints are binary-searched during summary sync, so
        // they must be sorted up front.
        self.config.pinned_checkpoints.sort();
        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            // Query peers that connected before we subscribed to events.
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        // The contents-sync task runs forever; this watch channel only tells
        // it how far to sync.
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);
        // `_sender` is deliberately kept alive for the whole of `start`;
        // dropping it (on return) signals the metrics task to exit.
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));
        // Spawn the long-lived checkpoint-contents sync task.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.checkpoint_content_timeout(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);
        // Spawn the long-lived archive-fallback sync task.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_readers.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);
        loop {
            tokio::select! {
                now = interval.tick() => {
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // `None` means every sender (Handle) has been dropped:
                    // time to shut the loop down.
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    self.handle_peer_event(peer_event);
                },
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Cancellation is expected during shutdown.
                            } else if e.is_panic() {
                                // Propagate panics from spawned tasks.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };
                    // The two long-lived tasks must never finish on their own.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }
                    // Summary sync is a one-shot job: clear the handle so a
                    // new job can be started below.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }
                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }
            // After each event, see whether sync work should start/advance.
            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }
        info!("State-Synchronizer ended");
    }

    /// Dispatches a single mailbox message.
    fn handle_message(&mut self, message: StateSyncMessage) {
        debug!("Received message: {:?}", message);
        match message {
            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
                self.handle_checkpoint_from_consensus(checkpoint)
            }
            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
                self.spawn_notify_peers_of_checkpoint(*checkpoint)
            }
        }
    }

    /// Ingests a checkpoint certified locally via consensus: checks chain
    /// linkage, advances the verified and synced watermarks, persists any
    /// end-of-epoch committee, and notifies subscribers and peers.
    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        // The predecessor must already be in the store (we panic otherwise)
        // and its digest must match this checkpoint's previous_digest.
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!("Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}", checkpoint.sequence_number(), Some(prev_digest), checkpoint.previous_digest);
        }
        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        // Ignore checkpoints at or below our verified watermark.
        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }
        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }
        // Debug builds: assert that summaries and full contents for the
        // whole gap up to this checkpoint are present in the store.
        #[cfg(debug_assertions)]
        {
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(&checkpoint.content_digest)
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }
        // An end-of-epoch checkpoint carries the next committee; persist it
        // so later checkpoints can be verified against it.
        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }
        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        // Notify local subscribers, then push our new height to peers.
        let _ = self.checkpoint_event_sender.send(checkpoint.clone());
        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }

    /// Reacts to peer connect/disconnect events from the network.
    fn handle_peer_event(
        &mut self,
        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
    ) {
        use tokio::sync::broadcast::error::RecvError;
        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                self.spawn_get_latest_from_peer(peer_id);
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                // Forget everything the lost peer told us.
                self.peer_heights.write().unwrap().peers.remove(&peer_id);
            }
            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }
            Err(RecvError::Lagged(_)) => {
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    /// Spawns a task asking `peer_id` for its latest checkpoint state.
    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
        if let Some(peer) = self.network.peer(peer_id) {
            let genesis_checkpoint_digest = *self
                .store
                .get_checkpoint_by_sequence_number(0)
                .expect("store should contain genesis checkpoint")
                .digest();
            let task = get_latest_from_peer(
                genesis_checkpoint_digest,
                peer,
                self.peer_heights.clone(),
                self.config.timeout(),
            );
            self.tasks.spawn(task);
        }
    }

    /// Periodic maintenance: re-query all peers for their latest checkpoint
    /// and prune the download-limiter's bookkeeping.
    fn handle_tick(&mut self, _now: std::time::Instant) {
        let task = query_peers_for_their_latest_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.config.timeout(),
        );
        self.tasks.spawn(task);
        if let Some(layer) = self.download_limit_layer.as_ref() {
            layer.maybe_prune_map();
        }
    }

    /// Starts a checkpoint-summary sync job if none is running and some
    /// peer claims a checkpoint newer than our highest verified one.
    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
        // Only run one sync task at a time.
        if self.sync_checkpoint_summaries_task.is_some() {
            return;
        }
        let highest_processed_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        let highest_known_checkpoint = self
            .peer_heights
            .read()
            .unwrap()
            .highest_known_checkpoint()
            .cloned();
        // Note: `Some(_) < None` is false, so a missing peer height never
        // starts a job.
        if Some(highest_processed_checkpoint.sequence_number())
            < highest_known_checkpoint
                .as_ref()
                .map(|x| x.sequence_number())
        {
            // Start a sync job up to the highest checkpoint peers know of.
            let task = sync_to_checkpoint(
                self.network.clone(),
                self.store.clone(),
                self.peer_heights.clone(),
                self.metrics.clone(),
                self.config.pinned_checkpoints.clone(),
                self.config.checkpoint_header_download_concurrency(),
                self.config.timeout(),
                highest_known_checkpoint.unwrap(),
            )
            .map(|result| match result {
                Ok(()) => {}
                Err(e) => {
                    debug!("error syncing checkpoint {e}");
                }
            });
            let task_handle = self.tasks.spawn(task);
            self.sync_checkpoint_summaries_task = Some(task_handle);
        }
    }

    /// Bumps the contents-sync target when we hold verified summaries past
    /// the synced watermark and at least one peer can serve them.
    fn maybe_trigger_checkpoint_contents_sync_task(
        &mut self,
        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
    ) {
        let highest_verified_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        let highest_synced_checkpoint = self
            .store
            .get_highest_synced_checkpoint()
            .expect("store operation should not fail");
        if highest_verified_checkpoint.sequence_number()
            > highest_synced_checkpoint.sequence_number()
            && self
                .peer_heights
                .read()
                .unwrap()
                .highest_known_checkpoint_sequence_number()
                > Some(*highest_synced_checkpoint.sequence_number())
        {
            // Only wake the contents-sync task when the target advances.
            let _ = target_sequence_channel.send_if_modified(|num| {
                let new_num = *highest_verified_checkpoint.sequence_number();
                if *num == new_num {
                    return false;
                }
                *num = new_num;
                true
            });
        }
    }

    /// Spawns a task announcing `checkpoint` to peers that are behind it.
    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
        let task = notify_peers_of_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            checkpoint,
            self.config.timeout(),
        );
        self.tasks.spawn(task);
    }
}
/// Pushes `checkpoint`'s summary to every connected same-chain peer whose
/// reported height is below it.
async fn notify_peers_of_checkpoint(
    network: anemo::Network,
    peer_heights: Arc<RwLock<PeerHeights>>,
    checkpoint: VerifiedCheckpoint,
    timeout: Duration,
) {
    // Build one push request per stale peer while holding the read lock,
    // then drop the lock before awaiting anything.
    let requests: Vec<_> = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        .filter(|(_, info)| info.height < *checkpoint.sequence_number())
        .filter_map(|(peer_id, _)| network.peer(*peer_id))
        .map(|peer| {
            let mut client = StateSyncClient::new(peer);
            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
            async move { client.push_checkpoint_summary(request).await }
        })
        .collect();
    futures::future::join_all(requests).await;
}
/// Learns (or refreshes) a single peer's checkpoint state.
///
/// On first contact the peer's genesis checkpoint is fetched to decide
/// whether it is on our chain; afterwards its latest checkpoint and content
/// low watermark are recorded in `peer_heights`.
async fn get_latest_from_peer(
    our_genesis_checkpoint_digest: CheckpointDigest,
    peer: anemo::Peer,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
) {
    let peer_id = peer.peer_id();
    let mut client = StateSyncClient::new(peer);
    let known = peer_heights.read().unwrap().peers.get(&peer_id).copied();
    let info = match known {
        Some(info) => info,
        None => {
            // First contact: ask for checkpoint 0 to compare genesis digests.
            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
                .with_timeout(timeout);
            let response = client
                .get_checkpoint_summary(request)
                .await
                .map(Response::into_inner);
            let info = match response {
                Ok(Some(checkpoint)) => {
                    let digest = *checkpoint.digest();
                    PeerStateSyncInfo {
                        genesis_checkpoint_digest: digest,
                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
                        height: *checkpoint.sequence_number(),
                        lowest: CheckpointSequenceNumber::default(),
                    }
                }
                // The peer has no genesis checkpoint: treat it as being on
                // a different chain.
                Ok(None) => PeerStateSyncInfo {
                    genesis_checkpoint_digest: CheckpointDigest::default(),
                    on_same_chain_as_us: false,
                    height: CheckpointSequenceNumber::default(),
                    lowest: CheckpointSequenceNumber::default(),
                },
                Err(status) => {
                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
                    return;
                }
            };
            peer_heights
                .write()
                .unwrap()
                .insert_peer_info(peer_id, info);
            info
        }
    };
    if !info.on_same_chain_as_us {
        trace!(?info, "Peer {peer_id} not on same chain as us");
        return;
    }
    let Some((highest_checkpoint, low_watermark)) =
        query_peer_for_latest_info(&mut client, timeout).await
    else {
        return;
    };
    peer_heights
        .write()
        .unwrap()
        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
}
/// Queries a peer for its highest synced checkpoint and lowest available
/// (un-pruned) checkpoint.
///
/// Falls back to the legacy "latest summary" RPC when the peer predates the
/// availability RPC (signalled by a NotFound status), in which case no low
/// watermark is returned.
async fn query_peer_for_latest_info(
    client: &mut StateSyncClient<anemo::Peer>,
    timeout: Duration,
) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
    let request = Request::new(()).with_timeout(timeout);
    match client
        .get_checkpoint_availability(request)
        .await
        .map(Response::into_inner)
    {
        Ok(GetCheckpointAvailabilityResponse {
            highest_synced_checkpoint,
            lowest_available_checkpoint,
        }) => {
            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
        }
        Err(status) if status.status() == anemo::types::response::StatusCode::NotFound => {
            // Peer doesn't implement the availability RPC; fall through to
            // the legacy request below.
        }
        Err(status) => {
            trace!("get_checkpoint_availability request failed: {status:?}");
            return None;
        }
    }
    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
    match client
        .get_checkpoint_summary(request)
        .await
        .map(Response::into_inner)
    {
        Ok(Some(checkpoint)) => Some((checkpoint, None)),
        Ok(None) => None,
        Err(status) => {
            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
            None
        }
    }
}
/// Queries every connected same-chain peer for its latest checkpoint,
/// records the answers, and nudges the event loop to start a sync job when
/// some peer is ahead of everything we already knew about.
#[instrument(level = "debug", skip_all)]
async fn query_peers_for_their_latest_checkpoint(
    network: anemo::Network,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    timeout: Duration,
) {
    let peer_heights = &peer_heights;
    // One future per peer; each yields the reported checkpoint only when it
    // actually advanced our view of that peer.
    let futs = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
        .map(|peer| {
            let peer_id = peer.peer_id();
            let mut client = StateSyncClient::new(peer);
            async move {
                let (highest_checkpoint, low_watermark) =
                    query_peer_for_latest_info(&mut client, timeout).await?;
                peer_heights
                    .write()
                    .unwrap()
                    .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
                    .then_some(highest_checkpoint)
            }
        })
        .collect::<Vec<_>>();
    debug!("Query {} peers for latest checkpoint", futs.len());
    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
    let highest_checkpoint = checkpoints.max_by_key(|checkpoint| *checkpoint.sequence_number());
    let our_highest_checkpoint = peer_heights
        .read()
        .unwrap()
        .highest_known_checkpoint()
        .cloned();
    debug!(
        "Our highest checkpoint {:?}, peers highest checkpoint {:?}",
        our_highest_checkpoint.as_ref().map(|c| c.sequence_number()),
        highest_checkpoint.as_ref().map(|c| c.sequence_number())
    );
    let peers_are_ahead = match (&highest_checkpoint, &our_highest_checkpoint) {
        (Some(_), None) => true,
        (Some(theirs), Some(ours)) => theirs.sequence_number() > ours.sequence_number(),
        (None, _) => false,
    };
    if !peers_are_ahead {
        return;
    }
    if let Some(sender) = sender.upgrade() {
        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
    }
}
/// Downloads and verifies checkpoint summaries from the current highest
/// verified checkpoint up to `checkpoint`, inserting each into the store.
///
/// Summaries are fetched with up to `checkpoint_header_download_concurrency`
/// requests in flight; verification happens strictly in sequence order.
/// Returns an error if the target is stale, no peer can supply a summary,
/// or verification fails.
async fn sync_to_checkpoint<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
    checkpoint_header_download_concurrency: usize,
    timeout: Duration,
    checkpoint: Checkpoint,
) -> Result<()>
where
    S: WriteStore,
{
    metrics.set_highest_known_checkpoint(*checkpoint.sequence_number());
    let mut current = store
        .get_highest_verified_checkpoint()
        .expect("store operation should not fail");
    if current.sequence_number() >= checkpoint.sequence_number() {
        return Err(anyhow::anyhow!(
            "target checkpoint {} is older than highest verified checkpoint {}",
            checkpoint.sequence_number(),
            current.sequence_number(),
        ));
    }
    let peer_balancer = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Summary,
    );
    // Ordered, buffered stream of (summary, seq, serving peer) per sequence
    // number in the gap; `buffered` preserves order for verification below.
    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
        ..=*checkpoint.sequence_number())
        .map(|next| {
            let peers = peer_balancer.clone().with_checkpoint(next);
            let peer_heights = peer_heights.clone();
            let pinned_checkpoints = &pinned_checkpoints;
            async move {
                // Short-circuit if we already cached this summary.
                if let Some(checkpoint) = peer_heights
                    .read()
                    .unwrap()
                    .get_checkpoint_by_sequence_number(next)
                {
                    return (Some(checkpoint.to_owned()), next, None);
                }
                // Otherwise ask eligible peers, lowest-latency first.
                for mut peer in peers {
                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
                        .with_timeout(timeout);
                    if let Some(checkpoint) = peer
                        .get_checkpoint_summary(request)
                        .await
                        .tap_err(|e| trace!("{e:?}"))
                        .ok()
                        .and_then(Response::into_inner)
                        .tap_none(|| trace!("peer unable to help sync"))
                    {
                        // Reject a response for the wrong sequence number.
                        if *checkpoint.sequence_number() != next {
                            tracing::debug!(
                                "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
                                checkpoint.sequence_number()
                            );
                            continue;
                        }
                        // Reject a digest that contradicts a pinned one
                        // (`pinned_checkpoints` was sorted in `start`, so
                        // binary search is valid).
                        let checkpoint_digest = checkpoint.digest();
                        if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
                            checkpoint.sequence_number(),
                            |(seq_num, _digest)| *seq_num
                        ) {
                            if pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest {
                                tracing::debug!(
                                    "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
                                    pinned_checkpoints[pinned_digest_index].1,
                                    checkpoint_digest
                                );
                                continue;
                            }
                        }
                        // Cache the summary and remember which peer served
                        // it so it can be distrusted if verification fails.
                        peer_heights
                            .write()
                            .unwrap()
                            .insert_checkpoint(checkpoint.clone());
                        return (Some(checkpoint), next, Some(peer.inner().peer_id()));
                    }
                }
                (None, next, None)
            }
        })
        .pipe(futures::stream::iter)
        .buffered(checkpoint_header_download_concurrency);
    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
        // The ordered stream must hand us exactly the successor of
        // `current`.
        assert_eq!(
            current
                .sequence_number()
                .checked_add(1)
                .expect("exhausted u64"),
            next
        );
        let checkpoint = 'cp: {
            let checkpoint = maybe_checkpoint.ok_or_else(|| {
                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
            })?;
            // A pinned checkpoint already had its digest validated above, so
            // it may skip full verification.
            if pinned_checkpoints
                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
                .is_ok()
            {
                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
            }
            match verify_checkpoint(&current, &store, checkpoint) {
                Ok(verified_checkpoint) => verified_checkpoint,
                Err(checkpoint) => {
                    // Drop the bad summary and stop trusting the peer that
                    // served it.
                    let mut peer_heights = peer_heights.write().unwrap();
                    peer_heights.remove_checkpoint(checkpoint.digest());
                    if let Some(peer_id) = maybe_peer_id {
                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
                    }
                    return Err(anyhow::anyhow!(
                        "unable to verify checkpoint {checkpoint:?}"
                    ));
                }
            }
        };
        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
        if let Some((checkpoint_summary_age_metric, checkpoint_summary_age_metric_deprecated)) =
            metrics.checkpoint_summary_age_metrics()
        {
            checkpoint.report_checkpoint_age(
                checkpoint_summary_age_metric,
                checkpoint_summary_age_metric_deprecated,
            );
        }
        current = checkpoint.clone();
        store
            .insert_checkpoint(&checkpoint)
            .expect("store operation should not fail");
    }
    // Everything up to the target is verified; drop the cached summaries.
    peer_heights
        .write()
        .unwrap()
        .cleanup_old_checkpoints(*checkpoint.sequence_number());
    Ok(())
}
async fn sync_checkpoint_contents_from_archive<S>(
network: anemo::Network,
archive_readers: ArchiveReaderBalancer,
store: S,
peer_heights: Arc<RwLock<PeerHeights>>,
) where
S: WriteStore + Clone + Send + Sync + 'static,
{
loop {
let peers: Vec<_> = peer_heights
.read()
.unwrap()
.peers_on_same_chain()
.filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
.collect();
let lowest_checkpoint_on_peers = peers
.iter()
.map(|(_p, state_sync_info)| state_sync_info.lowest)
.min();
let highest_synced = store
.get_highest_synced_checkpoint()
.expect("store operation should not fail")
.sequence_number;
let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers
{
highest_synced < lowest_checkpoint_on_peers
} else {
false
};
debug!("Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}", lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string()));
if sync_from_archive {
let start = highest_synced
.checked_add(1)
.expect("Checkpoint seq num overflow");
let checkpoint_range = start..lowest_checkpoint_on_peers.unwrap();
if let Some(archive_reader) = archive_readers
.pick_one_random(checkpoint_range.clone())
.await
{
let txn_counter = Arc::new(AtomicU64::new(0));
let checkpoint_counter = Arc::new(AtomicU64::new(0));
if let Err(err) = archive_reader
.read(
store.clone(),
checkpoint_range,
txn_counter.clone(),
checkpoint_counter.clone(),
true,
)
.await
{
warn!("State sync from archive failed with error: {:?}", err);
} else {
info!("State sync from archive is complete. Checkpoints downloaded = {:?}, Txns downloaded = {:?}", checkpoint_counter.load(Ordering::Relaxed), txn_counter.load(Ordering::Relaxed));
}
} else {
warn!("Failed to find an archive reader to complete the state sync request");
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
/// Long-running task that downloads full checkpoint contents up to the
/// target received on `target_sequence_channel`, keeping at most
/// `checkpoint_content_download_concurrency` checkpoints — and a budget of
/// `checkpoint_content_download_tx_concurrency` transactions — in flight.
async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    timeout: Duration,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");
    // Next sequence number to schedule for download.
    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    // One past the highest checkpoint we have been told to sync.
    let mut target_sequence_cursor = 0;
    // Transaction high-water mark of *scheduled* work, used to charge each
    // newly scheduled checkpoint against the transaction budget.
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    // FuturesOrdered: completions arrive strictly in scheduling order.
    let mut checkpoint_contents_tasks = FuturesOrdered::new();
    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
    loop {
        tokio::select! {
            result = target_sequence_channel.changed() => {
                match result {
                    Ok(()) => {
                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
                    }
                    // Sender dropped: the event loop has shut down.
                    Err(_) => {
                        return
                    }
                }
            },
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        // `let _: &VerifiedCheckpoint` is a type-ascription
                        // workaround to pin the select-arm's type.
                        let _: &VerifiedCheckpoint = &checkpoint;
                        store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        // Return this checkpoint's transactions to the
                        // in-flight budget.
                        tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;
                    }
                    Err(checkpoint) => {
                        // Download failed: log (noting whether any peer
                        // should have been able to serve it), then retry the
                        // same checkpoint at the front to preserve ordering.
                        let _: &VerifiedCheckpoint = &checkpoint;
                        if let Some(lowest_peer_checkpoint) =
                            peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
                            }
                        } else {
                            info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
                        }
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            timeout,
                            checkpoint,
                        ));
                    }
                }
            },
        }
        // Schedule more downloads while below the target and within both
        // the checkpoint-count and transaction-count budgets.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .expect(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint",
                );
            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;
            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                timeout,
                next_checkpoint,
            ));
        }
        // Periodically — and whenever the pipeline drains — tell the event
        // loop how far we've synced so it can notify peers.
        if highest_synced.sequence_number() % checkpoint_content_download_concurrency as u64 == 0
            || checkpoint_contents_tasks.is_empty()
        {
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}
/// Downloads and stores the full contents of one checkpoint.
///
/// Returns `Ok(checkpoint)` on success (or when contents already exist
/// locally) and `Err(checkpoint)` when no peer could serve the contents, so
/// the caller can re-queue the same checkpoint.
///
/// Fix: `now.elapsed()` was previously sampled twice — once in the `<`
/// comparison and again in the subtraction. Time can advance between the two
/// reads, so the second read could exceed `duration` after the check passed,
/// making `duration - now.elapsed()` underflow (a panic in debug builds).
/// The elapsed time is now sampled exactly once.
#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
async fn sync_one_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
    checkpoint: VerifiedCheckpoint,
) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
where
    S: WriteStore + Clone,
{
    debug!("syncing checkpoint contents");
    // The checkpoint may already have been synced via consensus output.
    if store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail")
        .sequence_number()
        >= checkpoint.sequence_number()
    {
        debug!("checkpoint was already created via consensus output");
        return Ok(checkpoint);
    }
    // Only consider peers that have reached this height and still serve its
    // contents.
    let peers = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Content,
    )
    .with_checkpoint(*checkpoint.sequence_number());
    let now = tokio::time::Instant::now();
    let Some(_contents) = get_full_checkpoint_contents(peers, &store, &checkpoint, timeout).await
    else {
        // No peer could help. Pad the attempt out to the configured wait
        // interval so the caller's retry loop doesn't spin hot.
        let duration = peer_heights
            .read()
            .unwrap()
            .wait_interval_when_no_peer_to_sync_content();
        // Sample elapsed exactly once to avoid an underflowing subtraction.
        let elapsed = now.elapsed();
        if elapsed < duration {
            let remaining = duration - elapsed;
            info!("retrying checkpoint sync after {:?}", remaining);
            tokio::time::sleep(remaining).await;
        }
        return Err(checkpoint);
    };
    debug!("completed checkpoint contents sync");
    Ok(checkpoint)
}
/// Obtains the full contents for `checkpoint`, first from local storage and
/// otherwise from the given peers, persisting and returning the first
/// response whose digests verify.
#[instrument(level = "debug", skip_all)]
async fn get_full_checkpoint_contents<S>(
    peers: PeerBalancer,
    store: S,
    checkpoint: &VerifiedCheckpoint,
    timeout: Duration,
) -> Option<FullCheckpointContents>
where
    S: WriteStore,
{
    let digest = checkpoint.content_digest;
    // Local storage first: by sequence number, then by digest.
    let local = store
        .get_full_checkpoint_contents_by_sequence_number(*checkpoint.sequence_number())
        .or_else(|| store.get_full_checkpoint_contents(&digest));
    if let Some(contents) = local {
        debug!("store already contains checkpoint contents");
        return Some(contents);
    }
    for mut peer in peers {
        debug!(
            ?timeout,
            "requesting checkpoint contents from {}",
            peer.inner().peer_id(),
        );
        let request = Request::new(digest).with_timeout(timeout);
        let maybe_contents = peer
            .get_checkpoint_contents(request)
            .await
            .tap_err(|e| trace!("{e:?}"))
            .ok()
            .and_then(Response::into_inner)
            .tap_none(|| trace!("peer unable to help sync"));
        let Some(contents) = maybe_contents else {
            continue;
        };
        // Accept only contents whose digests match the summary.
        if contents.verify_digests(digest).is_ok() {
            let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
            store
                .insert_checkpoint_contents(checkpoint, verified_contents)
                .expect("store operation should not fail");
            return Some(contents);
        }
    }
    debug!("no peers had checkpoint contents");
    None
}
/// Republishes the verified/synced watermark gauges every five seconds
/// until `recv` completes (or its sender is dropped).
async fn update_checkpoint_watermark_metrics<S>(
    mut recv: oneshot::Receiver<()>,
    store: S,
    metrics: Metrics,
) -> Result<()>
where
    S: WriteStore + Clone + Send + Sync,
{
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _now = interval.tick() => {
                let verified = store.get_highest_verified_checkpoint()
                    .expect("store operation should not fail");
                let synced = store.get_highest_synced_checkpoint()
                    .expect("store operation should not fail");
                metrics.set_highest_verified_checkpoint(verified.sequence_number);
                metrics.set_highest_synced_checkpoint(synced.sequence_number);
            },
            _ = &mut recv => break,
        }
    }
    Ok(())
}