sui_core/authority/
backpressure.rs1use std::sync::Arc;
5
6use mysten_metrics::monitored_scope;
7use sui_types::messages_checkpoint::CheckpointSequenceNumber;
8use tokio::sync::watch;
9use tracing::{debug, info};
10
11use crate::checkpoints::CheckpointStore;
12
13#[derive(Debug, Default, Copy, Clone)]
14struct Watermarks {
15 executed: CheckpointSequenceNumber,
16 certified: CheckpointSequenceNumber,
17}
18
19impl Watermarks {
20 fn should_suppress_backpressure(&self) -> bool {
24 self.certified <= self.executed
25 }
26}
27
28pub struct BackpressureManager {
29 watermarks_sender: watch::Sender<Watermarks>,
37
38 backpressure_sender: watch::Sender<bool>,
40}
41
42pub struct BackpressureSubscriber {
43 mgr: Arc<BackpressureManager>,
44}
45
46impl BackpressureManager {
47 pub fn new_for_tests() -> Arc<Self> {
48 Self::new_from_watermarks(Default::default())
49 }
50
51 fn new_from_watermarks(watermarks: Watermarks) -> Arc<Self> {
52 let (watermarks_sender, _) = watch::channel(watermarks);
53 let (backpressure_sender, _) = watch::channel(false);
54 Arc::new(Self {
55 watermarks_sender,
56 backpressure_sender,
57 })
58 }
59
60 pub fn new_from_checkpoint_store(store: &CheckpointStore) -> Arc<Self> {
61 let executed = store
62 .get_highest_executed_checkpoint_seq_number()
63 .expect("read cannot fail")
64 .unwrap_or_default();
65 let certified = store
66 .get_highest_synced_checkpoint_seq_number()
67 .expect("read cannot fail")
68 .unwrap_or_default();
69 info!(
70 ?executed,
71 ?certified,
72 "initializing backpressure manager from checkpoint store"
73 );
74 Self::new_from_watermarks(Watermarks {
75 executed,
76 certified,
77 })
78 }
79
80 pub fn update_highest_certified_checkpoint(&self, seq: CheckpointSequenceNumber) {
81 self.watermarks_sender.send_if_modified(|watermarks| {
82 if seq > watermarks.certified {
83 watermarks.certified = seq;
84 debug!(?watermarks, "updating highest certified checkpoint");
85 true
86 } else {
87 false
88 }
89 });
90 }
91
92 pub fn update_highest_executed_checkpoint(&self, seq: CheckpointSequenceNumber) {
93 self.watermarks_sender.send_if_modified(|watermarks| {
94 if seq > watermarks.executed {
95 debug_assert_eq!(seq, watermarks.executed + 1);
96 watermarks.executed = seq;
97 debug!(?watermarks, "updating highest executed checkpoint");
98 true
99 } else {
100 false
101 }
102 });
103 }
104
105 pub fn set_backpressure(&self, backpressure: bool) -> bool {
107 self.backpressure_sender.send_if_modified(|bp| {
108 if *bp != backpressure {
109 debug!(?backpressure, "setting backpressure");
110 *bp = backpressure;
111 true
112 } else {
113 false
114 }
115 })
116 }
117
118 pub fn subscribe(self: &Arc<Self>) -> BackpressureSubscriber {
119 BackpressureSubscriber { mgr: self.clone() }
120 }
121}
122
123impl BackpressureSubscriber {
124 pub fn is_backpressure_active(&self) -> bool {
125 *self.mgr.backpressure_sender.borrow()
126 }
127
128 pub async fn await_no_backpressure(&self) {
131 let mut watermarks_rx = self.mgr.watermarks_sender.subscribe();
132 if watermarks_rx
133 .borrow_and_update()
134 .should_suppress_backpressure()
135 {
136 return;
137 }
138
139 let mut backpressure_rx = self.mgr.backpressure_sender.subscribe();
140 if !*backpressure_rx.borrow_and_update() {
141 return;
142 }
143
144 info!("waiting for backpressure to be lifted");
145 let _scope = monitored_scope("await_backpressure");
146
147 loop {
148 tokio::select! {
149 _ = backpressure_rx.changed() => {
150 let backpressure = *backpressure_rx.borrow_and_update();
151 debug!(?backpressure, "backpressure updated");
152 if !backpressure {
153 info!("backpressure lifted");
154 return;
155 }
156 }
157 _ = watermarks_rx.changed() => {
158 let watermarks = watermarks_rx.borrow_and_update();
159 debug!(?watermarks, "watermarks updated");
160 if watermarks.should_suppress_backpressure() {
161 info!("backpressure suppressed");
162 return;
163 }
164 }
165 }
166 }
167 }
168}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::FutureExt;
    use parking_lot::Mutex;
    use std::sync::Arc;
    use std::time::Duration;

    /// With the flag cleared, waiting completes immediately even though the
    /// certified watermark is ahead of the executed one.
    #[tokio::test]
    async fn test_no_backpressure() {
        // `new_for_tests` already returns an `Arc`; no extra wrapping needed.
        let manager = BackpressureManager::new_for_tests();

        manager.update_highest_certified_checkpoint(1);
        manager.set_backpressure(false);

        let subscriber = manager.subscribe();

        subscriber.await_no_backpressure().now_or_never().unwrap();
    }

    /// With both watermarks at their default (certified <= executed), waiting
    /// completes immediately even though the flag is set.
    #[tokio::test]
    async fn test_backpressure_suppressed() {
        let manager = BackpressureManager::new_for_tests();

        manager.set_backpressure(true);

        let subscriber = manager.subscribe();

        subscriber.await_no_backpressure().now_or_never().unwrap();
    }

    /// Awaits `f`, panicking if it does not complete within one second, and
    /// returns its output so callers can check it (e.g. a task's join result).
    async fn await_with_timeout<R>(f: impl std::future::Future<Output = R>) -> R {
        tokio::time::timeout(Duration::from_secs(1), f)
            .await
            .expect("future did not complete within the timeout")
    }

    /// Records the order of test events alongside the manager calls that
    /// trigger them.
    #[derive(Clone)]
    struct Log {
        log: Arc<Mutex<Vec<String>>>,
        manager: Arc<BackpressureManager>,
    }

    impl Log {
        fn new(manager: Arc<BackpressureManager>) -> Self {
            Self {
                log: Arc::new(Mutex::new(Vec::new())),
                manager,
            }
        }

        /// Logs the call, then forwards it to the manager.
        fn set_backpressure(&self, backpressure: bool) {
            self.log
                .lock()
                .push(format!("set backpressure {backpressure}"));
            self.manager.set_backpressure(backpressure);
        }

        /// Logs the call, then forwards it to the manager.
        fn update_executed(&self, executed: u64) {
            self.log.lock().push(format!("update executed {executed}"));
            self.manager.update_highest_executed_checkpoint(executed);
        }

        fn push<S: Into<String>>(&self, msg: S) {
            self.log.lock().push(msg.into());
        }

        fn get(&self) -> Vec<String> {
            self.log.lock().clone()
        }
    }

    /// A blocked waiter is released when the backpressure flag is cleared.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_clear_backpressure() {
        let manager = BackpressureManager::new_for_tests();

        // certified (1) > executed (0), so the flag is not suppressed.
        manager.update_highest_certified_checkpoint(1);
        manager.set_backpressure(true);

        let log = Log::new(manager.clone());

        let waiter = tokio::spawn({
            let subscriber = manager.subscribe();
            let log = log.clone();
            log.push("await");
            async move {
                subscriber.await_no_backpressure().await;
                log.push("await_finished");
            }
        });

        // Let the spawned task run until it is actually parked waiting for a
        // change; otherwise the state change below would be observed by its
        // fast path and the waiting logic would go untested.
        tokio::task::yield_now().await;
        assert!(!waiter.is_finished(), "waiter must block while backpressure is active");

        log.set_backpressure(false);

        await_with_timeout(waiter)
            .await
            .expect("waiter task panicked");

        assert_eq!(
            log.get(),
            ["await", "set backpressure false", "await_finished"]
        );
    }

    /// A blocked waiter is released when execution catches up with the
    /// certified watermark, even though the flag stays set.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_backpressure_becomes_suppressed() {
        let manager = BackpressureManager::new_for_tests();

        // certified (1) > executed (0), so the flag is not suppressed.
        manager.update_highest_certified_checkpoint(1);
        manager.set_backpressure(true);

        let log = Log::new(manager.clone());

        let waiter = tokio::spawn({
            let subscriber = manager.subscribe();
            let log = log.clone();
            log.push("await");
            async move {
                subscriber.await_no_backpressure().await;
                log.push("await_finished");
            }
        });

        // Make sure the waiter is parked before the watermark moves.
        tokio::task::yield_now().await;
        assert!(!waiter.is_finished(), "waiter must block while backpressure is active");

        log.update_executed(1);

        await_with_timeout(waiter)
            .await
            .expect("waiter task panicked");

        assert_eq!(
            log.get(),
            ["await", "update executed 1", "await_finished"]
        );
    }
}