consensus_core/
subscriber.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use consensus_config::AuthorityIndex;
7use consensus_types::block::Round;
8use futures::StreamExt;
9use mysten_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{task::JoinHandle, time::sleep};
12use tracing::{debug, error, info};
13
14use crate::{
15    block::BlockAPI as _,
16    context::Context,
17    dag_state::DagState,
18    error::ConsensusError,
19    network::{NetworkClient, NetworkService},
20};
21
22/// Subscriber manages the block stream subscriptions to other peers, taking care of retrying
23/// when subscription streams break. Blocks returned from the peer are sent to the authority
24/// service for processing.
25/// Currently subscription management for individual peer is not exposed, but it could become
26/// useful in future.
27pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
28    context: Arc<Context>,
29    network_client: Arc<C>,
30    authority_service: Arc<S>,
31    dag_state: Arc<RwLock<DagState>>,
32    subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
33}
34
35impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
36    pub(crate) fn new(
37        context: Arc<Context>,
38        network_client: Arc<C>,
39        authority_service: Arc<S>,
40        dag_state: Arc<RwLock<DagState>>,
41    ) -> Self {
42        let subscriptions = (0..context.committee.size())
43            .map(|_| None)
44            .collect::<Vec<_>>();
45        Self {
46            context,
47            network_client,
48            authority_service,
49            dag_state,
50            subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
51        }
52    }
53
54    pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
55        if peer == self.context.own_index {
56            error!("Attempt to subscribe to own validator {peer} is ignored!");
57            return;
58        }
59        let context = self.context.clone();
60        let network_client = self.network_client.clone();
61        let authority_service = self.authority_service.clone();
62        let (mut last_received, gc_round) = {
63            let dag_state = self.dag_state.read();
64            (
65                dag_state.get_last_block_for_authority(peer).round(),
66                dag_state.gc_round(),
67            )
68        };
69
70        // If the latest block we have accepted by an authority is older than the current gc round,
71        // then do not attempt to fetch any blocks from that point as they will simply be skipped. Instead
72        // do attempt to fetch from the gc round.
73        if last_received < gc_round {
74            info!(
75                "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
76            );
77            last_received = gc_round;
78        }
79
80        let mut subscriptions = self.subscriptions.lock();
81        self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
82        subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
83            context,
84            network_client,
85            authority_service,
86            peer,
87            last_received,
88        )));
89    }
90
91    pub(crate) fn stop(&self) {
92        let mut subscriptions = self.subscriptions.lock();
93        for (peer, _) in self.context.committee.authorities() {
94            self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
95        }
96    }
97
98    fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
99        let peer_hostname = &self.context.committee.authority(peer).hostname;
100        if let Some(subscription) = subscription.take() {
101            subscription.abort();
102        }
103        // There is a race between shutting down the subscription task and clearing the metric here.
104        // TODO: fix the race when unsubscribe_locked() gets called outside of stop().
105        self.context
106            .metrics
107            .node_metrics
108            .subscribed_to
109            .with_label_values(&[peer_hostname])
110            .set(0);
111    }
112
113    async fn subscription_loop(
114        context: Arc<Context>,
115        network_client: Arc<C>,
116        authority_service: Arc<S>,
117        peer: AuthorityIndex,
118        last_received: Round,
119    ) {
120        const IMMEDIATE_RETRIES: i64 = 3;
121        const MIN_TIMEOUT: Duration = Duration::from_millis(500);
122        // When not immediately retrying, limit retry delay between 100ms and 10s.
123        let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
124            Duration::from_millis(100),
125            Duration::from_secs(10),
126        );
127
128        let peer_hostname = &context.committee.authority(peer).hostname;
129        let mut retries: i64 = 0;
130        'subscription: loop {
131            context
132                .metrics
133                .node_metrics
134                .subscribed_to
135                .with_label_values(&[peer_hostname])
136                .set(0);
137
138            let mut delay = Duration::ZERO;
139            if retries > IMMEDIATE_RETRIES {
140                delay = backoff.next().unwrap();
141                debug!(
142                    "Delaying retry {} of peer {} subscription, in {} seconds",
143                    retries,
144                    peer_hostname,
145                    delay.as_secs_f32(),
146                );
147                sleep(delay).await;
148            } else if retries > 0 {
149                // Retry immediately, but still yield to avoid monopolizing the thread.
150                tokio::task::yield_now().await;
151            }
152            retries += 1;
153
154            // Use longer timeout when retry delay is long, to adapt to slow network.
155            let request_timeout = MIN_TIMEOUT.max(delay);
156            let mut blocks = match network_client
157                .subscribe_blocks(peer, last_received, request_timeout)
158                .await
159            {
160                Ok(blocks) => {
161                    debug!(
162                        "Subscribed to peer {} {} after {} attempts",
163                        peer, peer_hostname, retries
164                    );
165                    context
166                        .metrics
167                        .node_metrics
168                        .subscriber_connection_attempts
169                        .with_label_values(&[peer_hostname, "success"])
170                        .inc();
171                    blocks
172                }
173                Err(e) => {
174                    debug!(
175                        "Failed to subscribe to blocks from peer {} {}: {}",
176                        peer, peer_hostname, e
177                    );
178                    context
179                        .metrics
180                        .node_metrics
181                        .subscriber_connection_attempts
182                        .with_label_values(&[peer_hostname, "failure"])
183                        .inc();
184                    continue 'subscription;
185                }
186            };
187
188            // Now can consider the subscription successful
189            context
190                .metrics
191                .node_metrics
192                .subscribed_to
193                .with_label_values(&[peer_hostname])
194                .set(1);
195
196            'stream: loop {
197                match blocks.next().await {
198                    Some(block) => {
199                        context
200                            .metrics
201                            .node_metrics
202                            .subscribed_blocks
203                            .with_label_values(&[peer_hostname])
204                            .inc();
205                        let result = authority_service
206                            .handle_send_block(peer, block.clone())
207                            .await;
208                        if let Err(e) = result {
209                            match e {
210                                ConsensusError::BlockRejected { block_ref, reason } => {
211                                    debug!(
212                                        "Failed to process block from peer {} {} for block {:?}: {}",
213                                        peer, peer_hostname, block_ref, reason
214                                    );
215                                }
216                                _ => {
217                                    info!(
218                                        "Invalid block received from peer {} {}: {}",
219                                        peer, peer_hostname, e
220                                    );
221                                }
222                            }
223                        }
224                        // Reset retries when a block is received.
225                        retries = 0;
226                    }
227                    None => {
228                        debug!(
229                            "Subscription to blocks from peer {} {} ended",
230                            peer, peer_hostname
231                        );
232                        retries += 1;
233                        break 'stream;
234                    }
235                }
236            }
237        }
238    }
239}
240
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_types::block::BlockRef;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// How many blocks a single test subscription stream yields before it ends.
    const BLOCKS_PER_STREAM: usize = 10;

    /// The fixed block payload served by `SubscriberTestClient` and expected by the test.
    fn test_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client whose block subscriptions always succeed. Each stream yields
    /// `BLOCKS_PER_STREAM` identical blocks, one per millisecond, and then ends.
    /// All other RPCs are unimplemented.
    struct SubscriberTestClient;

    impl SubscriberTestClient {
        fn new() -> Self {
            Self
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            let stream_of_blocks = stream::unfold((), |()| async {
                sleep(Duration::from_millis(1)).await;
                Some((test_block(), ()))
            })
            .take(BLOCKS_PER_STREAM);
            Ok(Box::pin(stream_of_blocks))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        const TARGET_BLOCKS: usize = 100;

        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let network_client = Arc::new(SubscriberTestClient::new());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber = Subscriber::new(
            context.clone(),
            network_client,
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Check once per (virtual) second, for at most ten seconds, until enough blocks
        // have reached the authority service.
        for _ in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if authority_service.lock().handle_send_block.len() >= TARGET_BLOCKS {
                break;
            }
        }

        // Every stream ends after BLOCKS_PER_STREAM blocks, so reaching the target proves the
        // subscriber kept re-subscribing.
        let service = authority_service.lock();
        assert!(service.handle_send_block.len() >= TARGET_BLOCKS);
        let expected_block = test_block();
        for (sender, received) in service.handle_send_block.iter() {
            assert_eq!(*sender, peer);
            assert_eq!(*received, expected_block);
        }
    }
}