sui_core/authority/
backpressure.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use mysten_metrics::monitored_scope;
7use sui_types::messages_checkpoint::CheckpointSequenceNumber;
8use tokio::sync::watch;
9use tracing::{debug, info};
10
11use crate::checkpoints::CheckpointStore;
12
13#[derive(Debug, Default, Copy, Clone)]
14struct Watermarks {
15    executed: CheckpointSequenceNumber,
16    certified: CheckpointSequenceNumber,
17}
18
19impl Watermarks {
20    // we can only permit backpressure if the certified checkpoint is ahead of the executed
21    // checkpoint. Otherwise, backpressure might prevent construction of the next checkpoint,
22    // because it could stop consensus commits from being processed.
23    fn should_suppress_backpressure(&self) -> bool {
24        self.certified <= self.executed
25    }
26}
27
28pub struct BackpressureManager {
29    // Holds the executed and certified checkpoint watermarks.
30    // Because we never execute an uncertified checkpoint, the executed watermark is always
31    // less than or equal to the certified watermark.
32    //
33    // If the watermarks are equal, we must not apply backpressure to consensus handler,
34    // because we could be waiting on the next consensus commit in order to build and eventually
35    // certify the next checkpoint.
36    watermarks_sender: watch::Sender<Watermarks>,
37
38    // used by the WritebackCache to notify us when it has too many pending transactions in memory.
39    backpressure_sender: watch::Sender<bool>,
40}
41
42pub struct BackpressureSubscriber {
43    mgr: Arc<BackpressureManager>,
44}
45
46impl BackpressureManager {
47    pub fn new_for_tests() -> Arc<Self> {
48        Self::new_from_watermarks(Default::default())
49    }
50
51    fn new_from_watermarks(watermarks: Watermarks) -> Arc<Self> {
52        let (watermarks_sender, _) = watch::channel(watermarks);
53        let (backpressure_sender, _) = watch::channel(false);
54        Arc::new(Self {
55            watermarks_sender,
56            backpressure_sender,
57        })
58    }
59
60    pub fn new_from_checkpoint_store(store: &CheckpointStore) -> Arc<Self> {
61        let executed = store
62            .get_highest_executed_checkpoint_seq_number()
63            .expect("read cannot fail")
64            .unwrap_or_default();
65        let certified = store
66            .get_highest_synced_checkpoint_seq_number()
67            .expect("read cannot fail")
68            .unwrap_or_default();
69        info!(
70            ?executed,
71            ?certified,
72            "initializing backpressure manager from checkpoint store"
73        );
74        Self::new_from_watermarks(Watermarks {
75            executed,
76            certified,
77        })
78    }
79
80    pub fn update_highest_certified_checkpoint(&self, seq: CheckpointSequenceNumber) {
81        self.watermarks_sender.send_if_modified(|watermarks| {
82            if seq > watermarks.certified {
83                watermarks.certified = seq;
84                debug!(?watermarks, "updating highest certified checkpoint");
85                true
86            } else {
87                false
88            }
89        });
90    }
91
92    pub fn update_highest_executed_checkpoint(&self, seq: CheckpointSequenceNumber) {
93        self.watermarks_sender.send_if_modified(|watermarks| {
94            if seq > watermarks.executed {
95                debug_assert_eq!(seq, watermarks.executed + 1);
96                watermarks.executed = seq;
97                debug!(?watermarks, "updating highest executed checkpoint");
98                true
99            } else {
100                false
101            }
102        });
103    }
104
105    // Returns true if the backpressure state was changed.
106    pub fn set_backpressure(&self, backpressure: bool) -> bool {
107        self.backpressure_sender.send_if_modified(|bp| {
108            if *bp != backpressure {
109                debug!(?backpressure, "setting backpressure");
110                *bp = backpressure;
111                true
112            } else {
113                false
114            }
115        })
116    }
117
118    pub fn subscribe(self: &Arc<Self>) -> BackpressureSubscriber {
119        BackpressureSubscriber { mgr: self.clone() }
120    }
121}
122
123impl BackpressureSubscriber {
124    pub fn is_backpressure_active(&self) -> bool {
125        *self.mgr.backpressure_sender.borrow()
126    }
127
128    /// If there is no backpressure returns immediately.
129    /// Otherwise, wait until backpressure is lifted or suppressed.
130    pub async fn await_no_backpressure(&self) {
131        let mut watermarks_rx = self.mgr.watermarks_sender.subscribe();
132        if watermarks_rx
133            .borrow_and_update()
134            .should_suppress_backpressure()
135        {
136            return;
137        }
138
139        let mut backpressure_rx = self.mgr.backpressure_sender.subscribe();
140        if !*backpressure_rx.borrow_and_update() {
141            return;
142        }
143
144        info!("waiting for backpressure to be lifted");
145        let _scope = monitored_scope("await_backpressure");
146
147        loop {
148            tokio::select! {
149                _ = backpressure_rx.changed() => {
150                    let backpressure = *backpressure_rx.borrow_and_update();
151                    debug!(?backpressure, "backpressure updated");
152                    if !backpressure {
153                        info!("backpressure lifted");
154                        return;
155                    }
156                }
157                _ = watermarks_rx.changed() => {
158                    let watermarks = watermarks_rx.borrow_and_update();
159                    debug!(?watermarks, "watermarks updated");
160                    if watermarks.should_suppress_backpressure() {
161                        info!("backpressure suppressed");
162                        return;
163                    }
164                }
165            }
166        }
167    }
168}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::FutureExt;
    use parking_lot::Mutex;
    use std::sync::Arc;
    use std::time::Duration;

    /// With the backpressure flag off, waiting completes on the first poll.
    #[tokio::test]
    async fn test_no_backpressure() {
        // `new_for_tests` already returns an `Arc`; wrapping it again would
        // only produce a redundant `Arc<Arc<_>>`.
        let manager = BackpressureManager::new_for_tests();

        manager.update_highest_certified_checkpoint(1);
        manager.set_backpressure(false);

        let subscriber = manager.subscribe();

        subscriber.await_no_backpressure().now_or_never().unwrap();
    }

    /// With equal watermarks, backpressure is suppressed even though the flag is set.
    #[tokio::test]
    async fn test_backpressure_suppressed() {
        let manager = BackpressureManager::new_for_tests();

        // watermarks start at 0, 0
        manager.set_backpressure(true);

        let subscriber = manager.subscribe();

        // backpressure should be suppressed because of watermarks.
        subscriber.await_no_backpressure().now_or_never().unwrap();
    }

    /// Awaits `f`, panicking if it does not complete within one second.
    async fn await_with_timeout<R>(f: impl std::future::Future<Output = R>) {
        tokio::time::timeout(Duration::from_secs(1), f)
            .await
            .unwrap();
    }

    /// Records an ordered list of events alongside the manager calls that
    /// caused them, so tests can assert on the relative ordering.
    #[derive(Clone)]
    struct Log {
        log: Arc<Mutex<Vec<String>>>,
        manager: Arc<BackpressureManager>,
    }

    impl Log {
        /// Creates an empty log bound to `manager`.
        fn new(manager: Arc<BackpressureManager>) -> Self {
            Self {
                log: Arc::new(Mutex::new(Vec::new())),
                manager,
            }
        }

        /// Logs the call, then forwards it to the manager.
        fn set_backpressure(&self, backpressure: bool) {
            self.log
                .lock()
                .push(format!("set backpressure {}", backpressure));
            self.manager.set_backpressure(backpressure);
        }

        /// Logs the call, then forwards it to the manager.
        fn update_executed(&self, executed: u64) {
            self.log
                .lock()
                .push(format!("update executed {}", executed));
            self.manager.update_highest_executed_checkpoint(executed);
        }

        /// Appends an arbitrary message to the log.
        fn push<S: Into<String>>(&self, msg: S) {
            self.log.lock().push(msg.into());
        }

        /// Returns a copy of all messages logged so far.
        fn get(&self) -> Vec<String> {
            self.log.lock().clone()
        }
    }

    /// A waiter blocked on backpressure is released once the flag is cleared.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_clear_backpressure() {
        let manager = BackpressureManager::new_for_tests();

        // backpressure is in effect, and not suppressed by watermarks.
        manager.update_highest_certified_checkpoint(1);
        manager.set_backpressure(true);

        let log = Log::new(manager.clone());

        let waiter = tokio::spawn({
            let subscriber = manager.subscribe();
            let log = log.clone();
            // Logged synchronously, before the spawned task first runs.
            log.push("await");
            async move {
                subscriber.await_no_backpressure().await;
                log.push("await_finished");
            }
        });

        // clear the backpressure
        log.set_backpressure(false);

        await_with_timeout(waiter).await;

        assert_eq!(
            log.get(),
            vec![
                "await".to_string(),
                "set backpressure false".to_string(),
                "await_finished".to_string(),
            ]
        );
    }

    /// A waiter blocked on backpressure is released once execution catches up
    /// with the certified checkpoint.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_backpressure_becomes_suppressed() {
        let manager = BackpressureManager::new_for_tests();

        // backpressure is in effect, and not suppressed by watermarks.
        manager.update_highest_certified_checkpoint(1);
        manager.set_backpressure(true);

        let log = Log::new(manager.clone());

        let waiter = tokio::spawn({
            let subscriber = manager.subscribe();
            let log = log.clone();
            // Logged synchronously, before the spawned task first runs.
            log.push("await");
            async move {
                subscriber.await_no_backpressure().await;
                log.push("await_finished");
            }
        });

        // once executed checkpoint catches up to certified checkpoint,
        // backpressure should be suppressed.
        log.update_executed(1);

        await_with_timeout(waiter).await;

        assert_eq!(
            log.get(),
            vec![
                "await".to_string(),
                "update executed 1".to_string(),
                "await_finished".to_string(),
            ]
        );
    }
}
313        );
314    }
315}