consensus_core/
subscriber.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{sync::Arc, time::Duration};
5
6use consensus_config::AuthorityIndex;
7use consensus_types::block::Round;
8use futures::StreamExt;
9use mysten_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{task::JoinHandle, time::sleep};
12use tracing::{debug, error, info};
13
14use crate::{
15    block::BlockAPI as _,
16    context::Context,
17    dag_state::DagState,
18    error::ConsensusError,
19    network::{ValidatorNetworkClient, ValidatorNetworkService},
20};
21
22/// Subscriber manages the block stream subscriptions to other peers, taking care of retrying
23/// when subscription streams break. Blocks returned from the peer are sent to the authority
24/// service for processing.
25/// Currently subscription management for individual peer is not exposed, but it could become
26/// useful in future.
27pub(crate) struct Subscriber<C: ValidatorNetworkClient, S: ValidatorNetworkService> {
28    context: Arc<Context>,
29    network_client: Arc<C>,
30    authority_service: Arc<S>,
31    dag_state: Arc<RwLock<DagState>>,
32    subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
33}
34
35impl<C: ValidatorNetworkClient, S: ValidatorNetworkService> Subscriber<C, S> {
36    pub(crate) fn new(
37        context: Arc<Context>,
38        network_client: Arc<C>,
39        authority_service: Arc<S>,
40        dag_state: Arc<RwLock<DagState>>,
41    ) -> Self {
42        let subscriptions = (0..context.committee.size())
43            .map(|_| None)
44            .collect::<Vec<_>>();
45        Self {
46            context,
47            network_client,
48            authority_service,
49            dag_state,
50            subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
51        }
52    }
53
54    pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
55        if peer == self.context.own_index {
56            error!("Attempt to subscribe to own validator {peer} is ignored!");
57            return;
58        }
59        let context = self.context.clone();
60        let network_client = self.network_client.clone();
61        let authority_service = self.authority_service.clone();
62        let (mut last_received, gc_round) = {
63            let dag_state = self.dag_state.read();
64            (
65                dag_state.get_last_block_for_authority(peer).round(),
66                dag_state.gc_round(),
67            )
68        };
69
70        // If the latest block we have accepted by an authority is older than the current gc round,
71        // then do not attempt to fetch any blocks from that point as they will simply be skipped. Instead
72        // do attempt to fetch from the gc round.
73        if last_received < gc_round {
74            info!(
75                "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
76            );
77            last_received = gc_round;
78        }
79
80        let mut subscriptions = self.subscriptions.lock();
81        self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
82        subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
83            context,
84            network_client,
85            authority_service,
86            peer,
87            last_received,
88        )));
89    }
90
91    pub(crate) fn stop(&self) {
92        let mut subscriptions = self.subscriptions.lock();
93        for (peer, _) in self.context.committee.authorities() {
94            self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
95        }
96    }
97
98    fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
99        let peer_hostname = &self.context.committee.authority(peer).hostname;
100        if let Some(subscription) = subscription.take() {
101            subscription.abort();
102        }
103        // There is a race between shutting down the subscription task and clearing the metric here.
104        // TODO: fix the race when unsubscribe_locked() gets called outside of stop().
105        self.context
106            .metrics
107            .node_metrics
108            .subscribed_to
109            .with_label_values(&[peer_hostname])
110            .set(0);
111    }
112
113    async fn subscription_loop(
114        context: Arc<Context>,
115        network_client: Arc<C>,
116        authority_service: Arc<S>,
117        peer: AuthorityIndex,
118        last_received: Round,
119    ) {
120        const IMMEDIATE_RETRIES: i64 = 3;
121        const MIN_TIMEOUT: Duration = Duration::from_millis(500);
122        // When not immediately retrying, limit retry delay between 100ms and 10s.
123        let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
124            Duration::from_millis(100),
125            Duration::from_secs(10),
126        );
127
128        let peer_hostname = &context.committee.authority(peer).hostname;
129        let mut retries: i64 = 0;
130        'subscription: loop {
131            context
132                .metrics
133                .node_metrics
134                .subscribed_to
135                .with_label_values(&[peer_hostname])
136                .set(0);
137
138            let mut delay = Duration::ZERO;
139            if retries > IMMEDIATE_RETRIES {
140                delay = backoff.next().unwrap();
141                debug!(
142                    "Delaying retry {} of peer {} subscription, in {} seconds",
143                    retries,
144                    peer_hostname,
145                    delay.as_secs_f32(),
146                );
147                sleep(delay).await;
148            } else if retries > 0 {
149                // Retry immediately, but still yield to avoid monopolizing the thread.
150                tokio::task::yield_now().await;
151            }
152            retries += 1;
153
154            // Use longer timeout when retry delay is long, to adapt to slow network.
155            let request_timeout = MIN_TIMEOUT.max(delay);
156            let mut blocks = match network_client
157                .subscribe_blocks(peer, last_received, request_timeout)
158                .await
159            {
160                Ok(blocks) => {
161                    debug!(
162                        "Subscribed to peer {} {} after {} attempts",
163                        peer, peer_hostname, retries
164                    );
165                    context
166                        .metrics
167                        .node_metrics
168                        .subscriber_connection_attempts
169                        .with_label_values(&[peer_hostname.as_str(), "success"])
170                        .inc();
171                    blocks
172                }
173                Err(e) => {
174                    debug!(
175                        "Failed to subscribe to blocks from peer {} {}: {}",
176                        peer, peer_hostname, e
177                    );
178                    context
179                        .metrics
180                        .node_metrics
181                        .subscriber_connection_attempts
182                        .with_label_values(&[peer_hostname.as_str(), "failure"])
183                        .inc();
184                    continue 'subscription;
185                }
186            };
187
188            // Now can consider the subscription successful
189            context
190                .metrics
191                .node_metrics
192                .subscribed_to
193                .with_label_values(&[peer_hostname])
194                .set(1);
195
196            'stream: loop {
197                match blocks.next().await {
198                    Some(block) => {
199                        context
200                            .metrics
201                            .node_metrics
202                            .subscribed_blocks
203                            .with_label_values(&[peer_hostname])
204                            .inc();
205                        let result = authority_service.handle_send_block(peer, block).await;
206                        if let Err(e) = result {
207                            match e {
208                                ConsensusError::BlockRejected { block_ref, reason } => {
209                                    debug!(
210                                        "Failed to process block from peer {} {} for block {:?}: {}",
211                                        peer, peer_hostname, block_ref, reason
212                                    );
213                                }
214                                _ => {
215                                    info!(
216                                        "Invalid block received from peer {} {}: {}",
217                                        peer, peer_hostname, e
218                                    );
219                                }
220                            }
221                        }
222                        // Reset retries when a block is received.
223                        retries = 0;
224                    }
225                    None => {
226                        debug!(
227                            "Subscription to blocks from peer {} {} ended",
228                            peer, peer_hostname
229                        );
230                        retries += 1;
231                        break 'stream;
232                    }
233                }
234            }
235        }
236    }
237}
238
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use consensus_types::block::BlockRef;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// Number of blocks every fake subscription stream yields before it ends.
    const BLOCKS_PER_STREAM: usize = 10;
    /// Number of delivered blocks the test waits for; more than a single stream provides.
    const TARGET_BLOCKS: usize = 100;

    /// The fixed block served by every fake stream: eight `1` bytes, no excluded ancestors.
    fn canned_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client whose only implemented call is `subscribe_blocks`; every other call
    /// panics with `unimplemented!`.
    struct SubscriberTestClient {}

    impl SubscriberTestClient {
        fn new() -> Self {
            Self {}
        }
    }

    #[async_trait]
    impl ValidatorNetworkClient for SubscriberTestClient {
        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Always succeeds, returning a short stream that emits the canned block once per
        /// millisecond and then ends.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            let ticking_blocks = stream::unfold((), |_| async {
                sleep(Duration::from_millis(1)).await;
                Some((canned_block(), ()))
            });
            Ok(Box::pin(ticking_blocks.take(BLOCKS_PER_STREAM)))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _breadth_first: bool,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// Checks that the subscriber re-subscribes after each short stream ends, so that the
    /// authority service ends up with far more blocks than one stream can deliver.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let network_client = Arc::new(SubscriberTestClient::new());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber = Subscriber::new(
            context.clone(),
            network_client,
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Wait for enough blocks received, checking once per (paused-clock) second.
        for _ in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if authority_service.lock().handle_send_block.len() >= TARGET_BLOCKS {
                break;
            }
        }

        // Even if the stream ends after 10 blocks, the subscriber should retry and get enough
        // blocks eventually.
        let expected = canned_block();
        let received = authority_service.lock();
        assert!(received.handle_send_block.len() >= TARGET_BLOCKS);
        for (sender, block) in received.handle_send_block.iter() {
            assert_eq!(*sender, peer);
            assert_eq!(*block, expected);
        }
    }
}