1use super::{
5 Handle, PeerHeights, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6 metrics::Metrics,
7 server::{CheckpointContentsDownloadLimitLayer, Server, SizeLimitLayer},
8};
9use crate::discovery;
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::{inflight_limit, rate_limit};
12use std::{
13 collections::HashMap,
14 sync::{Arc, RwLock},
15};
16use sui_config::node::ArchiveReaderConfig;
17use sui_config::p2p::StateSyncConfig;
18use sui_types::messages_checkpoint::VerifiedCheckpoint;
19use sui_types::storage::WriteStore;
20use tap::Pipe;
21use tokio::{
22 sync::{broadcast, mpsc},
23 task::JoinSet,
24};
25
/// Collects the optional pieces needed to construct the state sync service.
///
/// The type parameter `S` is the store type; it starts out as `()` (see
/// `Builder::new`) and is replaced by calling `store`.
pub struct Builder<S> {
    /// Store handed to the service; `build_internal` unwraps it, so it must be
    /// set before building.
    store: Option<S>,
    /// State sync configuration; `StateSyncConfig::default()` is used when unset.
    config: Option<StateSyncConfig>,
    /// Metrics handle; `Metrics::disabled()` is used when unset.
    metrics: Option<Metrics>,
    /// Optional archive reader configuration, forwarded unchanged to
    /// `UnstartedStateSync`.
    archive_config: Option<ArchiveReaderConfig>,
    /// Optional sender to the discovery subsystem, forwarded unchanged to
    /// `UnstartedStateSync`.
    discovery_sender: Option<discovery::Sender>,
}
33
34impl Builder<()> {
35 #[allow(clippy::new_without_default)]
36 pub fn new() -> Self {
37 Self {
38 store: None,
39 config: None,
40 metrics: None,
41 archive_config: None,
42 discovery_sender: None,
43 }
44 }
45}
46
47impl<S> Builder<S> {
48 pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
49 Builder {
50 store: Some(store),
51 config: self.config,
52 metrics: self.metrics,
53 archive_config: self.archive_config,
54 discovery_sender: self.discovery_sender,
55 }
56 }
57
58 pub fn discovery_sender(mut self, sender: discovery::Sender) -> Self {
59 self.discovery_sender = Some(sender);
60 self
61 }
62
63 pub fn config(mut self, config: StateSyncConfig) -> Self {
64 self.config = Some(config);
65 self
66 }
67
68 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
69 self.metrics = Some(Metrics::enabled(registry));
70 self
71 }
72
73 pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
74 self.archive_config = archive_config;
75 self
76 }
77}
78
79impl<S> Builder<S>
80where
81 S: WriteStore + Clone + Send + Sync + 'static,
82{
83 pub fn build(self) -> (UnstartedStateSync<S>, anemo::Router<anemo::ServicesSealed>) {
84 let state_sync_config = self.config.clone().unwrap_or_default();
85 let (mut builder, server) = self.build_internal();
86 let mut state_sync_server = StateSyncServer::new(server);
87
88 if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
90 state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
91 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
92 governor::Quota::per_second(limit),
93 rate_limit::WaitMode::Block,
94 )),
95 );
96 }
97 if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
98 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
99 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
100 governor::Quota::per_second(limit),
101 rate_limit::WaitMode::Block,
102 )),
103 );
104 }
105 if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
106 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
107 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
108 governor::Quota::per_second(limit),
109 rate_limit::WaitMode::Block,
110 )),
111 );
112 }
113 if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
114 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
115 InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
116 limit,
117 inflight_limit::WaitMode::ReturnError,
118 )),
119 );
120 }
121 if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
122 let layer = CheckpointContentsDownloadLimitLayer::new(limit);
123 builder.download_limit_layer = Some(layer.clone());
124 state_sync_server = state_sync_server
125 .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
126 }
127
128 let router = anemo::Router::new()
129 .add_rpc_service(state_sync_server)
130 .route_layer(SizeLimitLayer::new(
134 state_sync_config.max_checkpoint_summary_size(),
135 ));
136
137 (builder, router)
138 }
139
140 pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
141 let Builder {
142 store,
143 config,
144 metrics,
145 archive_config,
146 discovery_sender,
147 } = self;
148 let store = store.unwrap();
149 let config = config.unwrap_or_default();
150 let metrics = metrics.unwrap_or_else(Metrics::disabled);
151
152 let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
153 let (checkpoint_event_sender, _receiver) =
154 broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
155 let weak_sender = sender.downgrade();
156 let handle = Handle {
157 sender,
158 checkpoint_event_sender: checkpoint_event_sender.clone(),
159 metrics: metrics.clone(),
160 };
161 let peer_heights = PeerHeights {
162 peers: HashMap::new(),
163 unprocessed_checkpoints: HashMap::new(),
164 sequence_number_to_digest: HashMap::new(),
165 scores: HashMap::new(),
166 wait_interval_when_no_peer_to_sync_content: config
167 .wait_interval_when_no_peer_to_sync_content(),
168 peer_scoring_window: config.peer_scoring_window(),
169 peer_failure_rate: config.peer_failure_rate(),
170 checkpoint_content_timeout_min: config.checkpoint_content_timeout_min(),
171 checkpoint_content_timeout_max: config.checkpoint_content_timeout_max(),
172 exploration_probability: config.exploration_probability(),
173 }
174 .pipe(RwLock::new)
175 .pipe(Arc::new);
176
177 let server = Server {
178 store: store.clone(),
179 peer_heights: peer_heights.clone(),
180 sender: weak_sender,
181 max_checkpoint_lookahead: config.max_checkpoint_lookahead(),
182 };
183
184 (
185 UnstartedStateSync {
186 config,
187 handle,
188 mailbox,
189 store,
190 download_limit_layer: None,
191 peer_heights,
192 checkpoint_event_sender,
193 metrics,
194 archive_config,
195 discovery_sender,
196 },
197 server,
198 )
199 }
200}
201
/// A fully configured state sync service whose event loop has not been
/// spawned yet. Produced by `Builder::build` / `Builder::build_internal`;
/// call `start` to spawn the event loop and obtain the `Handle`.
pub struct UnstartedStateSync<S> {
    /// Effective configuration (the builder's value or the default).
    pub(super) config: StateSyncConfig,
    /// Handle returned to the caller once the service is built or started.
    pub(super) handle: Handle,
    /// Receiving end of the channel whose sender lives in `handle`.
    pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Set by `Builder::build` when a per-checkpoint contents limit is
    /// configured; `None` otherwise.
    pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
    /// Store supplied to the builder.
    pub(super) store: S,
    /// Peer state shared with the RPC `Server`.
    pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcast sender for `VerifiedCheckpoint` events; a clone is also held
    /// by `handle`.
    pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Metrics handle (the builder's value or `Metrics::disabled()`).
    pub(super) metrics: Metrics,
    /// Optional archive reader configuration forwarded from the builder.
    pub(super) archive_config: Option<ArchiveReaderConfig>,
    /// Optional discovery sender forwarded from the builder.
    pub(super) discovery_sender: Option<discovery::Sender>,
}
214
215impl<S> UnstartedStateSync<S>
216where
217 S: WriteStore + Clone + Send + Sync + 'static,
218{
219 pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
220 let Self {
221 config,
222 handle,
223 mailbox,
224 download_limit_layer,
225 store,
226 peer_heights,
227 checkpoint_event_sender,
228 metrics,
229 archive_config,
230 discovery_sender,
231 } = self;
232
233 (
234 StateSyncEventLoop {
235 config,
236 mailbox,
237 weak_sender: handle.sender.downgrade(),
238 tasks: JoinSet::new(),
239 sync_checkpoint_summaries_task: None,
240 sync_checkpoint_contents_task: None,
241 download_limit_layer,
242 store,
243 peer_heights,
244 checkpoint_event_sender,
245 network,
246 metrics,
247 sync_checkpoint_from_archive_task: None,
248 archive_config,
249 discovery_sender,
250 },
251 handle,
252 )
253 }
254
255 pub fn start(self, network: anemo::Network) -> Handle {
256 let (event_loop, handle) = self.build(network);
257 tokio::spawn(event_loop.start());
258
259 handle
260 }
261}