sui_http/middleware/callback/
body.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::ResponseHandler;
5use http_body::Body;
6use http_body::Frame;
7use pin_project_lite::pin_project;
8use std::fmt;
9use std::pin::Pin;
10use std::task::Context;
11use std::task::Poll;
12use std::task::ready;
13
14pin_project! {
15    /// Response body for [`Callback`].
16    ///
17    /// [`Callback`]: super::Callback
18    pub struct ResponseBody<B, ResponseHandler> {
19        #[pin]
20        pub(crate) inner: B,
21        pub(crate) handler: ResponseHandler,
22    }
23}
24
25impl<B, ResponseHandlerT> Body for ResponseBody<B, ResponseHandlerT>
26where
27    B: Body,
28    B::Error: fmt::Display + 'static,
29    ResponseHandlerT: ResponseHandler,
30{
31    type Data = B::Data;
32    type Error = B::Error;
33
34    fn poll_frame(
35        self: Pin<&mut Self>,
36        cx: &mut Context<'_>,
37    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
38        let this = self.project();
39        let result = ready!(this.inner.poll_frame(cx));
40
41        match result {
42            Some(Ok(frame)) => {
43                let frame = match frame.into_data() {
44                    Ok(chunk) => {
45                        this.handler.on_body_chunk(&chunk);
46                        Frame::data(chunk)
47                    }
48                    Err(frame) => frame,
49                };
50
51                let frame = match frame.into_trailers() {
52                    Ok(trailers) => {
53                        this.handler.on_end_of_stream(Some(&trailers));
54                        Frame::trailers(trailers)
55                    }
56                    Err(frame) => frame,
57                };
58
59                Poll::Ready(Some(Ok(frame)))
60            }
61            Some(Err(err)) => {
62                this.handler.on_error(&err);
63
64                Poll::Ready(Some(Err(err)))
65            }
66            None => {
67                this.handler.on_end_of_stream(None);
68
69                Poll::Ready(None)
70            }
71        }
72    }
73
74    fn is_end_stream(&self) -> bool {
75        self.inner.is_end_stream()
76    }
77
78    fn size_hint(&self) -> http_body::SizeHint {
79        self.inner.size_hint()
80    }
81}