1pub mod admin;
4pub mod config;
5pub mod consumer;
6pub mod handlers;
7pub mod histogram_relay;
8pub mod metrics;
9pub mod middleware;
10pub mod peers;
11pub mod prom_to_mimir;
12pub mod remote_write;
13
14#[macro_export]
19macro_rules! var {
20 ($key:expr) => {
21 match std::env::var($key) {
22 Ok(val) => val,
23 Err(_) => "".into(),
24 }
25 };
26 ($key:expr, $default:expr) => {
27 match std::env::var($key) {
28 Ok(val) => val.parse::<_>().unwrap(),
29 Err(_) => $default,
30 }
31 };
32}
33
34#[cfg(test)]
35mod tests {
36 use super::*;
37 use crate::admin::Labels;
38 use crate::histogram_relay::HistogramRelay;
39 use crate::prom_to_mimir::tests::*;
40
41 use crate::{admin::CertKeyPair, config::RemoteWriteConfig, peers::SuiNodeProvider};
42 use axum::Router;
43 use axum::http::StatusCode;
44 use axum::routing::post;
45 use prometheus::Encoder;
46 use prometheus::PROTOBUF_FORMAT;
47 use protobuf::RepeatedField;
48 use std::net::TcpListener;
49 use std::time::Duration;
50 use sui_tls::{ClientCertVerifier, TlsAcceptor};
51
52 async fn run_dummy_remote_write(listener: TcpListener) {
53 async fn handler() -> StatusCode {
55 StatusCode::OK
56 }
57
58 let app = Router::new().route("/v1/push", post(handler));
60
61 listener.set_nonblocking(true).unwrap();
63 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
64 axum::serve(listener, app).await.unwrap();
65 }
66
67 async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
68 async fn handler() -> StatusCode {
72 tokio::time::sleep(Duration::from_secs(60)).await; StatusCode::OK
76 }
77
78 let app = Router::new().route("/v1/push", post(handler));
80
81 listener.set_nonblocking(true).unwrap();
83 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
84 axum::serve(listener, app).await.unwrap();
85 }
86
87 #[tokio::test]
92 async fn test_axum_acceptor() {
93 let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
95 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
96
97 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
99 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
100 let dummy_remote_write_url = format!(
101 "http://localhost:{}/v1/push",
102 dummy_remote_write_address.port()
103 );
104
105 let _dummy_remote_write =
106 tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
107
108 let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
110 let tls_config = ClientCertVerifier::new(
111 allower.clone(),
112 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
113 )
114 .rustls_server_config(
115 vec![server_priv_cert.rustls_certificate()],
116 server_priv_cert.rustls_private_key(),
117 )
118 .unwrap();
119
120 let client = admin::make_reqwest_client(
121 RemoteWriteConfig {
122 url: dummy_remote_write_url.to_owned(),
123 username: "bar".into(),
124 password: "foo".into(),
125 ..Default::default()
126 },
127 "dummy user agent",
128 );
129
130 let app = admin::app(
131 Labels {
132 network: "unittest-network".into(),
133 inventory_hostname: "ansible_inventory_name".into(),
134 },
135 client,
136 HistogramRelay::new(),
137 Some(allower.clone()),
138 None,
139 );
140
141 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
142 let server_address = listener.local_addr().unwrap();
143 let server_url = format!(
144 "https://localhost:{}/publish/metrics",
145 server_address.port()
146 );
147
148 let acceptor = TlsAcceptor::new(tls_config);
149 let _server = tokio::spawn(async move {
150 admin::server(listener, app, Some(acceptor)).await.unwrap();
151 });
152
153 let client = reqwest::Client::builder()
155 .add_root_certificate(server_priv_cert.reqwest_certificate())
156 .identity(client_priv_cert.reqwest_identity())
157 .https_only(true)
158 .build()
159 .unwrap();
160
161 client.get(&server_url).send().await.unwrap_err();
163
164 allower.get_sui_mut().write().unwrap().insert(
166 client_pub_key.to_owned(),
167 peers::AllowedPeer {
168 name: "some-node".into(),
169 public_key: client_pub_key.to_owned(),
170 },
171 );
172
173 let mf = create_metric_family(
174 "foo_metric",
175 "some help this is",
176 None,
177 RepeatedField::from_vec(vec![create_metric_counter(
178 RepeatedField::from_vec(create_labels(vec![("some", "label")])),
179 create_counter(2046.0),
180 )]),
181 );
182
183 let mut buf = vec![];
184 let encoder = prometheus::ProtobufEncoder::new();
185 encoder.encode(&[mf], &mut buf).unwrap();
186
187 let res = client
188 .post(&server_url)
189 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
190 .body(buf)
191 .send()
192 .await
193 .expect("expected a successful post with a self-signed certificate");
194 let status = res.status();
195 let body = res.text().await.unwrap();
196 assert_eq!("created", body);
197 assert_eq!(status, reqwest::StatusCode::CREATED);
198 }
199
200 #[tokio::test]
202 async fn test_client_timeout() {
203 let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
205 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
206
207 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
209 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
210 let dummy_remote_write_url = format!(
211 "http://localhost:{}/v1/push",
212 dummy_remote_write_address.port()
213 );
214
215 let _dummy_remote_write = tokio::spawn(async move {
216 run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
217 });
218
219 let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
221 let tls_config = ClientCertVerifier::new(
222 allower.clone(),
223 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
224 )
225 .rustls_server_config(
226 vec![server_priv_cert.rustls_certificate()],
227 server_priv_cert.rustls_private_key(),
228 )
229 .unwrap();
230
231 let client = admin::make_reqwest_client(
232 RemoteWriteConfig {
233 url: dummy_remote_write_url.to_owned(),
234 username: "bar".into(),
235 password: "foo".into(),
236 ..Default::default()
237 },
238 "dummy user agent",
239 );
240
241 let timeout_secs = Some(2u64);
242
243 let app = admin::app(
244 Labels {
245 network: "unittest-network".into(),
246 inventory_hostname: "ansible_inventory_name".into(),
247 },
248 client,
249 HistogramRelay::new(),
250 Some(allower.clone()),
251 timeout_secs,
252 );
253
254 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
255 let server_address = listener.local_addr().unwrap();
256 let server_url = format!(
257 "https://localhost:{}/publish/metrics",
258 server_address.port()
259 );
260
261 let acceptor = TlsAcceptor::new(tls_config);
262 let _server = tokio::spawn(async move {
263 admin::server(listener, app, Some(acceptor)).await.unwrap();
264 });
265
266 let client = reqwest::Client::builder()
268 .add_root_certificate(server_priv_cert.reqwest_certificate())
269 .identity(client_priv_cert.reqwest_identity())
270 .https_only(true)
271 .build()
272 .unwrap();
273
274 client.get(&server_url).send().await.unwrap_err();
276
277 allower.get_sui_mut().write().unwrap().insert(
279 client_pub_key.to_owned(),
280 peers::AllowedPeer {
281 name: "some-node".into(),
282 public_key: client_pub_key.to_owned(),
283 },
284 );
285
286 let mf = create_metric_family(
287 "foo_metric",
288 "some help this is",
289 None,
290 RepeatedField::from_vec(vec![create_metric_counter(
291 RepeatedField::from_vec(create_labels(vec![("some", "label")])),
292 create_counter(2046.0),
293 )]),
294 );
295
296 let mut buf = vec![];
297 let encoder = prometheus::ProtobufEncoder::new();
298 encoder.encode(&[mf], &mut buf).unwrap();
299
300 let res = client
301 .post(&server_url)
302 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
303 .body(buf)
304 .send()
305 .await
306 .expect("expected a successful post with a self-signed certificate");
307 let status = res.status();
308 assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
309 }
310}