1pub mod admin;
4pub mod config;
5pub mod consumer;
6pub mod handlers;
7pub mod histogram_relay;
8pub mod metrics;
9pub mod middleware;
10pub mod peers;
11pub mod prom_to_mimir;
12pub mod remote_write;
13
/// Reads an environment variable, with a fallback for when it cannot be read.
///
/// * `var!(key)` evaluates to the variable's value as a `String`, or to an
///   empty `String` when `std::env::var` returns an error (the variable is
///   unset or is not valid Unicode).
/// * `var!(key, default)` parses the variable's value with `str::parse` into
///   the type the surrounding code expects, or evaluates to `default` when
///   `std::env::var` returns an error.
///
/// `key` is evaluated exactly once in both forms.
///
/// # Panics
///
/// The two-argument form panics when the variable is set but its value does
/// not parse into the target type. The panic message names the variable and
/// shows the offending value so a misconfiguration can be traced.
#[macro_export]
macro_rules! var {
    ($key:expr) => {
        ::std::env::var($key).unwrap_or_default()
    };
    ($key:expr, $default:expr) => {{
        // Bind the key once so it can be used both for the lookup and for the
        // panic message without evaluating (or moving) `$key` twice.
        let key = $key;
        let key: &::std::ffi::OsStr = ::std::convert::AsRef::as_ref(&key);
        match ::std::env::var(key) {
            Ok(val) => match val.parse::<_>() {
                Ok(parsed) => parsed,
                // `{:?}` keeps the same `Debug` requirement on the parse
                // error that the previous `.unwrap()` had.
                Err(err) => panic!(
                    "environment variable {:?} has unparsable value {:?}: {:?}",
                    key, val, err
                ),
            },
            Err(_) => $default,
        }
    }};
}
33
34#[cfg(test)]
35mod tests {
36 use super::*;
37 use crate::admin::Labels;
38 use crate::histogram_relay::HistogramRelay;
39 use crate::prom_to_mimir::tests::*;
40
41 use crate::{admin::CertKeyPair, config::RemoteWriteConfig, peers::SuiNodeProvider};
42 use axum::Router;
43 use axum::http::StatusCode;
44 use axum::routing::post;
45 use prometheus::Encoder;
46 use prometheus::PROTOBUF_FORMAT;
47 use std::net::TcpListener;
48 use std::time::Duration;
49 use sui_tls::{ClientCertVerifier, TlsAcceptor};
50
51 async fn run_dummy_remote_write(listener: TcpListener) {
52 async fn handler() -> StatusCode {
54 StatusCode::OK
55 }
56
57 let app = Router::new().route("/v1/push", post(handler));
59
60 listener.set_nonblocking(true).unwrap();
62 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
63 axum::serve(listener, app).await.unwrap();
64 }
65
66 async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
67 async fn handler() -> StatusCode {
71 tokio::time::sleep(Duration::from_secs(60)).await; StatusCode::OK
75 }
76
77 let app = Router::new().route("/v1/push", post(handler));
79
80 listener.set_nonblocking(true).unwrap();
82 let listener = tokio::net::TcpListener::from_std(listener).unwrap();
83 axum::serve(listener, app).await.unwrap();
84 }
85
86 #[tokio::test]
91 async fn test_axum_acceptor() {
92 let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
94 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
95
96 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
98 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
99 let dummy_remote_write_url = format!(
100 "http://localhost:{}/v1/push",
101 dummy_remote_write_address.port()
102 );
103
104 let _dummy_remote_write =
105 tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
106
107 let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
109 let tls_config = ClientCertVerifier::new(
110 allower.clone(),
111 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
112 )
113 .rustls_server_config(
114 vec![server_priv_cert.rustls_certificate()],
115 server_priv_cert.rustls_private_key(),
116 )
117 .unwrap();
118
119 let client = admin::make_reqwest_client(
120 RemoteWriteConfig {
121 url: dummy_remote_write_url.to_owned(),
122 username: "bar".into(),
123 password: "foo".into(),
124 ..Default::default()
125 },
126 "dummy user agent",
127 );
128
129 let app = admin::app(
130 Labels {
131 network: "unittest-network".into(),
132 inventory_hostname: "ansible_inventory_name".into(),
133 },
134 client,
135 HistogramRelay::new(),
136 Some(allower.clone()),
137 None,
138 );
139
140 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
141 let server_address = listener.local_addr().unwrap();
142 let server_url = format!(
143 "https://localhost:{}/publish/metrics",
144 server_address.port()
145 );
146
147 let acceptor = TlsAcceptor::new(tls_config);
148 let _server = tokio::spawn(async move {
149 admin::server(listener, app, Some(acceptor)).await.unwrap();
150 });
151
152 let client = reqwest::Client::builder()
154 .add_root_certificate(server_priv_cert.reqwest_certificate())
155 .identity(client_priv_cert.reqwest_identity())
156 .https_only(true)
157 .build()
158 .unwrap();
159
160 client.get(&server_url).send().await.unwrap_err();
162
163 allower.get_sui_mut().write().unwrap().insert(
165 client_pub_key.to_owned(),
166 peers::AllowedPeer {
167 name: "some-node".into(),
168 public_key: client_pub_key.to_owned(),
169 },
170 );
171
172 let mf = create_metric_family(
173 "foo_metric",
174 "some help this is",
175 None,
176 vec![create_metric_counter(
177 create_labels(vec![("some", "label")]),
178 create_counter(2046.0),
179 )],
180 );
181
182 let mut buf = vec![];
183 let encoder = prometheus::ProtobufEncoder::new();
184 encoder.encode(&[mf], &mut buf).unwrap();
185
186 let res = client
187 .post(&server_url)
188 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
189 .body(buf)
190 .send()
191 .await
192 .expect("expected a successful post with a self-signed certificate");
193 let status = res.status();
194 let body = res.text().await.unwrap();
195 assert_eq!("created", body);
196 assert_eq!(status, reqwest::StatusCode::CREATED);
197 }
198
199 #[tokio::test]
201 async fn test_client_timeout() {
202 let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
204 let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
205
206 let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
208 let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
209 let dummy_remote_write_url = format!(
210 "http://localhost:{}/v1/push",
211 dummy_remote_write_address.port()
212 );
213
214 let _dummy_remote_write = tokio::spawn(async move {
215 run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
216 });
217
218 let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
220 let tls_config = ClientCertVerifier::new(
221 allower.clone(),
222 sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
223 )
224 .rustls_server_config(
225 vec![server_priv_cert.rustls_certificate()],
226 server_priv_cert.rustls_private_key(),
227 )
228 .unwrap();
229
230 let client = admin::make_reqwest_client(
231 RemoteWriteConfig {
232 url: dummy_remote_write_url.to_owned(),
233 username: "bar".into(),
234 password: "foo".into(),
235 ..Default::default()
236 },
237 "dummy user agent",
238 );
239
240 let timeout_secs = Some(2u64);
241
242 let app = admin::app(
243 Labels {
244 network: "unittest-network".into(),
245 inventory_hostname: "ansible_inventory_name".into(),
246 },
247 client,
248 HistogramRelay::new(),
249 Some(allower.clone()),
250 timeout_secs,
251 );
252
253 let listener = std::net::TcpListener::bind("localhost:0").unwrap();
254 let server_address = listener.local_addr().unwrap();
255 let server_url = format!(
256 "https://localhost:{}/publish/metrics",
257 server_address.port()
258 );
259
260 let acceptor = TlsAcceptor::new(tls_config);
261 let _server = tokio::spawn(async move {
262 admin::server(listener, app, Some(acceptor)).await.unwrap();
263 });
264
265 let client = reqwest::Client::builder()
267 .add_root_certificate(server_priv_cert.reqwest_certificate())
268 .identity(client_priv_cert.reqwest_identity())
269 .https_only(true)
270 .build()
271 .unwrap();
272
273 client.get(&server_url).send().await.unwrap_err();
275
276 allower.get_sui_mut().write().unwrap().insert(
278 client_pub_key.to_owned(),
279 peers::AllowedPeer {
280 name: "some-node".into(),
281 public_key: client_pub_key.to_owned(),
282 },
283 );
284
285 let mf = create_metric_family(
286 "foo_metric",
287 "some help this is",
288 None,
289 vec![create_metric_counter(
290 create_labels(vec![("some", "label")]),
291 create_counter(2046.0),
292 )],
293 );
294
295 let mut buf = vec![];
296 let encoder = prometheus::ProtobufEncoder::new();
297 encoder.encode(&[mf], &mut buf).unwrap();
298
299 let res = client
300 .post(&server_url)
301 .header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
302 .body(buf)
303 .send()
304 .await
305 .expect("expected a successful post with a self-signed certificate");
306 let status = res.status();
307 assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
308 }
309}