1use std::{sync::Arc, time::Duration};
5
6use consensus_config::AuthorityIndex;
7use consensus_types::block::Round;
8use futures::StreamExt;
9use mysten_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{task::JoinHandle, time::sleep};
12use tracing::{debug, error, info};
13
14use crate::{
15 block::BlockAPI as _,
16 context::Context,
17 dag_state::DagState,
18 error::ConsensusError,
19 network::{ValidatorNetworkClient, ValidatorNetworkService},
20};
21
22pub(crate) struct Subscriber<C: ValidatorNetworkClient, S: ValidatorNetworkService> {
28 context: Arc<Context>,
29 network_client: Arc<C>,
30 authority_service: Arc<S>,
31 dag_state: Arc<RwLock<DagState>>,
32 subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
33}
34
35impl<C: ValidatorNetworkClient, S: ValidatorNetworkService> Subscriber<C, S> {
36 pub(crate) fn new(
37 context: Arc<Context>,
38 network_client: Arc<C>,
39 authority_service: Arc<S>,
40 dag_state: Arc<RwLock<DagState>>,
41 ) -> Self {
42 let subscriptions = (0..context.committee.size())
43 .map(|_| None)
44 .collect::<Vec<_>>();
45 Self {
46 context,
47 network_client,
48 authority_service,
49 dag_state,
50 subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
51 }
52 }
53
54 pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
55 if peer == self.context.own_index {
56 error!("Attempt to subscribe to own validator {peer} is ignored!");
57 return;
58 }
59 let context = self.context.clone();
60 let network_client = self.network_client.clone();
61 let authority_service = self.authority_service.clone();
62 let (mut last_received, gc_round) = {
63 let dag_state = self.dag_state.read();
64 (
65 dag_state.get_last_block_for_authority(peer).round(),
66 dag_state.gc_round(),
67 )
68 };
69
70 if last_received < gc_round {
74 info!(
75 "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
76 );
77 last_received = gc_round;
78 }
79
80 let mut subscriptions = self.subscriptions.lock();
81 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
82 subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
83 context,
84 network_client,
85 authority_service,
86 peer,
87 last_received,
88 )));
89 }
90
91 pub(crate) fn stop(&self) {
92 let mut subscriptions = self.subscriptions.lock();
93 for (peer, _) in self.context.committee.authorities() {
94 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
95 }
96 }
97
98 fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
99 let peer_hostname = &self.context.committee.authority(peer).hostname;
100 if let Some(subscription) = subscription.take() {
101 subscription.abort();
102 }
103 self.context
106 .metrics
107 .node_metrics
108 .subscribed_to
109 .with_label_values(&[peer_hostname])
110 .set(0);
111 }
112
113 async fn subscription_loop(
114 context: Arc<Context>,
115 network_client: Arc<C>,
116 authority_service: Arc<S>,
117 peer: AuthorityIndex,
118 last_received: Round,
119 ) {
120 const IMMEDIATE_RETRIES: i64 = 3;
121 const MIN_TIMEOUT: Duration = Duration::from_millis(500);
122 let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
124 Duration::from_millis(100),
125 Duration::from_secs(10),
126 );
127
128 let peer_hostname = &context.committee.authority(peer).hostname;
129 let mut retries: i64 = 0;
130 'subscription: loop {
131 context
132 .metrics
133 .node_metrics
134 .subscribed_to
135 .with_label_values(&[peer_hostname])
136 .set(0);
137
138 let mut delay = Duration::ZERO;
139 if retries > IMMEDIATE_RETRIES {
140 delay = backoff.next().unwrap();
141 debug!(
142 "Delaying retry {} of peer {} subscription, in {} seconds",
143 retries,
144 peer_hostname,
145 delay.as_secs_f32(),
146 );
147 sleep(delay).await;
148 } else if retries > 0 {
149 tokio::task::yield_now().await;
151 }
152 retries += 1;
153
154 let request_timeout = MIN_TIMEOUT.max(delay);
156 let mut blocks = match network_client
157 .subscribe_blocks(peer, last_received, request_timeout)
158 .await
159 {
160 Ok(blocks) => {
161 debug!(
162 "Subscribed to peer {} {} after {} attempts",
163 peer, peer_hostname, retries
164 );
165 context
166 .metrics
167 .node_metrics
168 .subscriber_connection_attempts
169 .with_label_values(&[peer_hostname.as_str(), "success"])
170 .inc();
171 blocks
172 }
173 Err(e) => {
174 debug!(
175 "Failed to subscribe to blocks from peer {} {}: {}",
176 peer, peer_hostname, e
177 );
178 context
179 .metrics
180 .node_metrics
181 .subscriber_connection_attempts
182 .with_label_values(&[peer_hostname.as_str(), "failure"])
183 .inc();
184 continue 'subscription;
185 }
186 };
187
188 context
190 .metrics
191 .node_metrics
192 .subscribed_to
193 .with_label_values(&[peer_hostname])
194 .set(1);
195
196 'stream: loop {
197 match blocks.next().await {
198 Some(block) => {
199 context
200 .metrics
201 .node_metrics
202 .subscribed_blocks
203 .with_label_values(&[peer_hostname])
204 .inc();
205 let result = authority_service.handle_send_block(peer, block).await;
206 if let Err(e) = result {
207 match e {
208 ConsensusError::BlockRejected { block_ref, reason } => {
209 debug!(
210 "Failed to process block from peer {} {} for block {:?}: {}",
211 peer, peer_hostname, block_ref, reason
212 );
213 }
214 _ => {
215 info!(
216 "Invalid block received from peer {} {}: {}",
217 peer, peer_hostname, e
218 );
219 }
220 }
221 }
222 retries = 0;
224 }
225 None => {
226 debug!(
227 "Subscription to blocks from peer {} {} ended",
228 peer, peer_hostname
229 );
230 retries += 1;
231 break 'stream;
232 }
233 }
234 }
235 }
236 }
237}
238
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_types::block::BlockRef;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// The fixed block that every test subscription stream yields.
    fn test_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client stub: only `subscribe_blocks` is implemented.
    struct SubscriberTestClient {}

    impl SubscriberTestClient {
        fn new() -> Self {
            Self {}
        }
    }

    #[async_trait]
    impl ValidatorNetworkClient for SubscriberTestClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Returns a stream that yields ten copies of the test block, one per
        /// millisecond, and then ends, which makes the subscriber resubscribe.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            let ten_blocks = stream::unfold((), |()| async {
                sleep(Duration::from_millis(1)).await;
                Some((test_block(), ()))
            })
            .take(10);
            Ok(Box::pin(ten_blocks))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// Each stream ends after ten blocks, so collecting 100 blocks requires
    /// the subscriber to resubscribe repeatedly.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let network_client = Arc::new(SubscriberTestClient::new());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber = Subscriber::new(
            context.clone(),
            network_client,
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Poll once per second, for up to ten seconds, until enough blocks arrived.
        for _ in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if authority_service.lock().handle_send_block.len() >= 100 {
                break;
            }
        }

        // Every recorded block must come from the subscribed peer and match the fixture.
        let service = authority_service.lock();
        assert!(service.handle_send_block.len() >= 100);
        let expected = test_block();
        for (p, block) in service.handle_send_block.iter() {
            assert_eq!(*p, peer);
            assert_eq!(*block, expected);
        }
    }
}