sui_network/state_sync/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Handle, PeerHeights, StateSync, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6    metrics::Metrics,
7    server::{CheckpointContentsDownloadLimitLayer, Server},
8};
9use anemo::codegen::InboundRequestLayer;
10use anemo_tower::{inflight_limit, rate_limit};
11use std::{
12    collections::HashMap,
13    sync::{Arc, RwLock},
14};
15use sui_config::node::ArchiveReaderConfig;
16use sui_config::p2p::StateSyncConfig;
17use sui_types::messages_checkpoint::VerifiedCheckpoint;
18use sui_types::storage::WriteStore;
19use tap::Pipe;
20use tokio::{
21    sync::{broadcast, mpsc},
22    task::JoinSet,
23};
24
25pub struct Builder<S> {
26    store: Option<S>,
27    config: Option<StateSyncConfig>,
28    metrics: Option<Metrics>,
29    archive_config: Option<ArchiveReaderConfig>,
30}
31
32impl Builder<()> {
33    #[allow(clippy::new_without_default)]
34    pub fn new() -> Self {
35        Self {
36            store: None,
37            config: None,
38            metrics: None,
39            archive_config: None,
40        }
41    }
42}
43
44impl<S> Builder<S> {
45    pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
46        Builder {
47            store: Some(store),
48            config: self.config,
49            metrics: self.metrics,
50            archive_config: self.archive_config,
51        }
52    }
53
54    pub fn config(mut self, config: StateSyncConfig) -> Self {
55        self.config = Some(config);
56        self
57    }
58
59    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
60        self.metrics = Some(Metrics::enabled(registry));
61        self
62    }
63
64    pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
65        self.archive_config = archive_config;
66        self
67    }
68}
69
70impl<S> Builder<S>
71where
72    S: WriteStore + Clone + Send + Sync + 'static,
73{
74    pub fn build(self) -> (UnstartedStateSync<S>, StateSyncServer<impl StateSync>) {
75        let state_sync_config = self.config.clone().unwrap_or_default();
76        let (mut builder, server) = self.build_internal();
77        let mut state_sync_server = StateSyncServer::new(server);
78
79        // Apply rate limits from configuration as needed.
80        if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
81            state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
82                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
83                    governor::Quota::per_second(limit),
84                    rate_limit::WaitMode::Block,
85                )),
86            );
87        }
88        if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
89            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
90                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
91                    governor::Quota::per_second(limit),
92                    rate_limit::WaitMode::Block,
93                )),
94            );
95        }
96        if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
97            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
98                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
99                    governor::Quota::per_second(limit),
100                    rate_limit::WaitMode::Block,
101                )),
102            );
103        }
104        if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
105            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
106                InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
107                    limit,
108                    inflight_limit::WaitMode::ReturnError,
109                )),
110            );
111        }
112        if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
113            let layer = CheckpointContentsDownloadLimitLayer::new(limit);
114            builder.download_limit_layer = Some(layer.clone());
115            state_sync_server = state_sync_server
116                .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
117        }
118
119        (builder, state_sync_server)
120    }
121
122    pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
123        let Builder {
124            store,
125            config,
126            metrics,
127            archive_config,
128        } = self;
129        let store = store.unwrap();
130        let config = config.unwrap_or_default();
131        let metrics = metrics.unwrap_or_else(Metrics::disabled);
132
133        let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
134        let (checkpoint_event_sender, _receiver) =
135            broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
136        let weak_sender = sender.downgrade();
137        let handle = Handle {
138            sender,
139            checkpoint_event_sender: checkpoint_event_sender.clone(),
140        };
141        let peer_heights = PeerHeights {
142            peers: HashMap::new(),
143            unprocessed_checkpoints: HashMap::new(),
144            sequence_number_to_digest: HashMap::new(),
145            wait_interval_when_no_peer_to_sync_content: config
146                .wait_interval_when_no_peer_to_sync_content(),
147        }
148        .pipe(RwLock::new)
149        .pipe(Arc::new);
150
151        let server = Server {
152            store: store.clone(),
153            peer_heights: peer_heights.clone(),
154            sender: weak_sender,
155        };
156
157        (
158            UnstartedStateSync {
159                config,
160                handle,
161                mailbox,
162                store,
163                download_limit_layer: None,
164                peer_heights,
165                checkpoint_event_sender,
166                metrics,
167                archive_config,
168            },
169            server,
170        )
171    }
172}
173
174pub struct UnstartedStateSync<S> {
175    pub(super) config: StateSyncConfig,
176    pub(super) handle: Handle,
177    pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
178    pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
179    pub(super) store: S,
180    pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
181    pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
182    pub(super) metrics: Metrics,
183    pub(super) archive_config: Option<ArchiveReaderConfig>,
184}
185
186impl<S> UnstartedStateSync<S>
187where
188    S: WriteStore + Clone + Send + Sync + 'static,
189{
190    pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
191        let Self {
192            config,
193            handle,
194            mailbox,
195            download_limit_layer,
196            store,
197            peer_heights,
198            checkpoint_event_sender,
199            metrics,
200            archive_config,
201        } = self;
202
203        (
204            StateSyncEventLoop {
205                config,
206                mailbox,
207                weak_sender: handle.sender.downgrade(),
208                tasks: JoinSet::new(),
209                sync_checkpoint_summaries_task: None,
210                sync_checkpoint_contents_task: None,
211                download_limit_layer,
212                store,
213                peer_heights,
214                checkpoint_event_sender,
215                network,
216                metrics,
217                sync_checkpoint_from_archive_task: None,
218                archive_config,
219            },
220            handle,
221        )
222    }
223
224    pub fn start(self, network: anemo::Network) -> Handle {
225        let (event_loop, handle) = self.build(network);
226        tokio::spawn(event_loop.start());
227
228        handle
229    }
230}