sui_http/
connection_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::pin::pin;
5use std::time::Duration;
6
7use http::Request;
8use http::Response;
9use tracing::debug;
10use tracing::trace;
11
12use crate::ActiveConnections;
13use crate::BoxError;
14use crate::ConnectionId;
15use crate::fuse::Fuse;
16
17// This is moved to its own function as a way to get around
18// https://github.com/rust-lang/rust/issues/102211
19pub async fn serve_connection<IO, S, B, C>(
20    hyper_io: IO,
21    hyper_svc: S,
22    builder: hyper_util::server::conn::auto::Builder<hyper_util::rt::TokioExecutor>,
23    graceful_shutdown_token: tokio_util::sync::CancellationToken,
24    max_connection_age: Option<Duration>,
25    on_connection_close: C,
26) where
27    B: http_body::Body + Send + 'static,
28    B::Data: Send,
29    B::Error: Into<BoxError>,
30    IO: hyper::rt::Read + hyper::rt::Write + Send + Unpin + 'static,
31    S: hyper::service::Service<Request<hyper::body::Incoming>, Response = Response<B>> + 'static,
32    S::Future: Send + 'static,
33    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
34{
35    let mut sig = pin!(Fuse::new(graceful_shutdown_token.cancelled_owned()));
36
37    let mut conn = pin!(builder.serve_connection_with_upgrades(hyper_io, hyper_svc));
38
39    let sleep = sleep_or_pending(max_connection_age);
40    tokio::pin!(sleep);
41
42    loop {
43        tokio::select! {
44            _ = &mut sig => {
45                conn.as_mut().graceful_shutdown();
46            }
47            rv = &mut conn => {
48                if let Err(err) = rv {
49                    debug!("failed serving connection: {:#}", err);
50                }
51                break;
52            },
53            _ = &mut sleep  => {
54                conn.as_mut().graceful_shutdown();
55                sleep.set(sleep_or_pending(None));
56            },
57        }
58    }
59
60    trace!("connection closed");
61    drop(on_connection_close);
62}
63
64async fn sleep_or_pending(wait_for: Option<Duration>) {
65    match wait_for {
66        Some(wait) => tokio::time::sleep(wait).await,
67        None => std::future::pending().await,
68    };
69}
70
71pub(crate) struct OnConnectionClose<A> {
72    id: ConnectionId,
73    active_connections: ActiveConnections<A>,
74}
75
76impl<A> OnConnectionClose<A> {
77    pub(crate) fn new(id: ConnectionId, active_connections: ActiveConnections<A>) -> Self {
78        Self {
79            id,
80            active_connections,
81        }
82    }
83}
84
85impl<A> Drop for OnConnectionClose<A> {
86    fn drop(&mut self) {
87        self.active_connections.write().unwrap().remove(&self.id);
88    }
89}