mysten_network/
codec.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use bytes::{Buf, BufMut};
5use std::{io::Read, marker::PhantomData};
6use tonic::{
7    Status,
8    codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder},
9};
10
11#[derive(Debug)]
12pub struct BcsEncoder<T>(PhantomData<T>);
13
14impl<T: serde::Serialize> Encoder for BcsEncoder<T> {
15    type Item = T;
16    type Error = Status;
17
18    fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
19        bcs::serialize_into(&mut buf.writer(), &item).map_err(|e| Status::internal(e.to_string()))
20    }
21}
22
23#[derive(Debug)]
24pub struct BcsDecoder<U>(PhantomData<U>);
25
26impl<U: serde::de::DeserializeOwned> Decoder for BcsDecoder<U> {
27    type Item = U;
28    type Error = Status;
29
30    fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
31        if !buf.has_remaining() {
32            return Ok(None);
33        }
34
35        let chunk = buf.chunk();
36
37        let item: Self::Item =
38            bcs::from_bytes(chunk).map_err(|e| Status::internal(e.to_string()))?;
39        buf.advance(chunk.len());
40
41        Ok(Some(item))
42    }
43}
44
45/// A [`Codec`] that implements `application/grpc+bcs` via the serde library.
46#[derive(Debug, Clone)]
47pub struct BcsCodec<T, U>(PhantomData<(T, U)>);
48
49impl<T, U> Default for BcsCodec<T, U> {
50    fn default() -> Self {
51        Self(PhantomData)
52    }
53}
54
55impl<T, U> Codec for BcsCodec<T, U>
56where
57    T: serde::Serialize + Send + 'static,
58    U: serde::de::DeserializeOwned + Send + 'static,
59{
60    type Encode = T;
61    type Decode = U;
62    type Encoder = BcsEncoder<T>;
63    type Decoder = BcsDecoder<U>;
64
65    fn encoder(&mut self) -> Self::Encoder {
66        BcsEncoder(PhantomData)
67    }
68
69    fn decoder(&mut self) -> Self::Decoder {
70        BcsDecoder(PhantomData)
71    }
72}
73
74#[derive(Debug)]
75pub struct BcsSnappyEncoder<T>(PhantomData<T>);
76
77impl<T: serde::Serialize> Encoder for BcsSnappyEncoder<T> {
78    type Item = T;
79    type Error = Status;
80
81    fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
82        let mut snappy_encoder = snap::write::FrameEncoder::new(buf.writer());
83        bcs::serialize_into(&mut snappy_encoder, &item).map_err(|e| Status::internal(e.to_string()))
84    }
85}
86
87#[derive(Debug)]
88pub struct BcsSnappyDecoder<U>(PhantomData<U>);
89
90impl<U: serde::de::DeserializeOwned> Decoder for BcsSnappyDecoder<U> {
91    type Item = U;
92    type Error = Status;
93
94    fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
95        let compressed_size = buf.remaining();
96        if compressed_size == 0 {
97            return Ok(None);
98        }
99        let mut snappy_decoder = snap::read::FrameDecoder::new(buf.reader());
100        let mut bytes = Vec::with_capacity(compressed_size);
101        snappy_decoder.read_to_end(&mut bytes)?;
102        let item =
103            bcs::from_bytes(bytes.as_slice()).map_err(|e| Status::internal(e.to_string()))?;
104        Ok(Some(item))
105    }
106}
107
108/// A [`Codec`] that implements `bcs` encoding/decoding and snappy compression/decompression
109/// via the serde library.
110#[derive(Debug, Clone)]
111pub struct BcsSnappyCodec<T, U>(PhantomData<(T, U)>);
112
113impl<T, U> Default for BcsSnappyCodec<T, U> {
114    fn default() -> Self {
115        Self(PhantomData)
116    }
117}
118
119impl<T, U> Codec for BcsSnappyCodec<T, U>
120where
121    T: serde::Serialize + Send + 'static,
122    U: serde::de::DeserializeOwned + Send + 'static,
123{
124    type Encode = T;
125    type Decode = U;
126    type Encoder = BcsSnappyEncoder<T>;
127    type Decoder = BcsSnappyDecoder<U>;
128
129    fn encoder(&mut self) -> Self::Encoder {
130        BcsSnappyEncoder(PhantomData)
131    }
132
133    fn decoder(&mut self) -> Self::Decoder {
134        BcsSnappyDecoder(PhantomData)
135    }
136}
137
138// Anemo variant of BCS codec using Snappy for compression.
139pub mod anemo {
140    use ::anemo::rpc::codec::{Codec, Decoder, Encoder};
141    use bytes::Buf;
142    use std::{io::Read, marker::PhantomData};
143
144    #[derive(Debug)]
145    pub struct BcsSnappyEncoder<T>(PhantomData<T>);
146
147    impl<T: serde::Serialize> Encoder for BcsSnappyEncoder<T> {
148        type Item = T;
149        type Error = bcs::Error;
150
151        fn encode(&mut self, item: Self::Item) -> Result<bytes::Bytes, Self::Error> {
152            let mut buf = Vec::<u8>::new();
153            let mut snappy_encoder = snap::write::FrameEncoder::new(&mut buf);
154            bcs::serialize_into(&mut snappy_encoder, &item)?;
155            drop(snappy_encoder);
156            Ok(buf.into())
157        }
158    }
159
160    #[derive(Debug)]
161    pub struct BcsSnappyDecoder<U>(PhantomData<U>);
162
163    impl<U: serde::de::DeserializeOwned> Decoder for BcsSnappyDecoder<U> {
164        type Item = U;
165        type Error = bcs::Error;
166
167        fn decode(&mut self, buf: bytes::Bytes) -> Result<Self::Item, Self::Error> {
168            let compressed_size = buf.len();
169            let mut snappy_decoder = snap::read::FrameDecoder::new(buf.reader()).take(1 << 30);
170            let mut bytes = Vec::with_capacity(compressed_size);
171            snappy_decoder.read_to_end(&mut bytes)?;
172            bcs::from_bytes(bytes.as_slice())
173        }
174    }
175
176    /// A [`Codec`] that implements `bcs` encoding/decoding via the serde library.
177    #[derive(Debug, Clone)]
178    pub struct BcsSnappyCodec<T, U>(PhantomData<(T, U)>);
179
180    impl<T, U> Default for BcsSnappyCodec<T, U> {
181        fn default() -> Self {
182            Self(PhantomData)
183        }
184    }
185
186    impl<T, U> Codec for BcsSnappyCodec<T, U>
187    where
188        T: serde::Serialize + Send + 'static,
189        U: serde::de::DeserializeOwned + Send + 'static,
190    {
191        type Encode = T;
192        type Decode = U;
193        type Encoder = BcsSnappyEncoder<T>;
194        type Decoder = BcsSnappyDecoder<U>;
195
196        fn encoder(&mut self) -> Self::Encoder {
197            BcsSnappyEncoder(PhantomData)
198        }
199
200        fn decoder(&mut self) -> Self::Decoder {
201            BcsSnappyDecoder(PhantomData)
202        }
203
204        fn format_name(&self) -> &'static str {
205            "bcs"
206        }
207    }
208}