1use super::{
5 Handle, PeerHeights, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6 metrics::Metrics,
7 server::{CheckpointContentsDownloadLimitLayer, Server, SizeLimitLayer},
8};
9use crate::discovery;
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::{inflight_limit, rate_limit};
12use std::{
13 collections::HashMap,
14 sync::{Arc, RwLock},
15};
16use sui_config::node::ArchiveReaderConfig;
17use sui_config::p2p::StateSyncConfig;
18use sui_types::messages_checkpoint::VerifiedCheckpoint;
19use sui_types::storage::WriteStore;
20use tap::Pipe;
21use tokio::{
22 sync::{broadcast, mpsc},
23 task::JoinSet,
24};
25
26pub struct Builder<S> {
27 store: Option<S>,
28 config: Option<StateSyncConfig>,
29 metrics: Option<Metrics>,
30 archive_config: Option<ArchiveReaderConfig>,
31 discovery_sender: Option<discovery::Sender>,
32}
33
34impl Builder<()> {
35 #[allow(clippy::new_without_default)]
36 pub fn new() -> Self {
37 Self {
38 store: None,
39 config: None,
40 metrics: None,
41 archive_config: None,
42 discovery_sender: None,
43 }
44 }
45}
46
47impl<S> Builder<S> {
48 pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
49 Builder {
50 store: Some(store),
51 config: self.config,
52 metrics: self.metrics,
53 archive_config: self.archive_config,
54 discovery_sender: self.discovery_sender,
55 }
56 }
57
58 pub fn discovery_sender(mut self, sender: discovery::Sender) -> Self {
59 self.discovery_sender = Some(sender);
60 self
61 }
62
63 pub fn config(mut self, config: StateSyncConfig) -> Self {
64 self.config = Some(config);
65 self
66 }
67
68 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
69 self.metrics = Some(Metrics::enabled(registry));
70 self
71 }
72
73 pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
74 self.archive_config = archive_config;
75 self
76 }
77}
78
79impl<S> Builder<S>
80where
81 S: WriteStore + Clone + Send + Sync + 'static,
82{
83 pub fn build(self) -> (UnstartedStateSync<S>, anemo::Router) {
84 let state_sync_config = self.config.clone().unwrap_or_default();
85 let (mut builder, server) = self.build_internal();
86 let mut state_sync_server = StateSyncServer::new(server);
87
88 if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
90 state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
91 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
92 governor::Quota::per_second(limit),
93 rate_limit::WaitMode::Block,
94 )),
95 );
96 }
97 if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
98 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
99 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
100 governor::Quota::per_second(limit),
101 rate_limit::WaitMode::Block,
102 )),
103 );
104 }
105 if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
106 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
107 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
108 governor::Quota::per_second(limit),
109 rate_limit::WaitMode::Block,
110 )),
111 );
112 }
113 if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
114 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
115 InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
116 limit,
117 inflight_limit::WaitMode::ReturnError,
118 )),
119 );
120 }
121 if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
122 let layer = CheckpointContentsDownloadLimitLayer::new(limit);
123 builder.download_limit_layer = Some(layer.clone());
124 state_sync_server = state_sync_server
125 .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
126 }
127
128 let router = anemo::Router::new()
129 .route_layer(SizeLimitLayer::new(
131 state_sync_config.max_checkpoint_summary_size(),
132 ))
133 .add_rpc_service(state_sync_server);
134
135 (builder, router)
136 }
137
138 pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
139 let Builder {
140 store,
141 config,
142 metrics,
143 archive_config,
144 discovery_sender,
145 } = self;
146 let store = store.unwrap();
147 let config = config.unwrap_or_default();
148 let metrics = metrics.unwrap_or_else(Metrics::disabled);
149
150 let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
151 let (checkpoint_event_sender, _receiver) =
152 broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
153 let weak_sender = sender.downgrade();
154 let handle = Handle {
155 sender,
156 checkpoint_event_sender: checkpoint_event_sender.clone(),
157 metrics: metrics.clone(),
158 };
159 let peer_heights = PeerHeights {
160 peers: HashMap::new(),
161 unprocessed_checkpoints: HashMap::new(),
162 sequence_number_to_digest: HashMap::new(),
163 scores: HashMap::new(),
164 wait_interval_when_no_peer_to_sync_content: config
165 .wait_interval_when_no_peer_to_sync_content(),
166 peer_scoring_window: config.peer_scoring_window(),
167 peer_failure_rate: config.peer_failure_rate(),
168 checkpoint_content_timeout_min: config.checkpoint_content_timeout_min(),
169 checkpoint_content_timeout_max: config.checkpoint_content_timeout_max(),
170 exploration_probability: config.exploration_probability(),
171 }
172 .pipe(RwLock::new)
173 .pipe(Arc::new);
174
175 let server = Server {
176 store: store.clone(),
177 peer_heights: peer_heights.clone(),
178 sender: weak_sender,
179 max_checkpoint_lookahead: config.max_checkpoint_lookahead(),
180 };
181
182 (
183 UnstartedStateSync {
184 config,
185 handle,
186 mailbox,
187 store,
188 download_limit_layer: None,
189 peer_heights,
190 checkpoint_event_sender,
191 metrics,
192 archive_config,
193 discovery_sender,
194 },
195 server,
196 )
197 }
198}
199
200pub struct UnstartedStateSync<S> {
201 pub(super) config: StateSyncConfig,
202 pub(super) handle: Handle,
203 pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
204 pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
205 pub(super) store: S,
206 pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
207 pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
208 pub(super) metrics: Metrics,
209 pub(super) archive_config: Option<ArchiveReaderConfig>,
210 pub(super) discovery_sender: Option<discovery::Sender>,
211}
212
213impl<S> UnstartedStateSync<S>
214where
215 S: WriteStore + Clone + Send + Sync + 'static,
216{
217 pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
218 let Self {
219 config,
220 handle,
221 mailbox,
222 download_limit_layer,
223 store,
224 peer_heights,
225 checkpoint_event_sender,
226 metrics,
227 archive_config,
228 discovery_sender,
229 } = self;
230
231 (
232 StateSyncEventLoop {
233 config,
234 mailbox,
235 weak_sender: handle.sender.downgrade(),
236 tasks: JoinSet::new(),
237 sync_checkpoint_summaries_task: None,
238 sync_checkpoint_contents_task: None,
239 download_limit_layer,
240 store,
241 peer_heights,
242 checkpoint_event_sender,
243 network,
244 metrics,
245 sync_checkpoint_from_archive_task: None,
246 archive_config,
247 discovery_sender,
248 },
249 handle,
250 )
251 }
252
253 pub fn start(self, network: anemo::Network) -> Handle {
254 let (event_loop, handle) = self.build(network);
255 tokio::spawn(event_loop.start());
256
257 handle
258 }
259}