sui_http/
connection_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{pin::pin, time::Duration};
5
6use http::{Request, Response};
7use tracing::{debug, trace};
8
9use crate::{ActiveConnections, BoxError, ConnectionId, fuse::Fuse};
10
11// This is moved to its own function as a way to get around
12// https://github.com/rust-lang/rust/issues/102211
13pub async fn serve_connection<IO, S, B, C>(
14    hyper_io: IO,
15    hyper_svc: S,
16    builder: hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>,
17    graceful_shutdown_token: tokio_util::sync::CancellationToken,
18    max_connection_age: Option<Duration>,
19    on_connection_close: C,
20) where
21    B: http_body::Body + Send + 'static,
22    B::Data: Send,
23    B::Error: Into<BoxError>,
24    IO: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
25    S: hyper::service::Service<Request<hyper::body::Incoming>, Response = Response<B>> + 'static,
26    S::Future: Send + 'static,
27    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
28{
29    let mut sig = pin!(Fuse::new(graceful_shutdown_token.cancelled_owned()));
30
31    let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_svc));
32
33    let sleep = sleep_or_pending(max_connection_age);
34    tokio::pin!(sleep);
35
36    loop {
37        tokio::select! {
38            _ = &mut sig => {
39                conn.as_mut().graceful_shutdown();
40            }
41            rv = &mut conn => {
42                if let Err(err) = rv {
43                    debug!("failed serving connection: {:#}", err);
44                }
45                break;
46            },
47            _ = &mut sleep  => {
48                conn.as_mut().graceful_shutdown();
49                sleep.set(sleep_or_pending(None));
50            },
51        }
52    }
53
54    trace!("connection closed");
55    drop(on_connection_close);
56}
57
58async fn sleep_or_pending(wait_for: Option<Duration>) {
59    match wait_for {
60        Some(wait) => tokio::time::sleep(wait).await,
61        None => std::future::pending().await,
62    };
63}
64
65pub(crate) struct OnConnectionClose<A> {
66    id: ConnectionId,
67    active_connections: ActiveConnections<A>,
68}
69
70impl<A> OnConnectionClose<A> {
71    pub(crate) fn new(id: ConnectionId, active_connections: ActiveConnections<A>) -> Self {
72        Self {
73            id,
74            active_connections,
75        }
76    }
77}
78
79impl<A> Drop for OnConnectionClose<A> {
80    fn drop(&mut self) {
81        self.active_connections.write().unwrap().remove(&self.id);
82    }
83}