sui_proxy/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
pub mod admin;
pub mod config;
pub mod consumer;
pub mod handlers;
pub mod histogram_relay;
pub mod metrics;
pub mod middleware;
pub mod peers;
pub mod prom_to_mimir;
pub mod remote_write;

/// var extracts environment variables at runtime with a default fallback value
/// if a default is not provided, the value is simply an empty string if not found
/// This function will return the provided default if env::var cannot find the key
/// or if the key is somehow malformed.
#[macro_export]
macro_rules! var {
    ($key:expr) => {
        match std::env::var($key) {
            Ok(val) => val,
            Err(_) => "".into(),
        }
    };
    ($key:expr, $default:expr) => {
        match std::env::var($key) {
            Ok(val) => val.parse::<_>().unwrap(),
            Err(_) => $default,
        }
    };
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::admin::Labels;
    use crate::histogram_relay::HistogramRelay;
    use crate::prom_to_mimir::tests::*;

    use crate::{admin::CertKeyPair, config::RemoteWriteConfig, peers::SuiNodeProvider};
    use axum::http::StatusCode;
    use axum::routing::post;
    use axum::Router;
    use prometheus::Encoder;
    use prometheus::PROTOBUF_FORMAT;
    use protobuf::RepeatedField;
    use std::net::TcpListener;
    use std::time::Duration;
    use sui_tls::{ClientCertVerifier, TlsAcceptor};

    async fn run_dummy_remote_write(listener: TcpListener) {
        /// i accept everything, send me the trash
        async fn handler() -> StatusCode {
            StatusCode::OK
        }

        // build our application with a route
        let app = Router::new().route("/v1/push", post(handler));

        // run it
        listener.set_nonblocking(true).unwrap();
        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
        axum::serve(listener, app).await.unwrap();
    }

    async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
        /// i accept everything, send me the trash, but i will sleep and never return before a timeout
        /// this is for testing slow clients and this is the easiest way to do so without adding a special
        /// route in the server to do so
        async fn handler() -> StatusCode {
            // Simulate a route that hangs while waiting for a client to send data
            // but the server itself doesn't delay its processing
            tokio::time::sleep(Duration::from_secs(60)).await; // A very long sleep
            StatusCode::OK
        }

        // build our application with a route
        let app = Router::new().route("/v1/push", post(handler));

        // run it
        listener.set_nonblocking(true).unwrap();
        let listener = tokio::net::TcpListener::from_std(listener).unwrap();
        axum::serve(listener, app).await.unwrap();
    }

    /// test_axum_acceptor is a basic e2e test that creates a mock remote_write post endpoint and has a simple
    /// sui-node client that posts data to the proxy using the protobuf format.  The server processes this
    /// data and sends it to the mock remote_write which accepts everything.  Future work is to make this more
    /// robust and expand the scope of coverage, probabaly moving this test elsewhere and renaming it.
    #[tokio::test]
    async fn test_axum_acceptor() {
        // generate self-signed certificates
        let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());

        // create a fake rpc server
        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
        let dummy_remote_write_url = format!(
            "http://localhost:{}/v1/push",
            dummy_remote_write_address.port()
        );

        let _dummy_remote_write =
            tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });

        // init the tls config and allower
        let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
        let tls_config = ClientCertVerifier::new(
            allower.clone(),
            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
        )
        .rustls_server_config(
            vec![server_priv_cert.rustls_certificate()],
            server_priv_cert.rustls_private_key(),
        )
        .unwrap();

        let client = admin::make_reqwest_client(
            RemoteWriteConfig {
                url: dummy_remote_write_url.to_owned(),
                username: "bar".into(),
                password: "foo".into(),
                ..Default::default()
            },
            "dummy user agent",
        );

        let app = admin::app(
            Labels {
                network: "unittest-network".into(),
                inventory_hostname: "ansible_inventory_name".into(),
            },
            client,
            HistogramRelay::new(),
            Some(allower.clone()),
            None,
        );

        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
        let server_address = listener.local_addr().unwrap();
        let server_url = format!(
            "https://localhost:{}/publish/metrics",
            server_address.port()
        );

        let acceptor = TlsAcceptor::new(tls_config);
        let _server = tokio::spawn(async move {
            admin::server(listener, app, Some(acceptor)).await.unwrap();
        });

        // build a client
        let client = reqwest::Client::builder()
            .add_root_certificate(server_priv_cert.reqwest_certificate())
            .identity(client_priv_cert.reqwest_identity())
            .https_only(true)
            .build()
            .unwrap();

        // Client request is rejected because it isn't in the allowlist
        client.get(&server_url).send().await.unwrap_err();

        // Insert the client's public key into the allowlist and verify the request is successful
        allower.get_sui_mut().write().unwrap().insert(
            client_pub_key.to_owned(),
            peers::AllowedPeer {
                name: "some-node".into(),
                public_key: client_pub_key.to_owned(),
            },
        );

        let mf = create_metric_family(
            "foo_metric",
            "some help this is",
            None,
            RepeatedField::from_vec(vec![create_metric_counter(
                RepeatedField::from_vec(create_labels(vec![("some", "label")])),
                create_counter(2046.0),
            )]),
        );

        let mut buf = vec![];
        let encoder = prometheus::ProtobufEncoder::new();
        encoder.encode(&[mf], &mut buf).unwrap();

        let res = client
            .post(&server_url)
            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
            .body(buf)
            .send()
            .await
            .expect("expected a successful post with a self-signed certificate");
        let status = res.status();
        let body = res.text().await.unwrap();
        assert_eq!("created", body);
        assert_eq!(status, reqwest::StatusCode::CREATED);
    }

    /// this is a long test to ensure we are timing out clients that are slow  
    #[tokio::test]
    async fn test_client_timeout() {
        // generate self-signed certificates
        let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
        let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());

        // create a fake rpc server
        let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
        let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
        let dummy_remote_write_url = format!(
            "http://localhost:{}/v1/push",
            dummy_remote_write_address.port()
        );

        let _dummy_remote_write = tokio::spawn(async move {
            run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
        });

        // init the tls config and allower
        let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
        let tls_config = ClientCertVerifier::new(
            allower.clone(),
            sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
        )
        .rustls_server_config(
            vec![server_priv_cert.rustls_certificate()],
            server_priv_cert.rustls_private_key(),
        )
        .unwrap();

        let client = admin::make_reqwest_client(
            RemoteWriteConfig {
                url: dummy_remote_write_url.to_owned(),
                username: "bar".into(),
                password: "foo".into(),
                ..Default::default()
            },
            "dummy user agent",
        );

        let timeout_secs = Some(2u64);

        let app = admin::app(
            Labels {
                network: "unittest-network".into(),
                inventory_hostname: "ansible_inventory_name".into(),
            },
            client,
            HistogramRelay::new(),
            Some(allower.clone()),
            timeout_secs,
        );

        let listener = std::net::TcpListener::bind("localhost:0").unwrap();
        let server_address = listener.local_addr().unwrap();
        let server_url = format!(
            "https://localhost:{}/publish/metrics",
            server_address.port()
        );

        let acceptor = TlsAcceptor::new(tls_config);
        let _server = tokio::spawn(async move {
            admin::server(listener, app, Some(acceptor)).await.unwrap();
        });

        // build a client
        let client = reqwest::Client::builder()
            .add_root_certificate(server_priv_cert.reqwest_certificate())
            .identity(client_priv_cert.reqwest_identity())
            .https_only(true)
            .build()
            .unwrap();

        // Client request is rejected because it isn't in the allowlist
        client.get(&server_url).send().await.unwrap_err();

        // Insert the client's public key into the allowlist and verify the request is successful
        allower.get_sui_mut().write().unwrap().insert(
            client_pub_key.to_owned(),
            peers::AllowedPeer {
                name: "some-node".into(),
                public_key: client_pub_key.to_owned(),
            },
        );

        let mf = create_metric_family(
            "foo_metric",
            "some help this is",
            None,
            RepeatedField::from_vec(vec![create_metric_counter(
                RepeatedField::from_vec(create_labels(vec![("some", "label")])),
                create_counter(2046.0),
            )]),
        );

        let mut buf = vec![];
        let encoder = prometheus::ProtobufEncoder::new();
        encoder.encode(&[mf], &mut buf).unwrap();

        let res = client
            .post(&server_url)
            .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
            .body(buf)
            .send()
            .await
            .expect("expected a successful post with a self-signed certificate");
        let status = res.status();
        assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
    }
}