1use super::{
5 Handle, PeerHeights, StateSync, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
6 metrics::Metrics,
7 server::{CheckpointContentsDownloadLimitLayer, Server},
8};
9use anemo::codegen::InboundRequestLayer;
10use anemo_tower::{inflight_limit, rate_limit};
11use std::{
12 collections::HashMap,
13 sync::{Arc, RwLock},
14};
15use sui_config::node::ArchiveReaderConfig;
16use sui_config::p2p::StateSyncConfig;
17use sui_types::messages_checkpoint::VerifiedCheckpoint;
18use sui_types::storage::WriteStore;
19use tap::Pipe;
20use tokio::{
21 sync::{broadcast, mpsc},
22 task::JoinSet,
23};
24
25pub struct Builder<S> {
26 store: Option<S>,
27 config: Option<StateSyncConfig>,
28 metrics: Option<Metrics>,
29 archive_config: Option<ArchiveReaderConfig>,
30}
31
32impl Builder<()> {
33 #[allow(clippy::new_without_default)]
34 pub fn new() -> Self {
35 Self {
36 store: None,
37 config: None,
38 metrics: None,
39 archive_config: None,
40 }
41 }
42}
43
44impl<S> Builder<S> {
45 pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
46 Builder {
47 store: Some(store),
48 config: self.config,
49 metrics: self.metrics,
50 archive_config: self.archive_config,
51 }
52 }
53
54 pub fn config(mut self, config: StateSyncConfig) -> Self {
55 self.config = Some(config);
56 self
57 }
58
59 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
60 self.metrics = Some(Metrics::enabled(registry));
61 self
62 }
63
64 pub fn archive_config(mut self, archive_config: Option<ArchiveReaderConfig>) -> Self {
65 self.archive_config = archive_config;
66 self
67 }
68}
69
70impl<S> Builder<S>
71where
72 S: WriteStore + Clone + Send + Sync + 'static,
73{
74 pub fn build(self) -> (UnstartedStateSync<S>, StateSyncServer<impl StateSync>) {
75 let state_sync_config = self.config.clone().unwrap_or_default();
76 let (mut builder, server) = self.build_internal();
77 let mut state_sync_server = StateSyncServer::new(server);
78
79 if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
81 state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
82 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
83 governor::Quota::per_second(limit),
84 rate_limit::WaitMode::Block,
85 )),
86 );
87 }
88 if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
89 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
90 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
91 governor::Quota::per_second(limit),
92 rate_limit::WaitMode::Block,
93 )),
94 );
95 }
96 if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
97 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
98 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
99 governor::Quota::per_second(limit),
100 rate_limit::WaitMode::Block,
101 )),
102 );
103 }
104 if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
105 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
106 InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
107 limit,
108 inflight_limit::WaitMode::ReturnError,
109 )),
110 );
111 }
112 if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
113 let layer = CheckpointContentsDownloadLimitLayer::new(limit);
114 builder.download_limit_layer = Some(layer.clone());
115 state_sync_server = state_sync_server
116 .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
117 }
118
119 (builder, state_sync_server)
120 }
121
122 pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
123 let Builder {
124 store,
125 config,
126 metrics,
127 archive_config,
128 } = self;
129 let store = store.unwrap();
130 let config = config.unwrap_or_default();
131 let metrics = metrics.unwrap_or_else(Metrics::disabled);
132
133 let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
134 let (checkpoint_event_sender, _receiver) =
135 broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
136 let weak_sender = sender.downgrade();
137 let handle = Handle {
138 sender,
139 checkpoint_event_sender: checkpoint_event_sender.clone(),
140 };
141 let peer_heights = PeerHeights {
142 peers: HashMap::new(),
143 unprocessed_checkpoints: HashMap::new(),
144 sequence_number_to_digest: HashMap::new(),
145 wait_interval_when_no_peer_to_sync_content: config
146 .wait_interval_when_no_peer_to_sync_content(),
147 }
148 .pipe(RwLock::new)
149 .pipe(Arc::new);
150
151 let server = Server {
152 store: store.clone(),
153 peer_heights: peer_heights.clone(),
154 sender: weak_sender,
155 };
156
157 (
158 UnstartedStateSync {
159 config,
160 handle,
161 mailbox,
162 store,
163 download_limit_layer: None,
164 peer_heights,
165 checkpoint_event_sender,
166 metrics,
167 archive_config,
168 },
169 server,
170 )
171 }
172}
173
174pub struct UnstartedStateSync<S> {
175 pub(super) config: StateSyncConfig,
176 pub(super) handle: Handle,
177 pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
178 pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
179 pub(super) store: S,
180 pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
181 pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
182 pub(super) metrics: Metrics,
183 pub(super) archive_config: Option<ArchiveReaderConfig>,
184}
185
186impl<S> UnstartedStateSync<S>
187where
188 S: WriteStore + Clone + Send + Sync + 'static,
189{
190 pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
191 let Self {
192 config,
193 handle,
194 mailbox,
195 download_limit_layer,
196 store,
197 peer_heights,
198 checkpoint_event_sender,
199 metrics,
200 archive_config,
201 } = self;
202
203 (
204 StateSyncEventLoop {
205 config,
206 mailbox,
207 weak_sender: handle.sender.downgrade(),
208 tasks: JoinSet::new(),
209 sync_checkpoint_summaries_task: None,
210 sync_checkpoint_contents_task: None,
211 download_limit_layer,
212 store,
213 peer_heights,
214 checkpoint_event_sender,
215 network,
216 metrics,
217 sync_checkpoint_from_archive_task: None,
218 archive_config,
219 },
220 handle,
221 )
222 }
223
224 pub fn start(self, network: anemo::Network) -> Handle {
225 let (event_loop, handle) = self.build(network);
226 tokio::spawn(event_loop.start());
227
228 handle
229 }
230}