sui_network/state_sync/builder.rs

// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Handle, PeerHeights, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6    metrics::Metrics,
7    server::{CheckpointContentsDownloadLimitLayer, Server, SizeLimitLayer},
8};
9use crate::discovery;
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::{inflight_limit, rate_limit};
12use std::{
13    collections::HashMap,
14    sync::{Arc, RwLock},
15};
16use sui_config::node::ArchiveReaderConfig;
17use sui_config::p2p::StateSyncConfig;
18use sui_types::messages_checkpoint::VerifiedCheckpoint;
19use sui_types::storage::WriteStore;
20use tap::Pipe;
21use tokio::{
22    sync::{broadcast, mpsc},
23    task::JoinSet,
24};
25
/// Builder for the state-sync service.
///
/// Start from [`Builder::new`], attach a store with [`Builder::store`], optionally
/// adjust the remaining settings, then call `build` to obtain an
/// [`UnstartedStateSync`] together with the anemo router that serves the
/// state-sync RPCs.
pub struct Builder<S> {
    /// Backing store; `None` until [`Builder::store`] is called. Building panics
    /// if it is still `None`.
    store: Option<S>,
    /// State-sync configuration; `StateSyncConfig::default()` is used when unset.
    config: Option<StateSyncConfig>,
    /// Metrics; `Metrics::disabled()` is used when unset.
    metrics: Option<Metrics>,
    /// Optional archive reader configuration, passed through to the event loop.
    archive_config: Option<ArchiveReaderConfig>,
    /// Optional discovery sender, passed through to the event loop.
    discovery_sender: Option<discovery::Sender>,
}
33
34impl Builder<()> {
35    #[allow(clippy::new_without_default)]
36    pub fn new() -> Self {
37        Self {
38            store: None,
39            config: None,
40            metrics: None,
41            archive_config: None,
42            discovery_sender: None,
43        }
44    }
45}
46
47impl<S> Builder<S> {
48    pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
49        Builder {
50            store: Some(store),
51            config: self.config,
52            metrics: self.metrics,
53            archive_config: self.archive_config,
54            discovery_sender: self.discovery_sender,
55        }
56    }
57
58    pub fn discovery_sender(mut self, sender: discovery::Sender) -> Self {
59        self.discovery_sender = Some(sender);
60        self
61    }
62
63    pub fn config(mut self, config: StateSyncConfig) -> Self {
64        self.config = Some(config);
65        self
66    }
67
68    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
69        self.metrics = Some(Metrics::enabled(registry));
70        self
71    }
72
73    pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
74        self.archive_config = archive_config;
75        self
76    }
77}
78
79impl<S> Builder<S>
80where
81    S: WriteStore + Clone + Send + Sync + 'static,
82{
83    pub fn build(self) -> (UnstartedStateSync<S>, anemo::Router) {
84        let state_sync_config = self.config.clone().unwrap_or_default();
85        let (mut builder, server) = self.build_internal();
86        let mut state_sync_server = StateSyncServer::new(server);
87
88        // Apply rate limits from configuration as needed.
89        if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
90            state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
91                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
92                    governor::Quota::per_second(limit),
93                    rate_limit::WaitMode::Block,
94                )),
95            );
96        }
97        if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
98            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
99                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
100                    governor::Quota::per_second(limit),
101                    rate_limit::WaitMode::Block,
102                )),
103            );
104        }
105        if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
106            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
107                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
108                    governor::Quota::per_second(limit),
109                    rate_limit::WaitMode::Block,
110                )),
111            );
112        }
113        if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
114            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
115                InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
116                    limit,
117                    inflight_limit::WaitMode::ReturnError,
118                )),
119            );
120        }
121        if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
122            let layer = CheckpointContentsDownloadLimitLayer::new(limit);
123            builder.download_limit_layer = Some(layer.clone());
124            state_sync_server = state_sync_server
125                .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
126        }
127
128        let router = anemo::Router::new()
129            // Size limit layer applied before deserialization.
130            .route_layer(SizeLimitLayer::new(
131                state_sync_config.max_checkpoint_summary_size(),
132            ))
133            .add_rpc_service(state_sync_server);
134
135        (builder, router)
136    }
137
138    pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
139        let Builder {
140            store,
141            config,
142            metrics,
143            archive_config,
144            discovery_sender,
145        } = self;
146        let store = store.unwrap();
147        let config = config.unwrap_or_default();
148        let metrics = metrics.unwrap_or_else(Metrics::disabled);
149
150        let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
151        let (checkpoint_event_sender, _receiver) =
152            broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
153        let weak_sender = sender.downgrade();
154        let handle = Handle {
155            sender,
156            checkpoint_event_sender: checkpoint_event_sender.clone(),
157            metrics: metrics.clone(),
158        };
159        let peer_heights = PeerHeights {
160            peers: HashMap::new(),
161            unprocessed_checkpoints: HashMap::new(),
162            sequence_number_to_digest: HashMap::new(),
163            scores: HashMap::new(),
164            wait_interval_when_no_peer_to_sync_content: config
165                .wait_interval_when_no_peer_to_sync_content(),
166            peer_scoring_window: config.peer_scoring_window(),
167            peer_failure_rate: config.peer_failure_rate(),
168            checkpoint_content_timeout_min: config.checkpoint_content_timeout_min(),
169            checkpoint_content_timeout_max: config.checkpoint_content_timeout_max(),
170            exploration_probability: config.exploration_probability(),
171        }
172        .pipe(RwLock::new)
173        .pipe(Arc::new);
174
175        let server = Server {
176            store: store.clone(),
177            peer_heights: peer_heights.clone(),
178            sender: weak_sender,
179            max_checkpoint_lookahead: config.max_checkpoint_lookahead(),
180        };
181
182        (
183            UnstartedStateSync {
184                config,
185                handle,
186                mailbox,
187                store,
188                download_limit_layer: None,
189                peer_heights,
190                checkpoint_event_sender,
191                metrics,
192                archive_config,
193                discovery_sender,
194            },
195            server,
196        )
197    }
198}
199
/// State-sync components produced by `Builder::build` that are not yet bound to
/// an anemo network.
///
/// Call [`UnstartedStateSync::start`] with the network to spawn the event loop.
pub struct UnstartedStateSync<S> {
    /// Resolved configuration: the builder's config, or `StateSyncConfig::default()`.
    pub(super) config: StateSyncConfig,
    /// Handle returned to callers. It holds the strong mailbox sender and clones of
    /// `checkpoint_event_sender` and `metrics`.
    pub(super) handle: Handle,
    /// Receiving end of the mailbox whose sender lives in `handle`.
    pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Set by `Builder::build` when `get_checkpoint_contents_per_checkpoint_limit`
    /// is configured. It is a clone of the layer installed on the RPC server.
    pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
    /// Store, a clone of which is also held by the RPC server.
    pub(super) store: S,
    /// Peer-height table shared with the RPC server.
    pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcast sender for synced checkpoints; a clone is also held by `handle`.
    pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Metrics (enabled or disabled, per the builder); a clone is held by `handle`.
    pub(super) metrics: Metrics,
    /// Optional archive reader configuration for the event loop.
    pub(super) archive_config: Option<ArchiveReaderConfig>,
    /// Optional discovery sender for the event loop.
    pub(super) discovery_sender: Option<discovery::Sender>,
}
212
213impl<S> UnstartedStateSync<S>
214where
215    S: WriteStore + Clone + Send + Sync + 'static,
216{
217    pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
218        let Self {
219            config,
220            handle,
221            mailbox,
222            download_limit_layer,
223            store,
224            peer_heights,
225            checkpoint_event_sender,
226            metrics,
227            archive_config,
228            discovery_sender,
229        } = self;
230
231        (
232            StateSyncEventLoop {
233                config,
234                mailbox,
235                weak_sender: handle.sender.downgrade(),
236                tasks: JoinSet::new(),
237                sync_checkpoint_summaries_task: None,
238                sync_checkpoint_contents_task: None,
239                download_limit_layer,
240                store,
241                peer_heights,
242                checkpoint_event_sender,
243                network,
244                metrics,
245                sync_checkpoint_from_archive_task: None,
246                archive_config,
247                discovery_sender,
248            },
249            handle,
250        )
251    }
252
253    pub fn start(self, network: anemo::Network) -> Handle {
254        let (event_loop, handle) = self.build(network);
255        tokio::spawn(event_loop.start());
256
257        handle
258    }
259}