sui_proxy/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod admin;
4pub mod config;
5pub mod consumer;
6pub mod handlers;
7pub mod histogram_relay;
8pub mod metrics;
9pub mod middleware;
10pub mod peers;
11pub mod prom_to_mimir;
12pub mod remote_write;
13
14/// var extracts environment variables at runtime with a default fallback value
15/// if a default is not provided, the value is simply an empty string if not found
16/// This function will return the provided default if env::var cannot find the key
17/// or if the key is somehow malformed.
18#[macro_export]
19macro_rules! var {
20    ($key:expr) => {
21        match std::env::var($key) {
22            Ok(val) => val,
23            Err(_) => "".into(),
24        }
25    };
26    ($key:expr, $default:expr) => {
27        match std::env::var($key) {
28            Ok(val) => val.parse::<_>().unwrap(),
29            Err(_) => $default,
30        }
31    };
32}
33
34#[cfg(test)]
35mod tests {
36    use super::*;
37    use crate::admin::Labels;
38    use crate::histogram_relay::HistogramRelay;
39    use crate::prom_to_mimir::tests::*;
40
41    use crate::{admin::CertKeyPair, config::RemoteWriteConfig, peers::SuiNodeProvider};
42    use axum::Router;
43    use axum::http::StatusCode;
44    use axum::routing::post;
45    use prometheus::Encoder;
46    use prometheus::PROTOBUF_FORMAT;
47    use std::net::TcpListener;
48    use std::time::Duration;
49    use sui_tls::{ClientCertVerifier, TlsAcceptor};
50
51    async fn run_dummy_remote_write(listener: TcpListener) {
52        /// i accept everything, send me the trash
53        async fn handler() -> StatusCode {
54            StatusCode::OK
55        }
56
57        // build our application with a route
58        let app = Router::new().route("/v1/push", post(handler));
59
60        // run it
61        listener.set_nonblocking(true).unwrap();
62        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
63        axum::serve(listener, app).await.unwrap();
64    }
65
66    async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
67        /// i accept everything, send me the trash, but i will sleep and never return before a timeout
68        /// this is for testing slow clients and this is the easiest way to do so without adding a special
69        /// route in the server to do so
70        async fn handler() -> StatusCode {
71            // Simulate a route that hangs while waiting for a client to send data
72            // but the server itself doesn't delay its processing
73            tokio::time::sleep(Duration::from_secs(60)).await; // A very long sleep
74            StatusCode::OK
75        }
76
77        // build our application with a route
78        let app = Router::new().route("/v1/push", post(handler));
79
80        // run it
81        listener.set_nonblocking(true).unwrap();
82        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
83        axum::serve(listener, app).await.unwrap();
84    }
85
86    /// test_axum_acceptor is a basic e2e test that creates a mock remote_write post endpoint and has a simple
87    /// sui-node client that posts data to the proxy using the protobuf format.  The server processes this
88    /// data and sends it to the mock remote_write which accepts everything.  Future work is to make this more
89    /// robust and expand the scope of coverage, probabaly moving this test elsewhere and renaming it.
90    #[tokio::test]
91    async fn test_axum_acceptor() {
92        // generate self-signed certificates
93        let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
94        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
95
96        // create a fake rpc server
97        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
98        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
99        let dummy_remote_write_url = format!(
100            "http://localhost:{}/v1/push",
101            dummy_remote_write_address.port()
102        );
103
104        let _dummy_remote_write =
105            tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
106
107        // init the tls config and allower
108        let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
109        let tls_config = ClientCertVerifier::new(
110            allower.clone(),
111            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
112        )
113        .rustls_server_config(
114            vec![server_priv_cert.rustls_certificate()],
115            server_priv_cert.rustls_private_key(),
116        )
117        .unwrap();
118
119        let client = admin::make_reqwest_client(
120            RemoteWriteConfig {
121                url: dummy_remote_write_url.to_owned(),
122                username: "bar".into(),
123                password: "foo".into(),
124                ..Default::default()
125            },
126            "dummy user agent",
127        );
128
129        let app = admin::app(
130            Labels {
131                network: "unittest-network".into(),
132                inventory_hostname: "ansible_inventory_name".into(),
133            },
134            client,
135            HistogramRelay::new(),
136            Some(allower.clone()),
137            None,
138        );
139
140        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
141        let server_address = listener.local_addr().unwrap();
142        let server_url = format!(
143            "https://localhost:{}/publish/metrics",
144            server_address.port()
145        );
146
147        let acceptor = TlsAcceptor::new(tls_config);
148        let _server = tokio::spawn(async move {
149            admin::server(listener, app, Some(acceptor)).await.unwrap();
150        });
151
152        // build a client
153        let client = reqwest::Client::builder()
154            .add_root_certificate(server_priv_cert.reqwest_certificate())
155            .identity(client_priv_cert.reqwest_identity())
156            .https_only(true)
157            .build()
158            .unwrap();
159
160        // Client request is rejected because it isn't in the allowlist
161        client.get(&server_url).send().await.unwrap_err();
162
163        // Insert the client's public key into the allowlist and verify the request is successful
164        allower.get_sui_mut().write().unwrap().insert(
165            client_pub_key.to_owned(),
166            peers::AllowedPeer {
167                name: "some-node".into(),
168                public_key: client_pub_key.to_owned(),
169            },
170        );
171
172        let mf = create_metric_family(
173            "foo_metric",
174            "some help this is",
175            None,
176            vec![create_metric_counter(
177                create_labels(vec![("some", "label")]),
178                create_counter(2046.0),
179            )],
180        );
181
182        let mut buf = vec![];
183        let encoder = prometheus::ProtobufEncoder::new();
184        encoder.encode(&[mf], &mut buf).unwrap();
185
186        let res = client
187            .post(&server_url)
188            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
189            .body(buf)
190            .send()
191            .await
192            .expect("expected a successful post with a self-signed certificate");
193        let status = res.status();
194        let body = res.text().await.unwrap();
195        assert_eq!("created", body);
196        assert_eq!(status, reqwest::StatusCode::CREATED);
197    }
198
199    /// this is a long test to ensure we are timing out clients that are slow
200    #[tokio::test]
201    async fn test_client_timeout() {
202        // generate self-signed certificates
203        let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
204        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
205
206        // create a fake rpc server
207        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
208        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
209        let dummy_remote_write_url = format!(
210            "http://localhost:{}/v1/push",
211            dummy_remote_write_address.port()
212        );
213
214        let _dummy_remote_write = tokio::spawn(async move {
215            run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
216        });
217
218        // init the tls config and allower
219        let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
220        let tls_config = ClientCertVerifier::new(
221            allower.clone(),
222            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
223        )
224        .rustls_server_config(
225            vec![server_priv_cert.rustls_certificate()],
226            server_priv_cert.rustls_private_key(),
227        )
228        .unwrap();
229
230        let client = admin::make_reqwest_client(
231            RemoteWriteConfig {
232                url: dummy_remote_write_url.to_owned(),
233                username: "bar".into(),
234                password: "foo".into(),
235                ..Default::default()
236            },
237            "dummy user agent",
238        );
239
240        let timeout_secs = Some(2u64);
241
242        let app = admin::app(
243            Labels {
244                network: "unittest-network".into(),
245                inventory_hostname: "ansible_inventory_name".into(),
246            },
247            client,
248            HistogramRelay::new(),
249            Some(allower.clone()),
250            timeout_secs,
251        );
252
253        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
254        let server_address = listener.local_addr().unwrap();
255        let server_url = format!(
256            "https://localhost:{}/publish/metrics",
257            server_address.port()
258        );
259
260        let acceptor = TlsAcceptor::new(tls_config);
261        let _server = tokio::spawn(async move {
262            admin::server(listener, app, Some(acceptor)).await.unwrap();
263        });
264
265        // build a client
266        let client = reqwest::Client::builder()
267            .add_root_certificate(server_priv_cert.reqwest_certificate())
268            .identity(client_priv_cert.reqwest_identity())
269            .https_only(true)
270            .build()
271            .unwrap();
272
273        // Client request is rejected because it isn't in the allowlist
274        client.get(&server_url).send().await.unwrap_err();
275
276        // Insert the client's public key into the allowlist and verify the request is successful
277        allower.get_sui_mut().write().unwrap().insert(
278            client_pub_key.to_owned(),
279            peers::AllowedPeer {
280                name: "some-node".into(),
281                public_key: client_pub_key.to_owned(),
282            },
283        );
284
285        let mf = create_metric_family(
286            "foo_metric",
287            "some help this is",
288            None,
289            vec![create_metric_counter(
290                create_labels(vec![("some", "label")]),
291                create_counter(2046.0),
292            )],
293        );
294
295        let mut buf = vec![];
296        let encoder = prometheus::ProtobufEncoder::new();
297        encoder.encode(&[mf], &mut buf).unwrap();
298
299        let res = client
300            .post(&server_url)
301            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
302            .body(buf)
303            .send()
304            .await
305            .expect("expected a successful post with a self-signed certificate");
306        let status = res.status();
307        assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
308    }
309}