pub mod admin;
pub mod config;
pub mod consumer;
pub mod handlers;
pub mod histogram_relay;
pub mod metrics;
pub mod middleware;
pub mod peers;
pub mod prom_to_mimir;
pub mod remote_write;
#[macro_export]
macro_rules! var {
($key:expr) => {
match std::env::var($key) {
Ok(val) => val,
Err(_) => "".into(),
}
};
($key:expr, $default:expr) => {
match std::env::var($key) {
Ok(val) => val.parse::<_>().unwrap(),
Err(_) => $default,
}
};
}
#[cfg(test)]
mod tests {
use super::*;
use crate::admin::Labels;
use crate::histogram_relay::HistogramRelay;
use crate::prom_to_mimir::tests::*;
use crate::{admin::CertKeyPair, config::RemoteWriteConfig, peers::SuiNodeProvider};
use axum::http::StatusCode;
use axum::routing::post;
use axum::Router;
use prometheus::Encoder;
use prometheus::PROTOBUF_FORMAT;
use protobuf::RepeatedField;
use std::net::TcpListener;
use std::time::Duration;
use sui_tls::{ClientCertVerifier, TlsAcceptor};
async fn run_dummy_remote_write(listener: TcpListener) {
async fn handler() -> StatusCode {
StatusCode::OK
}
let app = Router::new().route("/v1/push", post(handler));
listener.set_nonblocking(true).unwrap();
let listener = tokio::net::TcpListener::from_std(listener).unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn run_dummy_remote_write_very_slow(listener: TcpListener) {
async fn handler() -> StatusCode {
tokio::time::sleep(Duration::from_secs(60)).await; StatusCode::OK
}
let app = Router::new().route("/v1/push", post(handler));
listener.set_nonblocking(true).unwrap();
let listener = tokio::net::TcpListener::from_std(listener).unwrap();
axum::serve(listener, app).await.unwrap();
}
#[tokio::test]
async fn test_axum_acceptor() {
let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
let dummy_remote_write_url = format!(
"http://localhost:{}/v1/push",
dummy_remote_write_address.port()
);
let _dummy_remote_write =
tokio::spawn(async move { run_dummy_remote_write(dummy_remote_write_listener).await });
let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
let tls_config = ClientCertVerifier::new(
allower.clone(),
sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
)
.rustls_server_config(
vec![server_priv_cert.rustls_certificate()],
server_priv_cert.rustls_private_key(),
)
.unwrap();
let client = admin::make_reqwest_client(
RemoteWriteConfig {
url: dummy_remote_write_url.to_owned(),
username: "bar".into(),
password: "foo".into(),
..Default::default()
},
"dummy user agent",
);
let app = admin::app(
Labels {
network: "unittest-network".into(),
inventory_hostname: "ansible_inventory_name".into(),
},
client,
HistogramRelay::new(),
Some(allower.clone()),
None,
);
let listener = std::net::TcpListener::bind("localhost:0").unwrap();
let server_address = listener.local_addr().unwrap();
let server_url = format!(
"https://localhost:{}/publish/metrics",
server_address.port()
);
let acceptor = TlsAcceptor::new(tls_config);
let _server = tokio::spawn(async move {
admin::server(listener, app, Some(acceptor)).await.unwrap();
});
let client = reqwest::Client::builder()
.add_root_certificate(server_priv_cert.reqwest_certificate())
.identity(client_priv_cert.reqwest_identity())
.https_only(true)
.build()
.unwrap();
client.get(&server_url).send().await.unwrap_err();
allower.get_sui_mut().write().unwrap().insert(
client_pub_key.to_owned(),
peers::AllowedPeer {
name: "some-node".into(),
public_key: client_pub_key.to_owned(),
},
);
let mf = create_metric_family(
"foo_metric",
"some help this is",
None,
RepeatedField::from_vec(vec![create_metric_counter(
RepeatedField::from_vec(create_labels(vec![("some", "label")])),
create_counter(2046.0),
)]),
);
let mut buf = vec![];
let encoder = prometheus::ProtobufEncoder::new();
encoder.encode(&[mf], &mut buf).unwrap();
let res = client
.post(&server_url)
.header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
.body(buf)
.send()
.await
.expect("expected a successful post with a self-signed certificate");
let status = res.status();
let body = res.text().await.unwrap();
assert_eq!("created", body);
assert_eq!(status, reqwest::StatusCode::CREATED);
}
#[tokio::test]
async fn test_client_timeout() {
let CertKeyPair(client_priv_cert, client_pub_key) = admin::generate_self_cert("sui".into());
let CertKeyPair(server_priv_cert, _) = admin::generate_self_cert("localhost".into());
let dummy_remote_write_listener = std::net::TcpListener::bind("localhost:0").unwrap();
let dummy_remote_write_address = dummy_remote_write_listener.local_addr().unwrap();
let dummy_remote_write_url = format!(
"http://localhost:{}/v1/push",
dummy_remote_write_address.port()
);
let _dummy_remote_write = tokio::spawn(async move {
run_dummy_remote_write_very_slow(dummy_remote_write_listener).await
});
let mut allower = SuiNodeProvider::new("".into(), Duration::from_secs(30), vec![]);
let tls_config = ClientCertVerifier::new(
allower.clone(),
sui_tls::SUI_VALIDATOR_SERVER_NAME.to_string(),
)
.rustls_server_config(
vec![server_priv_cert.rustls_certificate()],
server_priv_cert.rustls_private_key(),
)
.unwrap();
let client = admin::make_reqwest_client(
RemoteWriteConfig {
url: dummy_remote_write_url.to_owned(),
username: "bar".into(),
password: "foo".into(),
..Default::default()
},
"dummy user agent",
);
let timeout_secs = Some(2u64);
let app = admin::app(
Labels {
network: "unittest-network".into(),
inventory_hostname: "ansible_inventory_name".into(),
},
client,
HistogramRelay::new(),
Some(allower.clone()),
timeout_secs,
);
let listener = std::net::TcpListener::bind("localhost:0").unwrap();
let server_address = listener.local_addr().unwrap();
let server_url = format!(
"https://localhost:{}/publish/metrics",
server_address.port()
);
let acceptor = TlsAcceptor::new(tls_config);
let _server = tokio::spawn(async move {
admin::server(listener, app, Some(acceptor)).await.unwrap();
});
let client = reqwest::Client::builder()
.add_root_certificate(server_priv_cert.reqwest_certificate())
.identity(client_priv_cert.reqwest_identity())
.https_only(true)
.build()
.unwrap();
client.get(&server_url).send().await.unwrap_err();
allower.get_sui_mut().write().unwrap().insert(
client_pub_key.to_owned(),
peers::AllowedPeer {
name: "some-node".into(),
public_key: client_pub_key.to_owned(),
},
);
let mf = create_metric_family(
"foo_metric",
"some help this is",
None,
RepeatedField::from_vec(vec![create_metric_counter(
RepeatedField::from_vec(create_labels(vec![("some", "label")])),
create_counter(2046.0),
)]),
);
let mut buf = vec![];
let encoder = prometheus::ProtobufEncoder::new();
encoder.encode(&[mf], &mut buf).unwrap();
let res = client
.post(&server_url)
.header(reqwest::header::CONTENT_TYPE, PROTOBUF_FORMAT)
.body(buf)
.send()
.await
.expect("expected a successful post with a self-signed certificate");
let status = res.status();
assert_eq!(status, StatusCode::REQUEST_TIMEOUT);
}
}