sui_rpc_api/grpc/
deadline.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::Stream;
5use futures::StreamExt;
6use futures::stream::BoxStream;
7use tokio::time::Instant;
8
9/// Wrap a server-streaming response with a wall-clock deadline.
10///
11/// Guarantee: when the deadline fires, the inner stream is dropped and
12/// its resources (Bigtable permits, in-flight RPCs, render buffers,
13/// blocking scan workers) are released in real time — even if the gRPC
14/// consumer has stopped pulling frames. The `DeadlineExceeded` Status
15/// itself is delivered on the next poll from tonic, which may be later if
16/// the h2 send window is closed.
17///
18/// The naive design — race deadline and `inner.next()` in a single
19/// `select!` inside the wrapper — fails when tonic's task is parked at
20/// its h2-write await: timer wakes hit the task but resume at the wrong
21/// await point, and the wrapper's select never runs. Spawning gives the
22/// drain loop its own task whose only outer await is `timeout_at(...)`,
23/// so deadline wakes always land where they can cancel.
24///
25/// The mpsc(1) channel is just the bridge between two polling roots
26/// (tonic ↔ our spawn). Capacity 1 = tightest backpressure; per-item
27/// wake overhead is negligible against IO/render cost.
28///
29/// Shared by both the fullnode (`sui-rpc-api`) and bigtable (`sui-kv-rpc`)
30/// ledger-history streaming services so they enforce deadlines identically.
31pub fn with_deadline<S, T>(
32    stream: S,
33    deadline: Instant,
34    operation: &'static str,
35) -> BoxStream<'static, Result<T, tonic::Status>>
36where
37    S: Stream<Item = Result<T, tonic::Status>> + Send + 'static,
38    T: Send + 'static,
39{
40    let (tx, mut rx) = tokio::sync::mpsc::channel::<Result<T, tonic::Status>>(1);
41
42    // Spawn the drain loop. `timeout_at` is the outermost await of the
43    // task, so a deadline wake always lands inside it and observes
44    // Ready, regardless of which inner await (next / send) is suspended.
45    let producer = tokio::spawn(async move {
46        let _ = tokio::time::timeout_at(deadline, async move {
47            futures::pin_mut!(stream);
48            while let Some(item) = stream.next().await {
49                // Consumer dropped → channel closed → stop work.
50                if tx.send(item).await.is_err() {
51                    return;
52                }
53            }
54        })
55        .await;
56    });
57
58    // Synchronous abort if the wrapper is dropped before its body runs
59    // (or while suspended inside `inner.next()` rather than `send`).
60    struct AbortOnDrop(tokio::task::AbortHandle);
61    impl Drop for AbortOnDrop {
62        fn drop(&mut self) {
63            self.0.abort();
64        }
65    }
66    let abort_guard = AbortOnDrop(producer.abort_handle());
67
68    // Lift the select! result into an enum so the macro's elaboration
69    // can infer the try_stream's error type from the match arms.
70    enum Step<T> {
71        Item(Option<Result<T, tonic::Status>>),
72        Deadline,
73    }
74
75    async_stream::try_stream! {
76        // Move the guard into the generator so dropping the unpolled
77        // stream still drops it (and thus aborts the producer).
78        let _abort_on_drop = abort_guard;
79        // Held until the consumer drains the channel — then awaited to
80        // surface any panic from the producer task as an Internal error.
81        let mut producer = producer;
82        let sleep = tokio::time::sleep_until(deadline);
83        futures::pin_mut!(sleep);
84        loop {
85            // `biased`: past-deadline polls emit DeadlineExceeded
86            // promptly without waiting on a buffered item.
87            let step = tokio::select! {
88                biased;
89                _ = &mut sleep => Step::Deadline,
90                item = rx.recv() => Step::Item(item),
91            };
92            match step {
93                Step::Item(Some(Ok(it))) => yield it,
94                Step::Item(Some(Err(e))) => Err(e)?,
95                Step::Item(None) => {
96                    // Producer closed the channel — either natural EOF or
97                    // a panic that aborted the task before EOF. Distinguish
98                    // by awaiting the JoinHandle: a panic surfaces as an
99                    // Internal error so the consumer doesn't see truncated
100                    // success. The panic message itself is logged by the
101                    // global `telemetry-subscribers` panic hook (the boxed
102                    // payload here is an opaque Rust-internal type that
103                    // can't be cheaply downcast to a string), so the wire
104                    // status carries only a generic marker.
105                    //
106                    // TODO: once these services add a CatchPanicLayer to
107                    // their Tower stack (sister services already do), this
108                    // translation can move there and we can just
109                    // `resume_unwind` here.
110                    match (&mut producer).await {
111                        Ok(()) => break,
112                        Err(e) if e.is_panic() => {
113                            tracing::error!(operation, "producer task panicked");
114                            Err(tonic::Status::internal(format!(
115                                "{operation} request panicked"
116                            )))?;
117                        }
118                        Err(_) => {
119                            // Cancellation — only reachable at runtime shutdown
120                            // (our abort guard fires on Drop, when we wouldn't be
121                            // polling), so EOF is harmless.                             break;
122                        }
123                    }
124                }
125                Step::Deadline => {
126                    tracing::warn!(operation, "request deadline exceeded");
127                    Err(tonic::Status::deadline_exceeded(format!(
128                        "{operation} request deadline exceeded"
129                    )))?;
130                }
131            }
132        }
133    }
134    .boxed()
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
    use std::time::Duration;

    /// Item stream type shared by the tests that need an explicit annotation.
    type U64Stream = BoxStream<'static, Result<u64, tonic::Status>>;

    /// Deadline `secs` seconds after the current (virtual) instant.
    fn secs_from_now(secs: u64) -> Instant {
        Instant::now() + Duration::from_secs(secs)
    }

    /// With `start_paused = true`, `tokio::time` is virtual: the clock only
    /// moves when the runtime has nothing else to do but wait, so these
    /// tests finish immediately and deterministically.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_emits_deadline_exceeded_when_inner_hangs() {
        let never: U64Stream = stream::pending().boxed();
        let mut wrapped = with_deadline(never, secs_from_now(5), "test");

        let first = wrapped.next().await;
        let status = first.expect("got an item").expect_err("got a status error");
        assert_eq!(status.code(), tonic::Code::DeadlineExceeded);

        // The stream is finished once the deadline error has been emitted.
        let after_error = wrapped.next().await;
        assert!(after_error.is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn with_deadline_passes_items_through_until_deadline() {
        let source = stream::iter([Ok::<_, tonic::Status>(1), Ok(2)]).boxed();
        let mut wrapped = with_deadline(source, secs_from_now(5), "test");

        for expected in [1, 2] {
            let got = wrapped.next().await.unwrap().unwrap();
            assert_eq!(got, expected);
        }
        assert!(wrapped.next().await.is_none());
    }

    #[tokio::test(start_paused = true)]
    async fn with_deadline_propagates_inner_error_before_deadline() {
        let failing = stream::iter([Err::<u64, _>(tonic::Status::unavailable("nope"))]).boxed();
        let mut wrapped = with_deadline(failing, secs_from_now(5), "test");

        let outcome = wrapped.next().await.unwrap();
        let status = outcome.unwrap_err();
        assert_eq!(status.code(), tonic::Code::Unavailable);
    }

    /// Tracks how many times the inner stream yields, to show that the
    /// spawned producer is torn down when the deadline passes even though
    /// the consumer is not polling. Were the deadline only noticed on the
    /// consumer's next poll, the tally would keep climbing as virtual time
    /// moves forward. This pins the contract that `with_deadline` cancels
    /// in-flight work in wall-clock time, independent of consumer pace.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_drops_inner_when_consumer_is_slow_past_deadline() {
        let yields = Arc::new(AtomicU64::new(0));
        // The counter handle travels as the unfold state, so each step
        // bumps the same shared tally.
        let endless: U64Stream = stream::unfold(Arc::clone(&yields), |tally| async move {
            tally.fetch_add(1, Ordering::SeqCst);
            Some((Ok::<u64, tonic::Status>(1), tally))
        })
        .boxed();
        let mut wrapped = with_deadline(endless, secs_from_now(5), "test");

        // Take a single item; the producer then parks on the full channel
        // a little further along.
        let first = wrapped.next().await.unwrap().unwrap();
        assert_eq!(first, 1);

        // Go quiet and let virtual time run beyond the deadline; the
        // producer-side `timeout_at` has to drop the inner stream now.
        tokio::time::sleep(Duration::from_secs(10)).await;
        let at_deadline = yields.load(Ordering::SeqCst);

        // Well past the deadline the tally must not have moved.
        tokio::time::sleep(Duration::from_secs(10)).await;
        let later = yields.load(Ordering::SeqCst);
        assert_eq!(
            later,
            at_deadline,
            "inner stream kept producing past the deadline",
        );

        // The consumer's next poll reports `DeadlineExceeded`.
        let status = wrapped.next().await.unwrap().unwrap_err();
        assert_eq!(status.code(), tonic::Code::DeadlineExceeded);
    }

    /// If the producer task panics, the consumer must get `Internal`
    /// rather than a quietly closed channel, which it could not tell apart
    /// from a clean EOF. Otherwise a truncated response would look like a
    /// successful one to the client.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_translates_producer_panic_to_internal() {
        let exploding: U64Stream = stream::unfold(0u64, |n| async move {
            match n {
                1 => panic!("boom from inner stream"),
                _ => Some((Ok::<u64, tonic::Status>(n), n + 1)),
            }
        })
        .boxed();
        let mut wrapped = with_deadline(exploding, secs_from_now(60), "test");

        // The first item arrives as usual.
        let first = wrapped.next().await.unwrap().unwrap();
        assert_eq!(first, 0);
        // The following pull sees the panic through the JoinHandle.
        let status = wrapped.next().await.unwrap().unwrap_err();
        assert_eq!(status.code(), tonic::Code::Internal);
    }

    /// Dropping the wrapper before anything is drained has to abort the
    /// spawned producer and drop the inner stream; otherwise a client that
    /// hangs up early would keep the in-flight pipeline alive until its
    /// own deadline.
    #[tokio::test(start_paused = true)]
    async fn with_deadline_aborts_producer_when_consumer_drops() {
        /// Sets its flag when dropped.
        struct DropFlag(Arc<AtomicBool>);
        impl Drop for DropFlag {
            fn drop(&mut self) {
                self.0.store(true, Ordering::SeqCst);
            }
        }

        let was_dropped = Arc::new(AtomicBool::new(false));
        let flag = DropFlag(Arc::clone(&was_dropped));
        // Never yields, but owns the flag for as long as it lives, so the
        // flag's Drop runs exactly when the producer task drops the stream.
        let stuck: U64Stream = stream::unfold(flag, |held| async move {
            let _keep_alive = &held;
            std::future::pending::<()>().await;
            Some((Ok::<u64, tonic::Status>(1), held))
        })
        .boxed();

        let wrapped = with_deadline(stuck, secs_from_now(60), "test");
        drop(wrapped);

        // Yield a bounded number of times so the runtime can deliver the
        // abort and drop the producer task's future.
        let mut remaining = 10;
        while remaining > 0 {
            tokio::task::yield_now().await;
            if was_dropped.load(Ordering::SeqCst) {
                break;
            }
            remaining -= 1;
        }
        assert!(
            was_dropped.load(Ordering::SeqCst),
            "inner stream was not dropped after consumer dropped the wrapper",
        );
    }
}
294}