sui_proxy/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3pub mod admin;
4pub mod config;
5pub mod consumer;
6pub mod handlers;
7pub mod histogram_relay;
8pub mod metrics;
9pub mod middleware;
10pub mod peers;
11pub mod prom_to_mimir;
12pub mod remote_write;
13
/// Reads an environment variable at runtime, with a fallback when it is unavailable.
///
/// * `var!(key)` yields the variable's value as a `String`, or an empty
///   `String` whenever `std::env::var` reports an error (for example the
///   variable is unset or its value is not valid Unicode).
/// * `var!(key, default)` parses the variable's value with [`str::parse`]
///   into the type inferred for the result (normally the type of `default`)
///   and yields `default` whenever `std::env::var` reports an error.
///   `default` is only evaluated in that fallback case.
///
/// `key` is evaluated exactly once in both forms.
///
/// # Panics
///
/// The two-argument form panics when the variable is set but its value cannot
/// be parsed into the requested type. The panic message names the variable and
/// the offending value so a misconfiguration is easy to track down.
#[macro_export]
macro_rules! var {
    ($key:expr) => {
        // An absent or non-Unicode variable collapses to `String::default()`, i.e. "".
        ::std::env::var($key).unwrap_or_default()
    };
    ($key:expr, $default:expr) => {{
        // Bind the key once: it is needed for the lookup and for the panic message.
        let key = $key;
        match ::std::env::var(&key) {
            Ok(raw) => match raw.parse::<_>() {
                Ok(parsed) => parsed,
                // Fail fast on a malformed value rather than silently using the default.
                Err(err) => panic!(
                    "environment variable {:?} has unparsable value {:?}: {:?}",
                    ::std::ffi::OsStr::new(&key),
                    raw,
                    err
                ),
            },
            Err(_) => $default,
        }
    }};
}
33
34#[cfg(test)]
35mod tests {
36    use super::*;
37    use crate::admin::Labels;
38    use crate::histogram_relay::HistogramRelay;
39    use crate::prom_to_mimir::tests::*;
40
41    use crate::{admin::CertKeyPair, config::RemoteWriteConfig, peers::SuiNodeProvider};
42    use axum::Router;
43    use axum::http::StatusCode;
44    use axum::routing::post;
45    use prometheus::Encoder;
46    use prometheus::PROTOBUF_FORMAT;
47    use protobuf::RepeatedField;
48    use std::net::TcpListener;
49    use std::time::Duration;
50    use sui_tls::{ClientCertVerifier, TlsAcceptor};
51
52    async fn run_dummy_remote_write(listener: TcpListener) {
53        /// i accept everything, send me the trash
54        async fn handler() -> StatusCode {
55            StatusCode::OK
56        }
57
58        // build our application with a route
59        let app = Router::new().route("/v1/push", post(handler));
60
61        // run it
62        listener.set_nonblocking(true).unwrap();
63        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
64        axum::serve(listener, app).await.unwrap();
65    }
66
67    async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
68        /// i accept everything, send me the trash, but i will sleep and never return before a timeout
69        /// this is for testing slow clients and this is the easiest way to do so without adding a special
70        /// route in the server to do so
71        async fn handler() -> StatusCode {
72            // Simulate a route that hangs while waiting for a client to send data
73            // but the server itself doesn't delay its processing
74            tokio::time::sleep(Duration::from_secs(60)).await; // A very long sleep
75            StatusCode::OK
76        }
77
78        // build our application with a route
79        let app = Router::new().route("/v1/push", post(handler));
80
81        // run it
82        listener.set_nonblocking(true).unwrap();
83        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
84        axum::serve(listener, app).await.unwrap();
85    }
86
87    /// test_axum_acceptor is a basic e2e test that creates a mock remote_write post endpoint and has a simple
88    /// sui-node client that posts data to the proxy using the protobuf format.  The server processes this
89    /// data and sends it to the mock remote_write which accepts everything.  Future work is to make this more
90    /// robust and expand the scope of coverage, probabaly moving this test elsewhere and renaming it.
91    #[tokio::test]
92    async fn test_axum_acceptor() {
93        // generate self-signed certificates
94        let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
95        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
96
97        // create a fake rpc server
98        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
99        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
100        let dummy_remote_write_url = format!(
101            "http://localhost:{}/v1/push",
102            dummy_remote_write_address.port()
103        );
104
105        let _dummy_remote_write =
106            tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
107
108        // init the tls config and allower
109        let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
110        let tls_config = ClientCertVerifier::new(
111            allower.clone(),
112            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
113        )
114        .rustls_server_config(
115            vec![server_priv_cert.rustls_certificate()],
116            server_priv_cert.rustls_private_key(),
117        )
118        .unwrap();
119
120        let client = admin::make_reqwest_client(
121            RemoteWriteConfig {
122                url: dummy_remote_write_url.to_owned(),
123                username: "bar".into(),
124                password: "foo".into(),
125                ..Default::default()
126            },
127            "dummy user agent",
128        );
129
130        let app = admin::app(
131            Labels {
132                network: "unittest-network".into(),
133                inventory_hostname: "ansible_inventory_name".into(),
134            },
135            client,
136            HistogramRelay::new(),
137            Some(allower.clone()),
138            None,
139        );
140
141        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
142        let server_address = listener.local_addr().unwrap();
143        let server_url = format!(
144            "https://localhost:{}/publish/metrics",
145            server_address.port()
146        );
147
148        let acceptor = TlsAcceptor::new(tls_config);
149        let _server = tokio::spawn(async move {
150            admin::server(listener, app, Some(acceptor)).await.unwrap();
151        });
152
153        // build a client
154        let client = reqwest::Client::builder()
155            .add_root_certificate(server_priv_cert.reqwest_certificate())
156            .identity(client_priv_cert.reqwest_identity())
157            .https_only(true)
158            .build()
159            .unwrap();
160
161        // Client request is rejected because it isn't in the allowlist
162        client.get(&server_url).send().await.unwrap_err();
163
164        // Insert the client's public key into the allowlist and verify the request is successful
165        allower.get_sui_mut().write().unwrap().insert(
166            client_pub_key.to_owned(),
167            peers::AllowedPeer {
168                name: "some-node".into(),
169                public_key: client_pub_key.to_owned(),
170            },
171        );
172
173        let mf = create_metric_family(
174            "foo_metric",
175            "some help this is",
176            None,
177            RepeatedField::from_vec(vec![create_metric_counter(
178                RepeatedField::from_vec(create_labels(vec![("some", "label")])),
179                create_counter(2046.0),
180            )]),
181        );
182
183        let mut buf = vec![];
184        let encoder = prometheus::ProtobufEncoder::new();
185        encoder.encode(&[mf], &mut buf).unwrap();
186
187        let res = client
188            .post(&server_url)
189            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
190            .body(buf)
191            .send()
192            .await
193            .expect("expected a successful post with a self-signed certificate");
194        let status = res.status();
195        let body = res.text().await.unwrap();
196        assert_eq!("created", body);
197        assert_eq!(status, reqwest::StatusCode::CREATED);
198    }
199
200    /// this is a long test to ensure we are timing out clients that are slow  
201    #[tokio::test]
202    async fn test_client_timeout() {
203        // generate self-signed certificates
204        let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
205        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
206
207        // create a fake rpc server
208        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
209        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
210        let dummy_remote_write_url = format!(
211            "http://localhost:{}/v1/push",
212            dummy_remote_write_address.port()
213        );
214
215        let _dummy_remote_write = tokio::spawn(async move {
216            run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
217        });
218
219        // init the tls config and allower
220        let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
221        let tls_config = ClientCertVerifier::new(
222            allower.clone(),
223            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
224        )
225        .rustls_server_config(
226            vec![server_priv_cert.rustls_certificate()],
227            server_priv_cert.rustls_private_key(),
228        )
229        .unwrap();
230
231        let client = admin::make_reqwest_client(
232            RemoteWriteConfig {
233                url: dummy_remote_write_url.to_owned(),
234                username: "bar".into(),
235                password: "foo".into(),
236                ..Default::default()
237            },
238            "dummy user agent",
239        );
240
241        let timeout_secs = Some(2u64);
242
243        let app = admin::app(
244            Labels {
245                network: "unittest-network".into(),
246                inventory_hostname: "ansible_inventory_name".into(),
247            },
248            client,
249            HistogramRelay::new(),
250            Some(allower.clone()),
251            timeout_secs,
252        );
253
254        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
255        let server_address = listener.local_addr().unwrap();
256        let server_url = format!(
257            "https://localhost:{}/publish/metrics",
258            server_address.port()
259        );
260
261        let acceptor = TlsAcceptor::new(tls_config);
262        let _server = tokio::spawn(async move {
263            admin::server(listener, app, Some(acceptor)).await.unwrap();
264        });
265
266        // build a client
267        let client = reqwest::Client::builder()
268            .add_root_certificate(server_priv_cert.reqwest_certificate())
269            .identity(client_priv_cert.reqwest_identity())
270            .https_only(true)
271            .build()
272            .unwrap();
273
274        // Client request is rejected because it isn't in the allowlist
275        client.get(&server_url).send().await.unwrap_err();
276
277        // Insert the client's public key into the allowlist and verify the request is successful
278        allower.get_sui_mut().write().unwrap().insert(
279            client_pub_key.to_owned(),
280            peers::AllowedPeer {
281                name: "some-node".into(),
282                public_key: client_pub_key.to_owned(),
283            },
284        );
285
286        let mf = create_metric_family(
287            "foo_metric",
288            "some help this is",
289            None,
290            RepeatedField::from_vec(vec![create_metric_counter(
291                RepeatedField::from_vec(create_labels(vec![("some", "label")])),
292                create_counter(2046.0),
293            )]),
294        );
295
296        let mut buf = vec![];
297        let encoder = prometheus::ProtobufEncoder::new();
298        encoder.encode(&[mf], &mut buf).unwrap();
299
300        let res = client
301            .post(&server_url)
302            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
303            .body(buf)
304            .send()
305            .await
306            .expect("expected a successful post with a self-signed certificate");
307        let status = res.status();
308        assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
309    }
310}