1use bytes::{Buf, BufMut};
5use std::{io::Read, marker::PhantomData};
6use tonic::{
7 Status,
8 codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder},
9};
10
11#[derive(Debug)]
12pub struct BcsEncoder<T>(PhantomData<T>);
13
14impl<T: serde::Serialize> Encoder for BcsEncoder<T> {
15 type Item = T;
16 type Error = Status;
17
18 fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
19 bcs::serialize_into(&mut buf.writer(), &item).map_err(|e| Status::internal(e.to_string()))
20 }
21}
22
23#[derive(Debug)]
24pub struct BcsDecoder<U>(PhantomData<U>);
25
26impl<U: serde::de::DeserializeOwned> Decoder for BcsDecoder<U> {
27 type Item = U;
28 type Error = Status;
29
30 fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
31 if !buf.has_remaining() {
32 return Ok(None);
33 }
34
35 let chunk = buf.chunk();
36
37 let item: Self::Item =
38 bcs::from_bytes(chunk).map_err(|e| Status::internal(e.to_string()))?;
39 buf.advance(chunk.len());
40
41 Ok(Some(item))
42 }
43}
44
/// Marker type selecting plain BCS as the tonic codec: encodes `T`, decodes `U`.
///
/// The type carries no data. `Debug` and `Clone` are implemented by hand rather
/// than derived so they are available for every `T` and `U`; the derives would
/// demand `T: Debug + Clone` and `U: Debug + Clone` even though no value of
/// either type is ever held.
pub struct BcsCodec<T, U>(PhantomData<(T, U)>);

impl<T, U> std::fmt::Debug for BcsCodec<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same shape as the derived output: `BcsCodec(<PhantomData debug>)`.
        f.debug_tuple("BcsCodec").field(&self.0).finish()
    }
}

impl<T, U> Clone for BcsCodec<T, U> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}

impl<T, U> Default for BcsCodec<T, U> {
    fn default() -> Self {
        Self(PhantomData)
    }
}
54
55impl<T, U> Codec for BcsCodec<T, U>
56where
57 T: serde::Serialize + Send + 'static,
58 U: serde::de::DeserializeOwned + Send + 'static,
59{
60 type Encode = T;
61 type Decode = U;
62 type Encoder = BcsEncoder<T>;
63 type Decoder = BcsDecoder<U>;
64
65 fn encoder(&mut self) -> Self::Encoder {
66 BcsEncoder(PhantomData)
67 }
68
69 fn decoder(&mut self) -> Self::Decoder {
70 BcsDecoder(PhantomData)
71 }
72}
73
74#[derive(Debug)]
75pub struct BcsSnappyEncoder<T>(PhantomData<T>);
76
77impl<T: serde::Serialize> Encoder for BcsSnappyEncoder<T> {
78 type Item = T;
79 type Error = Status;
80
81 fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
82 let mut snappy_encoder = snap::write::FrameEncoder::new(buf.writer());
83 bcs::serialize_into(&mut snappy_encoder, &item).map_err(|e| Status::internal(e.to_string()))
84 }
85}
86
87#[derive(Debug)]
88pub struct BcsSnappyDecoder<U>(PhantomData<U>);
89
90impl<U: serde::de::DeserializeOwned> Decoder for BcsSnappyDecoder<U> {
91 type Item = U;
92 type Error = Status;
93
94 fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
95 let compressed_size = buf.remaining();
96 if compressed_size == 0 {
97 return Ok(None);
98 }
99 let mut snappy_decoder = snap::read::FrameDecoder::new(buf.reader());
100 let mut bytes = Vec::with_capacity(compressed_size);
101 snappy_decoder.read_to_end(&mut bytes)?;
102 let item =
103 bcs::from_bytes(bytes.as_slice()).map_err(|e| Status::internal(e.to_string()))?;
104 Ok(Some(item))
105 }
106}
107
/// Marker type selecting Snappy-compressed BCS as the tonic codec: encodes `T`, decodes `U`.
///
/// The type carries no data. `Debug` and `Clone` are implemented by hand rather
/// than derived so they are available for every `T` and `U`; the derives would
/// demand `T: Debug + Clone` and `U: Debug + Clone` even though no value of
/// either type is ever held.
pub struct BcsSnappyCodec<T, U>(PhantomData<(T, U)>);

