1use std::{sync::Arc, time::Duration};
5
6use consensus_config::AuthorityIndex;
7use consensus_types::block::Round;
8use futures::StreamExt;
9use mysten_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{task::JoinHandle, time::sleep};
12use tracing::{debug, error, info};
13
14use crate::{
15 block::BlockAPI as _,
16 context::Context,
17 dag_state::DagState,
18 error::ConsensusError,
19 network::{NetworkClient, NetworkService},
20};
21
22pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
28 context: Arc<Context>,
29 network_client: Arc<C>,
30 authority_service: Arc<S>,
31 dag_state: Arc<RwLock<DagState>>,
32 subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
33}
34
35impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
36 pub(crate) fn new(
37 context: Arc<Context>,
38 network_client: Arc<C>,
39 authority_service: Arc<S>,
40 dag_state: Arc<RwLock<DagState>>,
41 ) -> Self {
42 let subscriptions = (0..context.committee.size())
43 .map(|_| None)
44 .collect::<Vec<_>>();
45 Self {
46 context,
47 network_client,
48 authority_service,
49 dag_state,
50 subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
51 }
52 }
53
54 pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
55 if peer == self.context.own_index {
56 error!("Attempt to subscribe to own validator {peer} is ignored!");
57 return;
58 }
59 let context = self.context.clone();
60 let network_client = self.network_client.clone();
61 let authority_service = self.authority_service.clone();
62 let (mut last_received, gc_round) = {
63 let dag_state = self.dag_state.read();
64 (
65 dag_state.get_last_block_for_authority(peer).round(),
66 dag_state.gc_round(),
67 )
68 };
69
70 if last_received < gc_round {
74 info!(
75 "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
76 );
77 last_received = gc_round;
78 }
79
80 let mut subscriptions = self.subscriptions.lock();
81 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
82 subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
83 context,
84 network_client,
85 authority_service,
86 peer,
87 last_received,
88 )));
89 }
90
91 pub(crate) fn stop(&self) {
92 let mut subscriptions = self.subscriptions.lock();
93 for (peer, _) in self.context.committee.authorities() {
94 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
95 }
96 }
97
98 fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
99 let peer_hostname = &self.context.committee.authority(peer).hostname;
100 if let Some(subscription) = subscription.take() {
101 subscription.abort();
102 }
103 self.context
106 .metrics
107 .node_metrics
108 .subscribed_to
109 .with_label_values(&[peer_hostname])
110 .set(0);
111 }
112
113 async fn subscription_loop(
114 context: Arc<Context>,
115 network_client: Arc<C>,
116 authority_service: Arc<S>,
117 peer: AuthorityIndex,
118 last_received: Round,
119 ) {
120 const IMMEDIATE_RETRIES: i64 = 3;
121 const MIN_TIMEOUT: Duration = Duration::from_millis(500);
122 let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
124 Duration::from_millis(100),
125 Duration::from_secs(10),
126 );
127
128 let peer_hostname = &context.committee.authority(peer).hostname;
129 let mut retries: i64 = 0;
130 'subscription: loop {
131 context
132 .metrics
133 .node_metrics
134 .subscribed_to
135 .with_label_values(&[peer_hostname])
136 .set(0);
137
138 let mut delay = Duration::ZERO;
139 if retries > IMMEDIATE_RETRIES {
140 delay = backoff.next().unwrap();
141 debug!(
142 "Delaying retry {} of peer {} subscription, in {} seconds",
143 retries,
144 peer_hostname,
145 delay.as_secs_f32(),
146 );
147 sleep(delay).await;
148 } else if retries > 0 {
149 tokio::task::yield_now().await;
151 }
152 retries += 1;
153
154 let request_timeout = MIN_TIMEOUT.max(delay);
156 let mut blocks = match network_client
157 .subscribe_blocks(peer, last_received, request_timeout)
158 .await
159 {
160 Ok(blocks) => {
161 debug!(
162 "Subscribed to peer {} {} after {} attempts",
163 peer, peer_hostname, retries
164 );
165 context
166 .metrics
167 .node_metrics
168 .subscriber_connection_attempts
169 .with_label_values(&[peer_hostname, "success"])
170 .inc();
171 blocks
172 }
173 Err(e) => {
174 debug!(
175 "Failed to subscribe to blocks from peer {} {}: {}",
176 peer, peer_hostname, e
177 );
178 context
179 .metrics
180 .node_metrics
181 .subscriber_connection_attempts
182 .with_label_values(&[peer_hostname, "failure"])
183 .inc();
184 continue 'subscription;
185 }
186 };
187
188 context
190 .metrics
191 .node_metrics
192 .subscribed_to
193 .with_label_values(&[peer_hostname])
194 .set(1);
195
196 'stream: loop {
197 match blocks.next().await {
198 Some(block) => {
199 context
200 .metrics
201 .node_metrics
202 .subscribed_blocks
203 .with_label_values(&[peer_hostname])
204 .inc();
205 let result = authority_service
206 .handle_send_block(peer, block.clone())
207 .await;
208 if let Err(e) = result {
209 match e {
210 ConsensusError::BlockRejected { block_ref, reason } => {
211 debug!(
212 "Failed to process block from peer {} {} for block {:?}: {}",
213 peer, peer_hostname, block_ref, reason
214 );
215 }
216 _ => {
217 info!(
218 "Invalid block received from peer {} {}: {}",
219 peer, peer_hostname, e
220 );
221 }
222 }
223 }
224 retries = 0;
226 }
227 None => {
228 debug!(
229 "Subscription to blocks from peer {} {} ended",
230 peer, peer_hostname
231 );
232 retries += 1;
233 break 'stream;
234 }
235 }
236 }
237 }
238 }
239}
240
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_types::block::BlockRef;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// Number of blocks produced by each stream before it ends.
    const BLOCKS_PER_STREAM: usize = 10;
    /// Number of delivered blocks the test waits for.
    const EXPECTED_BLOCKS: usize = 100;

    /// The fixed payload emitted by the fake client and expected by the test.
    fn placeholder_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Fake network client: only `subscribe_blocks` is implemented.
    struct SubscriberTestClient {}

    impl SubscriberTestClient {
        fn new() -> Self {
            SubscriberTestClient {}
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Always succeeds with a short stream: one placeholder block per millisecond,
        /// `BLOCKS_PER_STREAM` blocks in total, then the stream ends.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            let short_stream = stream::unfold((), |()| async {
                sleep(Duration::from_millis(1)).await;
                Some((placeholder_block(), ()))
            })
            .take(BLOCKS_PER_STREAM);
            Ok(Box::pin(short_stream))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// Every stream ends after `BLOCKS_PER_STREAM` blocks, so collecting
    /// `EXPECTED_BLOCKS` blocks requires the subscriber to resubscribe repeatedly.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let network_client = Arc::new(SubscriberTestClient::new());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber = Subscriber::new(
            context.clone(),
            network_client,
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Poll once per second of (paused) tokio time, for at most ten seconds.
        for _ in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            let delivered = authority_service.lock().handle_send_block.len();
            if delivered >= EXPECTED_BLOCKS {
                break;
            }
        }

        // Every delivered block must come from the subscribed peer and carry the
        // placeholder payload.
        let service = authority_service.lock();
        assert!(service.handle_send_block.len() >= EXPECTED_BLOCKS);
        let expected = placeholder_block();
        for (sender, received) in service.handle_send_block.iter() {
            assert_eq!(*sender, peer);
            assert_eq!(*received, expected);
        }
    }
}
374}