sui_network/state_sync/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Handle, PeerHeights, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6    metrics::Metrics,
7    server::{CheckpointContentsDownloadLimitLayer, Server, SizeLimitLayer},
8};
9use anemo::codegen::InboundRequestLayer;
10use anemo_tower::{inflight_limit, rate_limit};
11use std::{
12    collections::HashMap,
13    sync::{Arc, RwLock},
14};
15use sui_config::node::ArchiveReaderConfig;
16use sui_config::p2p::StateSyncConfig;
17use sui_types::messages_checkpoint::VerifiedCheckpoint;
18use sui_types::storage::WriteStore;
19use tap::Pipe;
20use tokio::{
21    sync::{broadcast, mpsc},
22    task::JoinSet,
23};
24
25pub struct Builder<S> {
26    store: Option<S>,
27    config: Option<StateSyncConfig>,
28    metrics: Option<Metrics>,
29    archive_config: Option<ArchiveReaderConfig>,
30}
31
32impl Builder<()> {
33    #[allow(clippy::new_without_default)]
34    pub fn new() -> Self {
35        Self {
36            store: None,
37            config: None,
38            metrics: None,
39            archive_config: None,
40        }
41    }
42}
43
44impl<S> Builder<S> {
45    pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
46        Builder {
47            store: Some(store),
48            config: self.config,
49            metrics: self.metrics,
50            archive_config: self.archive_config,
51        }
52    }
53
54    pub fn config(mut self, config: StateSyncConfig) -> Self {
55        self.config = Some(config);
56        self
57    }
58
59    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
60        self.metrics = Some(Metrics::enabled(registry));
61        self
62    }
63
64    pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
65        self.archive_config = archive_config;
66        self
67    }
68}
69
70impl<S> Builder<S>
71where
72    S: WriteStore + Clone + Send + Sync + 'static,
73{
74    pub fn build(self) -> (UnstartedStateSync<S>, anemo::Router) {
75        let state_sync_config = self.config.clone().unwrap_or_default();
76        let (mut builder, server) = self.build_internal();
77        let mut state_sync_server = StateSyncServer::new(server);
78
79        // Apply rate limits from configuration as needed.
80        if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
81            state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
82                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
83                    governor::Quota::per_second(limit),
84                    rate_limit::WaitMode::Block,
85                )),
86            );
87        }
88        if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
89            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
90                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
91                    governor::Quota::per_second(limit),
92                    rate_limit::WaitMode::Block,
93                )),
94            );
95        }
96        if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
97            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
98                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
99                    governor::Quota::per_second(limit),
100                    rate_limit::WaitMode::Block,
101                )),
102            );
103        }
104        if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
105            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
106                InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
107                    limit,
108                    inflight_limit::WaitMode::ReturnError,
109                )),
110            );
111        }
112        if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
113            let layer = CheckpointContentsDownloadLimitLayer::new(limit);
114            builder.download_limit_layer = Some(layer.clone());
115            state_sync_server = state_sync_server
116                .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
117        }
118
119        let router = anemo::Router::new()
120            // Size limit layer applied before deserialization.
121            .route_layer(SizeLimitLayer::new(
122                state_sync_config.max_checkpoint_summary_size(),
123            ))
124            .add_rpc_service(state_sync_server);
125
126        (builder, router)
127    }
128
129    pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
130        let Builder {
131            store,
132            config,
133            metrics,
134            archive_config,
135        } = self;
136        let store = store.unwrap();
137        let config = config.unwrap_or_default();
138        let metrics = metrics.unwrap_or_else(Metrics::disabled);
139
140        let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
141        let (checkpoint_event_sender, _receiver) =
142            broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
143        let weak_sender = sender.downgrade();
144        let handle = Handle {
145            sender,
146            checkpoint_event_sender: checkpoint_event_sender.clone(),
147        };
148        let peer_heights = PeerHeights {
149            peers: HashMap::new(),
150            unprocessed_checkpoints: HashMap::new(),
151            sequence_number_to_digest: HashMap::new(),
152            wait_interval_when_no_peer_to_sync_content: config
153                .wait_interval_when_no_peer_to_sync_content(),
154        }
155        .pipe(RwLock::new)
156        .pipe(Arc::new);
157
158        let server = Server {
159            store: store.clone(),
160            peer_heights: peer_heights.clone(),
161            sender: weak_sender,
162            max_checkpoint_lookahead: config.max_checkpoint_lookahead(),
163        };
164
165        (
166            UnstartedStateSync {
167                config,
168                handle,
169                mailbox,
170                store,
171                download_limit_layer: None,
172                peer_heights,
173                checkpoint_event_sender,
174                metrics,
175                archive_config,
176            },
177            server,
178        )
179    }
180}
181
182pub struct UnstartedStateSync<S> {
183    pub(super) config: StateSyncConfig,
184    pub(super) handle: Handle,
185    pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
186    pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
187    pub(super) store: S,
188    pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
189    pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
190    pub(super) metrics: Metrics,
191    pub(super) archive_config: Option<ArchiveReaderConfig>,
192}
193
194impl<S> UnstartedStateSync<S>
195where
196    S: WriteStore + Clone + Send + Sync + 'static,
197{
198    pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
199        let Self {
200            config,
201            handle,
202            mailbox,
203            download_limit_layer,
204            store,
205            peer_heights,
206            checkpoint_event_sender,
207            metrics,
208            archive_config,
209        } = self;
210
211        (
212            StateSyncEventLoop {
213                config,
214                mailbox,
215                weak_sender: handle.sender.downgrade(),
216                tasks: JoinSet::new(),
217                sync_checkpoint_summaries_task: None,
218                sync_checkpoint_contents_task: None,
219                download_limit_layer,
220                store,
221                peer_heights,
222                checkpoint_event_sender,
223                network,
224                metrics,
225                sync_checkpoint_from_archive_task: None,
226                archive_config,
227            },
228            handle,
229        )
230    }
231
232    pub fn start(self, network: anemo::Network) -> Handle {
233        let (event_loop, handle) = self.build(network);
234        tokio::spawn(event_loop.start());
235
236        handle
237    }
238}