impl<T, U> std::fmt::Debug for BcsSnappyCodec<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same shape as the derived output: `BcsSnappyCodec(<PhantomData debug>)`.
        f.debug_tuple("BcsSnappyCodec").field(&self.0).finish()
    }
}

impl<T, U> Clone for BcsSnappyCodec<T, U> {
    fn clone(&self) -> Self {
        Self(PhantomData)
    }
}

impl<T, U> Default for BcsSnappyCodec<T, U> {
    fn default() -> Self {
        Self(PhantomData)
    }
}
118
119impl<T, U> Codec for BcsSnappyCodec<T, U>
120where
121 T: serde::Serialize + Send + 'static,
122 U: serde::de::DeserializeOwned + Send + 'static,
123{
124 type Encode = T;
125 type Decode = U;
126 type Encoder = BcsSnappyEncoder<T>;
127 type Decoder = BcsSnappyDecoder<U>;
128
129 fn encoder(&mut self) -> Self::Encoder {
130 BcsSnappyEncoder(PhantomData)
131 }
132
133 fn decoder(&mut self) -> Self::Decoder {
134 BcsSnappyDecoder(PhantomData)
135 }
136}
137
138pub mod anemo {
140 use ::anemo::rpc::codec::{Codec, Decoder, Encoder};
141 use bytes::Buf;
142 use std::{io::Read, marker::PhantomData};
143
144 #[derive(Debug)]
145 pub struct BcsSnappyEncoder<T>(PhantomData<T>);
146
147 impl<T: serde::Serialize> Encoder for BcsSnappyEncoder<T> {
148 type Item = T;
149 type Error = bcs::Error;
150
151 fn encode(&mut self, item: Self::Item) -> Result<bytes::Bytes, Self::Error> {
152 let mut buf = Vec::<u8>::new();
153 let mut snappy_encoder = snap::write::FrameEncoder::new(&mut buf);
154 bcs::serialize_into(&mut snappy_encoder, &item)?;
155 drop(snappy_encoder);
156 Ok(buf.into())
157 }
158 }
159
160 #[derive(Debug)]
161 pub struct BcsSnappyDecoder<U>(PhantomData<U>);
162
163 impl<U: serde::de::DeserializeOwned> Decoder for BcsSnappyDecoder<U> {
164 type Item = U;
165 type Error = bcs::Error;
166
167 fn decode(&mut self, buf: bytes::Bytes) -> Result<Self::Item, Self::Error> {
168 let compressed_size = buf.len();
169 let mut snappy_decoder = snap::read::FrameDecoder::new(buf.reader()).take(1 << 30);
170 let mut bytes = Vec::with_capacity(compressed_size);
171 snappy_decoder.read_to_end(&mut bytes)?;
172 bcs::from_bytes(bytes.as_slice())
173 }
174 }
175
176 #[derive(Debug, Clone)]
178 pub struct BcsSnappyCodec<T, U>(PhantomData<(T, U)>);
179
180 impl<T, U> Default for BcsSnappyCodec<T, U> {
181 fn default() -> Self {
182 Self(PhantomData)
183 }
184 }
185
186 impl<T, U> Codec for BcsSnappyCodec<T, U>
187 where
188 T: serde::Serialize + Send + 'static,
189 U: serde::de::DeserializeOwned + Send + 'static,
190 {
191 type Encode = T;
192 type Decode = U;
193 type Encoder = BcsSnappyEncoder<T>;
194 type Decoder = BcsSnappyDecoder<U>;
195
196 fn encoder(&mut self) -> Self::Encoder {
197 BcsSnappyEncoder(PhantomData)
198 }
199
200 fn decoder(&mut self) -> Self::Decoder {
201 BcsSnappyDecoder(PhantomData)
202 }
203
204 fn format_name(&self) -> &'static str {
205 "bcs"
206 }
207 }
208}