1use super::{
5 Handle, PeerHeights, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6 metrics::Metrics,
7 server::{CheckpointContentsDownloadLimitLayer, Server, SizeLimitLayer},
8};
9use anemo::codegen::InboundRequestLayer;
10use anemo_tower::{inflight_limit, rate_limit};
11use std::{
12 collections::HashMap,
13 sync::{Arc, RwLock},
14};
15use sui_config::node::ArchiveReaderConfig;
16use sui_config::p2p::StateSyncConfig;
17use sui_types::messages_checkpoint::VerifiedCheckpoint;
18use sui_types::storage::WriteStore;
19use tap::Pipe;
20use tokio::{
21 sync::{broadcast, mpsc},
22 task::JoinSet,
23};
24
/// Staged configuration for the state-sync subsystem.
///
/// Every field starts out as `None` (see `Builder::new`) and is filled in by
/// the setter methods. The `build` methods consume the builder.
pub struct Builder<S> {
    /// Backing store, supplied through `store`. `build_internal` unwraps this
    /// value, so it must be set before building.
    store: Option<S>,
    /// State-sync settings; `StateSyncConfig::default()` is used when unset.
    config: Option<StateSyncConfig>,
    /// Metrics created by `with_metrics`; `Metrics::disabled()` is used when
    /// unset.
    metrics: Option<Metrics>,
    /// Optional archive reader settings, handed through unchanged to
    /// `UnstartedStateSync`.
    archive_config: Option<ArchiveReaderConfig>,
}
31
32impl Builder<()> {
33 #[allow(clippy::new_without_default)]
34 pub fn new() -> Self {
35 Self {
36 store: None,
37 config: None,
38 metrics: None,
39 archive_config: None,
40 }
41 }
42}
43
44impl<S> Builder<S> {
45 pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
46 Builder {
47 store: Some(store),
48 config: self.config,
49 metrics: self.metrics,
50 archive_config: self.archive_config,
51 }
52 }
53
54 pub fn config(mut self, config: StateSyncConfig) -> Self {
55 self.config = Some(config);
56 self
57 }
58
59 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
60 self.metrics = Some(Metrics::enabled(registry));
61 self
62 }
63
64 pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
65 self.archive_config = archive_config;
66 self
67 }
68}
69
70impl<S> Builder<S>
71where
72 S: WriteStore + Clone + Send + Sync + 'static,
73{
74 pub fn build(self) -> (UnstartedStateSync<S>, anemo::Router) {
75 let state_sync_config = self.config.clone().unwrap_or_default();
76 let (mut builder, server) = self.build_internal();
77 let mut state_sync_server = StateSyncServer::new(server);
78
79 if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
81 state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
82 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
83 governor::Quota::per_second(limit),
84 rate_limit::WaitMode::Block,
85 )),
86 );
87 }
88 if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
89 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
90 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
91 governor::Quota::per_second(limit),
92 rate_limit::WaitMode::Block,
93 )),
94 );
95 }
96 if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
97 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
98 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
99 governor::Quota::per_second(limit),
100 rate_limit::WaitMode::Block,
101 )),
102 );
103 }
104 if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
105 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
106 InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
107 limit,
108 inflight_limit::WaitMode::ReturnError,
109 )),
110 );
111 }
112 if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
113 let layer = CheckpointContentsDownloadLimitLayer::new(limit);
114 builder.download_limit_layer = Some(layer.clone());
115 state_sync_server = state_sync_server
116 .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
117 }
118
119 let router = anemo::Router::new()
120 .route_layer(SizeLimitLayer::new(
122 state_sync_config.max_checkpoint_summary_size(),
123 ))
124 .add_rpc_service(state_sync_server);
125
126 (builder, router)
127 }
128
129 pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
130 let Builder {
131 store,
132 config,
133 metrics,
134 archive_config,
135 } = self;
136 let store = store.unwrap();
137 let config = config.unwrap_or_default();
138 let metrics = metrics.unwrap_or_else(Metrics::disabled);
139
140 let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
141 let (checkpoint_event_sender, _receiver) =
142 broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
143 let weak_sender = sender.downgrade();
144 let handle = Handle {
145 sender,
146 checkpoint_event_sender: checkpoint_event_sender.clone(),
147 };
148 let peer_heights = PeerHeights {
149 peers: HashMap::new(),
150 unprocessed_checkpoints: HashMap::new(),
151 sequence_number_to_digest: HashMap::new(),
152 wait_interval_when_no_peer_to_sync_content: config
153 .wait_interval_when_no_peer_to_sync_content(),
154 }
155 .pipe(RwLock::new)
156 .pipe(Arc::new);
157
158 let server = Server {
159 store: store.clone(),
160 peer_heights: peer_heights.clone(),
161 sender: weak_sender,
162 max_checkpoint_lookahead: config.max_checkpoint_lookahead(),
163 };
164
165 (
166 UnstartedStateSync {
167 config,
168 handle,
169 mailbox,
170 store,
171 download_limit_layer: None,
172 peer_heights,
173 checkpoint_event_sender,
174 metrics,
175 archive_config,
176 },
177 server,
178 )
179 }
180}
181
/// A fully configured state-sync service that has not been started yet.
///
/// Produced by `Builder::build` / `Builder::build_internal`; turned into a
/// `StateSyncEventLoop` by `build`, or built and spawned in one step by
/// `start`.
pub struct UnstartedStateSync<S> {
    /// Settings resolved by the builder (explicit, or the default).
    pub(super) config: StateSyncConfig,
    /// Handle returned to the caller by `build` / `start`; it owns the mailbox
    /// sender and a clone of `checkpoint_event_sender`.
    pub(super) handle: Handle,
    /// Receiving end of the channel whose sender lives in `handle`.
    pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
    /// `None` out of `build_internal`; `Builder::build` sets it when
    /// `get_checkpoint_contents_per_checkpoint_limit` is configured.
    pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
    /// The store supplied to the builder.
    pub(super) store: S,
    /// Peer state shared with the `Server` created in `build_internal`.
    pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
    /// Broadcast sender for `VerifiedCheckpoint` values; `handle` holds a clone.
    pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    /// Metrics resolved by the builder (enabled, or `Metrics::disabled()`).
    pub(super) metrics: Metrics,
    /// Optional archive reader settings passed through from the builder.
    pub(super) archive_config: Option<ArchiveReaderConfig>,
}
193
194impl<S> UnstartedStateSync<S>
195where
196 S: WriteStore + Clone + Send + Sync + 'static,
197{
198 pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
199 let Self {
200 config,
201 handle,
202 mailbox,
203 download_limit_layer,
204 store,
205 peer_heights,
206 checkpoint_event_sender,
207 metrics,
208 archive_config,
209 } = self;
210
211 (
212 StateSyncEventLoop {
213 config,
214 mailbox,
215 weak_sender: handle.sender.downgrade(),
216 tasks: JoinSet::new(),
217 sync_checkpoint_summaries_task: None,
218 sync_checkpoint_contents_task: None,
219 download_limit_layer,
220 store,
221 peer_heights,
222 checkpoint_event_sender,
223 network,
224 metrics,
225 sync_checkpoint_from_archive_task: None,
226 archive_config,
227 },
228 handle,
229 )
230 }
231
232 pub fn start(self, network: anemo::Network) -> Handle {
233 let (event_loop, handle) = self.build(network);
234 tokio::spawn(event_loop.start());
235
236 handle
237 }
238}