mysten_network/callback/
body.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::ResponseHandler;
5use http_body::{Body, Frame};
6use pin_project_lite::pin_project;
7use std::{
8    fmt,
9    pin::Pin,
10    task::{Context, Poll, ready},
11};
12
13pin_project! {
14    /// Response body for [`Callback`].
15    ///
16    /// [`Callback`]: super::Callback
17    pub struct ResponseBody<B, ResponseHandler> {
18        #[pin]
19        pub(crate) inner: B,
20        pub(crate) handler: ResponseHandler,
21    }
22}
23
24impl<B, ResponseHandlerT> Body for ResponseBody<B, ResponseHandlerT>
25where
26    B: Body,
27    B::Error: fmt::Display + 'static,
28    ResponseHandlerT: ResponseHandler,
29{
30    type Data = B::Data;
31    type Error = B::Error;
32
33    fn poll_frame(
34        self: Pin<&mut Self>,
35        cx: &mut Context<'_>,
36    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
37        let this = self.project();
38        let result = ready!(this.inner.poll_frame(cx));
39
40        match result {
41            Some(Ok(frame)) => {
42                let frame = match frame.into_data() {
43                    Ok(chunk) => {
44                        this.handler.on_body_chunk(&chunk);
45                        Frame::data(chunk)
46                    }
47                    Err(frame) => frame,
48                };
49
50                let frame = match frame.into_trailers() {
51                    Ok(trailers) => {
52                        this.handler.on_end_of_stream(Some(&trailers));
53                        Frame::trailers(trailers)
54                    }
55                    Err(frame) => frame,
56                };
57
58                Poll::Ready(Some(Ok(frame)))
59            }
60            Some(Err(err)) => {
61                this.handler.on_error(&err);
62
63                Poll::Ready(Some(Err(err)))
64            }
65            None => {
66                this.handler.on_end_of_stream(None);
67
68                Poll::Ready(None)
69            }
70        }
71    }
72
73    fn is_end_stream(&self) -> bool {
74        self.inner.is_end_stream()
75    }
76
77    fn size_hint(&self) -> http_body::SizeHint {
78        self.inner.size_hint()
79    }
80}