sui_rpc_api/grpc/deadline.rs
1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::Stream;
5use futures::StreamExt;
6use futures::stream::BoxStream;
7use tokio::time::Instant;
8
9/// Wrap a server-streaming response with a wall-clock deadline.
10///
11/// Guarantee: when the deadline fires, the inner stream is dropped and
12/// its resources (Bigtable permits, in-flight RPCs, render buffers,
13/// blocking scan workers) are released in real time — even if the gRPC
14/// consumer has stopped pulling frames. The `DeadlineExceeded` Status
15/// itself is delivered on the next poll from tonic, which may be later if
16/// the h2 send window is closed.
17///
18/// The naive design — race deadline and `inner.next()` in a single
19/// `select!` inside the wrapper — fails when tonic's task is parked at
20/// its h2-write await: timer wakes hit the task but resume at the wrong
21/// await point, and the wrapper's select never runs. Spawning gives the
22/// drain loop its own task whose only outer await is `timeout_at(...)`,
23/// so deadline wakes always land where they can cancel.
24///
25/// The mpsc(1) channel is just the bridge between two polling roots
26/// (tonic ↔ our spawn). Capacity 1 = tightest backpressure; per-item
27/// wake overhead is negligible against IO/render cost.
28///
29/// Shared by both the fullnode (`sui-rpc-api`) and bigtable (`sui-kv-rpc`)
30/// ledger-history streaming services so they enforce deadlines identically.
31pub fn with_deadline<S, T>(
32 stream: S,
33 deadline: Instant,
34 operation: &'static str,
35) -> BoxStream<'static, Result<T, tonic::Status>>
36where
37 S: Stream<Item = Result<T, tonic::Status>> + Send + 'static,
38 T: Send + 'static,
39{
40 let (tx, mut rx) = tokio::sync::mpsc::channel::<Result<T, tonic::Status>>(1);
41
42 // Spawn the drain loop. `timeout_at` is the outermost await of the
43 // task, so a deadline wake always lands inside it and observes
44 // Ready, regardless of which inner await (next / send) is suspended.
45 let producer = tokio::spawn(async move {
46 let _ = tokio::time::timeout_at(deadline, async move {
47 futures::pin_mut!(stream);
48 while let Some(item) = stream.next().await {
49 // Consumer dropped → channel closed → stop work.
50 if tx.send(item).await.is_err() {
51 return;
52 }
53 }
54 })
55 .await;
56 });
57
58 // Synchronous abort if the wrapper is dropped before its body runs
59 // (or while suspended inside `inner.next()` rather than `send`).
60 struct AbortOnDrop(tokio::task::AbortHandle);
61 impl Drop for AbortOnDrop {
62 fn drop(&mut self) {
63 self.0.abort();
64 }
65 }
66 let abort_guard = AbortOnDrop(producer.abort_handle());
67
68 // Lift the select! result into an enum so the macro's elaboration
69 // can infer the try_stream's error type from the match arms.
70 enum Step<T> {
71 Item(Option<Result<T, tonic::Status>>),
72 Deadline,
73 }
74
75 async_stream::try_stream! {
76 // Move the guard into the generator so dropping the unpolled
77 // stream still drops it (and thus aborts the producer).
78 let _abort_on_drop = abort_guard;
79 // Held until the consumer drains the channel — then awaited to
80 // surface any panic from the producer task as an Internal error.
81 let mut producer = producer;
82 let sleep = tokio::time::sleep_until(deadline);
83 futures::pin_mut!(sleep);
84 loop {
85 // `biased`: past-deadline polls emit DeadlineExceeded
86 // promptly without waiting on a buffered item.
87 let step = tokio::select! {
88 biased;
89 _ = &mut sleep => Step::Deadline,
90 item = rx.recv() => Step::Item(item),
91 };
92 match step {
93 Step::Item(Some(Ok(it))) => yield it,
94 Step::Item(Some(Err(e))) => Err(e)?,
95 Step::Item(None) => {
96 // Producer closed the channel — either natural EOF or
97 // a panic that aborted the task before EOF. Distinguish
98 // by awaiting the JoinHandle: a panic surfaces as an
99 // Internal error so the consumer doesn't see truncated
100 // success. The panic message itself is logged by the
101 // global `telemetry-subscribers` panic hook (the boxed
102 // payload here is an opaque Rust-internal type that
103 // can't be cheaply downcast to a string), so the wire
104 // status carries only a generic marker.
105 //
106 // TODO: once these services add a CatchPanicLayer to
107 // their Tower stack (sister services already do), this
108 // translation can move there and we can just
109 // `resume_unwind` here.
110 match (&mut producer).await {
111 Ok(()) => break,
112 Err(e) if e.is_panic() => {
113 tracing::error!(operation, "producer task panicked");
114 Err(tonic::Status::internal(format!(
115 "{operation} request panicked"
116 )))?;
117 }
118 Err(_) => {
119 // Cancellation — only reachable at runtime shutdown
120 // (our abort guard fires on Drop, when we wouldn't be
121 // polling), so EOF is harmless. break;
122 }
123 }
124 }
125 Step::Deadline => {
126 tracing::warn!(operation, "request deadline exceeded");
127 Err(tonic::Status::deadline_exceeded(format!(
128 "{operation} request deadline exceeded"
129 )))?;
130 }
131 }
132 }
133 }
134 .boxed()
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::AtomicU64;
    use std::sync::atomic::Ordering as AtomicOrdering;
    use std::time::Duration;

    /// Sets its flag when dropped. Carried as an inner stream's state, it
    /// reports whether the producer task has dropped that stream.
    struct DropBeacon(Arc<AtomicBool>);

    impl Drop for DropBeacon {
        fn drop(&mut self) {
            self.0.store(true, AtomicOrdering::SeqCst);
        }
    }

    /// `start_paused = true` makes `tokio::time` virtual: sleeps and
    /// `Instant::now()` advance only when the runtime explicitly waits, so
    /// the test runs instantly and deterministically.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_emits_deadline_exceeded_when_inner_hangs() {
        let inner: BoxStream<'static, Result<u64, tonic::Status>> = stream::pending().boxed();
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut bounded = with_deadline(inner, deadline, "test");

        let item = bounded.next().await;
        let status = item.expect("got an item").expect_err("got a status error");
        assert_eq!(status.code(), tonic::Code::DeadlineExceeded);
        assert!(bounded.next().await.is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn with_deadline_passes_items_through_until_deadline() {
        let inner = stream::iter([Ok::<_, tonic::Status>(1), Ok(2)]).boxed();
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut bounded = with_deadline(inner, deadline, "test");

        assert_eq!(bounded.next().await.unwrap().unwrap(), 1);
        assert_eq!(bounded.next().await.unwrap().unwrap(), 2);
        assert!(bounded.next().await.is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn with_deadline_propagates_inner_error_before_deadline() {
        let inner = stream::iter([Err::<u64, _>(tonic::Status::unavailable("nope"))]).boxed();
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut bounded = with_deadline(inner, deadline, "test");

        let status = bounded.next().await.unwrap().unwrap_err();
        assert_eq!(status.code(), tonic::Code::Unavailable);
    }

    /// Proves the spawned producer drops the inner stream at deadline-time
    /// even when the consumer never polls. The inner stream is always ready,
    /// so once the 1-slot channel is full the producer parks in `send`. A
    /// yield counter alone can't tell "parked" from "dropped", so the
    /// stream's state carries a `DropBeacon` that must fire while the
    /// consumer is idle. Defends the contract that `with_deadline` cancels
    /// in-flight work in wall-clock time regardless of consumer pace.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_drops_inner_when_consumer_is_slow_past_deadline() {
        let count = Arc::new(AtomicU64::new(0));
        let dropped = Arc::new(AtomicBool::new(false));
        let inner: BoxStream<'static, Result<u64, tonic::Status>> = {
            let count = count.clone();
            let beacon = DropBeacon(dropped.clone());
            stream::unfold(beacon, move |beacon| {
                let count = count.clone();
                async move {
                    count.fetch_add(1, AtomicOrdering::SeqCst);
                    Some((Ok::<u64, tonic::Status>(1), beacon))
                }
            })
            .boxed()
        };
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut bounded = with_deadline(inner, deadline, "test");

        // Drain one item; producer is now blocked on a full channel
        // somewhere past it, still holding the inner stream.
        assert_eq!(bounded.next().await.unwrap().unwrap(), 1);
        assert!(
            !dropped.load(AtomicOrdering::SeqCst),
            "inner stream was dropped before the deadline",
        );

        // Stop polling. Advance virtual time past the deadline. The
        // producer-side `timeout_at` must drop the inner stream here.
        tokio::time::sleep(Duration::from_secs(10)).await;
        assert!(
            dropped.load(AtomicOrdering::SeqCst),
            "inner stream was not dropped at the deadline while the consumer was idle",
        );
        let snapshot = count.load(AtomicOrdering::SeqCst);

        // Past-deadline: no further inner-stream yields should be observed.
        tokio::time::sleep(Duration::from_secs(10)).await;
        assert_eq!(
            count.load(AtomicOrdering::SeqCst),
            snapshot,
            "inner stream kept producing past the deadline",
        );

        // And the consumer sees `DeadlineExceeded` on its next poll.
        let status = bounded.next().await.unwrap().unwrap_err();
        assert_eq!(status.code(), tonic::Code::DeadlineExceeded);
    }

    /// A panic inside the producer task must surface as `Internal`
    /// instead of silently closing the channel (which the consumer can't
    /// distinguish from a clean EOF). Without this translation, a
    /// truncated response looks like a successful one to the client.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_translates_producer_panic_to_internal() {
        let inner: BoxStream<'static, Result<u64, tonic::Status>> =
            stream::unfold(0u64, |i| async move {
                if i == 1 {
                    panic!("boom from inner stream");
                }
                Some((Ok::<u64, tonic::Status>(i), i + 1))
            })
            .boxed();
        let deadline = Instant::now() + Duration::from_secs(60);
        let mut bounded = with_deadline(inner, deadline, "test");

        // First item flows through normally.
        assert_eq!(bounded.next().await.unwrap().unwrap(), 0);
        // Next pull observes the producer panic via the JoinHandle.
        let status = bounded.next().await.unwrap().unwrap_err();
        assert_eq!(status.code(), tonic::Code::Internal);
    }

    /// Dropping the wrapper before any item is drained must abort the
    /// spawned producer and drop the inner stream — otherwise a client
    /// that hangs up early would leak the in-flight pipeline until its
    /// own deadline.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_aborts_producer_when_consumer_drops() {
        let dropped = Arc::new(AtomicBool::new(false));
        let beacon = DropBeacon(dropped.clone());
        // Stream that never yields but owns the beacon for its lifetime —
        // beacon's Drop fires iff the producer task drops the inner stream.
        let inner: BoxStream<'static, Result<u64, tonic::Status>> =
            stream::unfold(beacon, |state| async move {
                std::future::pending::<()>().await;
                Some((Ok::<u64, tonic::Status>(1), state))
            })
            .boxed();
        let deadline = Instant::now() + Duration::from_secs(60);

        let bounded = with_deadline(inner, deadline, "test");
        drop(bounded);

        // Give the runtime cycles to deliver the abort and drop the
        // producer task's future.
        for _ in 0..10 {
            tokio::task::yield_now().await;
            if dropped.load(AtomicOrdering::SeqCst) {
                break;
            }
        }
        assert!(
            dropped.load(AtomicOrdering::SeqCst),
            "inner stream was not dropped after consumer dropped the wrapper",
        );
    }
}