sui_network/state_sync/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Handle, PeerHeights, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6    metrics::Metrics,
7    server::{CheckpointContentsDownloadLimitLayer, Server, SizeLimitLayer},
8};
9use crate::discovery;
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::{inflight_limit, rate_limit};
12use std::{
13    collections::HashMap,
14    sync::{Arc, RwLock},
15};
16use sui_config::node::ArchiveReaderConfig;
17use sui_config::p2p::StateSyncConfig;
18use sui_types::messages_checkpoint::VerifiedCheckpoint;
19use sui_types::storage::WriteStore;
20use tap::Pipe;
21use tokio::{
22    sync::{broadcast, mpsc},
23    task::JoinSet,
24};
25
26pub struct Builder<S> {
27    store: Option<S>,
28    config: Option<StateSyncConfig>,
29    metrics: Option<Metrics>,
30    archive_config: Option<ArchiveReaderConfig>,
31    discovery_sender: Option<discovery::Sender>,
32}
33
34impl Builder<()> {
35    #[allow(clippy::new_without_default)]
36    pub fn new() -> Self {
37        Self {
38            store: None,
39            config: None,
40            metrics: None,
41            archive_config: None,
42            discovery_sender: None,
43        }
44    }
45}
46
47impl<S> Builder<S> {
48    pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
49        Builder {
50            store: Some(store),
51            config: self.config,
52            metrics: self.metrics,
53            archive_config: self.archive_config,
54            discovery_sender: self.discovery_sender,
55        }
56    }
57
58    pub fn discovery_sender(mut self, sender: discovery::Sender) -> Self {
59        self.discovery_sender = Some(sender);
60        self
61    }
62
63    pub fn config(mut self, config: StateSyncConfig) -> Self {
64        self.config = Some(config);
65        self
66    }
67
68    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
69        self.metrics = Some(Metrics::enabled(registry));
70        self
71    }
72
73    pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
74        self.archive_config = archive_config;
75        self
76    }
77}
78
79impl<S> Builder<S>
80where
81    S: WriteStore + Clone + Send + Sync + 'static,
82{
83    pub fn build(self) -> (UnstartedStateSync<S>, anemo::Router<anemo::ServicesSealed>) {
84        let state_sync_config = self.config.clone().unwrap_or_default();
85        let (mut builder, server) = self.build_internal();
86        let mut state_sync_server = StateSyncServer::new(server);
87
88        // Apply rate limits from configuration as needed.
89        if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
90            state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
91                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
92                    governor::Quota::per_second(limit),
93                    rate_limit::WaitMode::Block,
94                )),
95            );
96        }
97        if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
98            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
99                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
100                    governor::Quota::per_second(limit),
101                    rate_limit::WaitMode::Block,
102                )),
103            );
104        }
105        if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
106            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
107                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
108                    governor::Quota::per_second(limit),
109                    rate_limit::WaitMode::Block,
110                )),
111            );
112        }
113        if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
114            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
115                InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
116                    limit,
117                    inflight_limit::WaitMode::ReturnError,
118                )),
119            );
120        }
121        if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
122            let layer = CheckpointContentsDownloadLimitLayer::new(limit);
123            builder.download_limit_layer = Some(layer.clone());
124            state_sync_server = state_sync_server
125                .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
126        }
127
128        let router = anemo::Router::new()
129            .add_rpc_service(state_sync_server)
130            // Size limit layer applies to request messages only. This effectively
131            // bounds only checkpoint summary size, because all other state sync
132            // request messages are very small.
133            .route_layer(SizeLimitLayer::new(
134                state_sync_config.max_checkpoint_summary_size(),
135            ));
136
137        (builder, router)
138    }
139
140    pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
141        let Builder {
142            store,
143            config,
144            metrics,
145            archive_config,
146            discovery_sender,
147        } = self;
148        let store = store.unwrap();
149        let config = config.unwrap_or_default();
150        let metrics = metrics.unwrap_or_else(Metrics::disabled);
151
152        let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
153        let (checkpoint_event_sender, _receiver) =
154            broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
155        let weak_sender = sender.downgrade();
156        let handle = Handle {
157            sender,
158            checkpoint_event_sender: checkpoint_event_sender.clone(),
159            metrics: metrics.clone(),
160        };
161        let peer_heights = PeerHeights {
162            peers: HashMap::new(),
163            unprocessed_checkpoints: HashMap::new(),
164            sequence_number_to_digest: HashMap::new(),
165            scores: HashMap::new(),
166            wait_interval_when_no_peer_to_sync_content: config
167                .wait_interval_when_no_peer_to_sync_content(),
168            peer_scoring_window: config.peer_scoring_window(),
169            peer_failure_rate: config.peer_failure_rate(),
170            checkpoint_content_timeout_min: config.checkpoint_content_timeout_min(),
171            checkpoint_content_timeout_max: config.checkpoint_content_timeout_max(),
172            exploration_probability: config.exploration_probability(),
173        }
174        .pipe(RwLock::new)
175        .pipe(Arc::new);
176
177        let server = Server {
178            store: store.clone(),
179            peer_heights: peer_heights.clone(),
180            sender: weak_sender,
181            max_checkpoint_lookahead: config.max_checkpoint_lookahead(),
182        };
183
184        (
185            UnstartedStateSync {
186                config,
187                handle,
188                mailbox,
189                store,
190                download_limit_layer: None,
191                peer_heights,
192                checkpoint_event_sender,
193                metrics,
194                archive_config,
195                discovery_sender,
196            },
197            server,
198        )
199    }
200}
201
202pub struct UnstartedStateSync<S> {
203    pub(super) config: StateSyncConfig,
204    pub(super) handle: Handle,
205    pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
206    pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
207    pub(super) store: S,
208    pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
209    pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
210    pub(super) metrics: Metrics,
211    pub(super) archive_config: Option<ArchiveReaderConfig>,
212    pub(super) discovery_sender: Option<discovery::Sender>,
213}
214
215impl<S> UnstartedStateSync<S>
216where
217    S: WriteStore + Clone + Send + Sync + 'static,
218{
219    pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
220        let Self {
221            config,
222            handle,
223            mailbox,
224            download_limit_layer,
225            store,
226            peer_heights,
227            checkpoint_event_sender,
228            metrics,
229            archive_config,
230            discovery_sender,
231        } = self;
232
233        (
234            StateSyncEventLoop {
235                config,
236                mailbox,
237                weak_sender: handle.sender.downgrade(),
238                tasks: JoinSet::new(),
239                sync_checkpoint_summaries_task: None,
240                sync_checkpoint_contents_task: None,
241                download_limit_layer,
242                store,
243                peer_heights,
244                checkpoint_event_sender,
245                network,
246                metrics,
247                sync_checkpoint_from_archive_task: None,
248                archive_config,
249                discovery_sender,
250            },
251            handle,
252        )
253    }
254
255    pub fn start(self, network: anemo::Network) -> Handle {
256        let (event_loop, handle) = self.build(network);
257        tokio::spawn(event_loop.start());
258
259        handle
260    }
261}