mysten_network/callback/
body.rs1use super::ResponseHandler;
5use http_body::{Body, Frame};
6use pin_project_lite::pin_project;
7use std::{
8 fmt,
9 pin::Pin,
10 task::{Context, Poll, ready},
11};
12
13pin_project! {
14 pub struct ResponseBody<B, ResponseHandler> {
18 #[pin]
19 pub(crate) inner: B,
20 pub(crate) handler: ResponseHandler,
21 }
22}
23
24impl<B, ResponseHandlerT> Body for ResponseBody<B, ResponseHandlerT>
25where
26 B: Body,
27 B::Error: fmt::Display + 'static,
28 ResponseHandlerT: ResponseHandler,
29{
30 type Data = B::Data;
31 type Error = B::Error;
32
33 fn poll_frame(
34 self: Pin<&mut Self>,
35 cx: &mut Context<'_>,
36 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
37 let this = self.project();
38 let result = ready!(this.inner.poll_frame(cx));
39
40 match result {
41 Some(Ok(frame)) => {
42 let frame = match frame.into_data() {
43 Ok(chunk) => {
44 this.handler.on_body_chunk(&chunk);
45 Frame::data(chunk)
46 }
47 Err(frame) => frame,
48 };
49
50 let frame = match frame.into_trailers() {
51 Ok(trailers) => {
52 this.handler.on_end_of_stream(Some(&trailers));
53 Frame::trailers(trailers)
54 }
55 Err(frame) => frame,
56 };
57
58 Poll::Ready(Some(Ok(frame)))
59 }
60 Some(Err(err)) => {
61 this.handler.on_error(&err);
62
63 Poll::Ready(Some(Err(err)))
64 }
65 None => {
66 this.handler.on_end_of_stream(None);
67
68 Poll::Ready(None)
69 }
70 }
71 }
72
73 fn is_end_stream(&self) -> bool {
74 self.inner.is_end_stream()
75 }
76
77 fn size_hint(&self) -> http_body::SizeHint {
78 self.inner.size_hint()
79 }
80}