1use crate::config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig};
4use crate::handlers::publish_metrics;
5use crate::histogram_relay::HistogramRelay;
6use crate::middleware::{
7 expect_content_length, expect_mysten_proxy_header, expect_valid_public_key,
8};
9use crate::peers::{AllowedPeer, SuiNodeProvider};
10use crate::var;
11use anyhow::Error;
12use anyhow::Result;
13use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
14use fastcrypto::ed25519::{Ed25519KeyPair, Ed25519PublicKey};
15use fastcrypto::traits::{KeyPair, ToFromBytes};
16use std::fs;
17use std::io::BufReader;
18use std::net::SocketAddr;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_tls::SUI_VALIDATOR_SERVER_NAME;
22use sui_tls::{
23 AllowAll, ClientCertVerifier, SelfSignedCertificate, TlsAcceptor, rustls::ServerConfig,
24};
25use tokio::signal;
26use tower::ServiceBuilder;
27use tower_http::{
28 LatencyUnit,
29 timeout::TimeoutLayer,
30 trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
31};
32use tracing::{Level, info};
33
34pub async fn shutdown_signal(h: axum_server::Handle<SocketAddr>) {
36 let ctrl_c = async {
37 signal::ctrl_c()
38 .await
39 .expect("failed to install Ctrl+C handler");
40 };
41
42 #[cfg(unix)]
43 let terminate = async {
44 signal::unix::signal(signal::unix::SignalKind::terminate())
45 .expect("failed to install signal handler")
46 .recv()
47 .await;
48 };
49
50 #[cfg(not(unix))]
51 let terminate = std::future::pending::<()>();
52
53 tokio::select! {
54 _ = ctrl_c => {},
55 _ = terminate => {},
56 }
57
58 let grace = 30;
59 info!(
60 "signal received, starting graceful shutdown, grace period {} seconds, if needed",
61 &grace
62 );
63 h.graceful_shutdown(Some(Duration::from_secs(grace)))
64}
65
66#[derive(Clone)]
70pub struct ReqwestClient {
71 pub client: reqwest::Client,
72 pub settings: RemoteWriteConfig,
73}
74
75pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
76 ReqwestClient {
77 client: reqwest::Client::builder()
78 .user_agent(user_agent)
79 .pool_max_idle_per_host(settings.pool_max_idle_per_host)
80 .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
81 .build()
82 .expect("cannot create reqwest client"),
83 settings,
84 }
85}
86
/// Label values made available to request handlers via an axum `Extension`.
#[derive(Clone)]
pub struct Labels {
    /// Value for the `network` label.
    pub network: String,
    /// Value for the `inventory_hostname` label.
    pub inventory_hostname: String,
}
93
94pub fn app(
96 labels: Labels,
97 client: ReqwestClient,
98 relay: HistogramRelay,
99 allower: Option<SuiNodeProvider>,
100 timeout_secs: Option<u64>,
101) -> Router {
102 let mut router = Router::new()
104 .route("/publish/metrics", post(publish_metrics))
105 .route_layer(DefaultBodyLimit::max(var!(
106 "MAX_BODY_SIZE",
107 1024 * 1024 * 5
108 )))
109 .route_layer(middleware::from_fn(expect_mysten_proxy_header))
110 .route_layer(middleware::from_fn(expect_content_length));
111 if let Some(allower) = allower {
112 router = router
113 .route_layer(middleware::from_fn(expect_valid_public_key))
114 .layer(Extension(Arc::new(allower)));
115 }
116 router
117 .layer(TimeoutLayer::new(Duration::from_secs(
121 timeout_secs.unwrap_or(20),
122 )))
123 .layer(Extension(relay))
124 .layer(Extension(labels))
125 .layer(Extension(client))
126 .layer(
127 ServiceBuilder::new().layer(
128 TraceLayer::new_for_http()
129 .on_response(
130 DefaultOnResponse::new()
131 .level(Level::INFO)
132 .latency_unit(LatencyUnit::Seconds),
133 )
134 .on_failure(
135 DefaultOnFailure::new()
136 .level(Level::ERROR)
137 .latency_unit(LatencyUnit::Seconds),
138 ),
139 ),
140 )
141}
142
143pub async fn server(
145 listener: std::net::TcpListener,
146 app: Router,
147 acceptor: Option<TlsAcceptor>,
148) -> std::io::Result<()> {
149 listener.set_nonblocking(true)?;
150 let listener = tokio::net::TcpListener::from_std(listener)?;
151
152 let handle = axum_server::Handle::new();
154 tokio::spawn(shutdown_signal(handle.clone()));
156
157 if let Some(verify_peers) = acceptor {
158 axum_server::Server::from_listener(listener)
159 .acceptor(verify_peers)
160 .handle(handle)
161 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
162 .await
163 } else {
164 axum_server::Server::from_listener(listener)
165 .handle(handle)
166 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
167 .await
168 }
169}
170
171pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
173
174pub fn generate_self_cert(hostname: String) -> CertKeyPair {
176 let mut rng = rand::thread_rng();
177 let keypair = Ed25519KeyPair::generate(&mut rng);
178 CertKeyPair(
179 SelfSignedCertificate::new(keypair.copy().private(), &hostname),
180 keypair.public().to_owned(),
181 )
182}
183
184fn load_certs(filename: &str) -> Vec<rustls::pki_types::CertificateDer<'static>> {
186 let certfile = fs::File::open(filename)
187 .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e));
188 let mut reader = BufReader::new(certfile);
189 rustls_pemfile::certs(&mut reader)
190 .collect::<Result<Vec<_>, _>>()
191 .unwrap()
192}
193
194fn load_private_key(filename: &str) -> rustls::pki_types::PrivateKeyDer<'static> {
196 let keyfile = fs::File::open(filename)
197 .unwrap_or_else(|e| panic!("cannot open private key file {}; {}", filename, e));
198 let mut reader = BufReader::new(keyfile);
199
200 loop {
201 match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
202 Some(rustls_pemfile::Item::Pkcs1Key(key)) => return key.into(),
203 Some(rustls_pemfile::Item::Pkcs8Key(key)) => return key.into(),
204 Some(rustls_pemfile::Item::Sec1Key(key)) => return key.into(),
205 None => break,
206 _ => {}
207 }
208 }
209
210 panic!(
211 "no keys found in {:?} (encrypted keys not supported)",
212 filename
213 );
214}
215
216fn load_static_peers(
218 static_peers: Option<StaticPeerValidationConfig>,
219) -> Result<Vec<AllowedPeer>, Error> {
220 let Some(static_peers) = static_peers else {
221 return Ok(vec![]);
222 };
223 let static_keys = static_peers
224 .pub_keys
225 .into_iter()
226 .map(|spk| {
227 let peer_id = hex::decode(spk.peer_id).unwrap();
228 let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
229 let s = AllowedPeer {
230 name: spk.name.clone(),
231 public_key,
232 };
233 info!(
234 "loaded static peer: {} public key: {}",
235 &s.name, &s.public_key,
236 );
237 s
238 })
239 .collect();
240 Ok(static_keys)
241}
242
243pub fn create_server_cert_default_allow(
245 hostname: String,
246) -> Result<ServerConfig, sui_tls::rustls::Error> {
247 let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
248
249 ClientCertVerifier::new(AllowAll, SUI_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
250 vec![server_certificate.rustls_certificate()],
251 server_certificate.rustls_private_key(),
252 )
253}
254
255pub fn create_server_cert_enforce_peer(
258 dynamic_peers: DynamicPeerValidationConfig,
259 static_peers: Option<StaticPeerValidationConfig>,
260) -> Result<(ServerConfig, Option<SuiNodeProvider>), sui_tls::rustls::Error> {
261 let (Some(certificate_path), Some(private_key_path)) =
262 (dynamic_peers.certificate_file, dynamic_peers.private_key)
263 else {
264 return Err(sui_tls::rustls::Error::General(
265 "missing certs to initialize server".into(),
266 ));
267 };
268 let static_peers = load_static_peers(static_peers).map_err(|e| {
269 sui_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
270 })?;
271 let allower = SuiNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
272 allower.poll_peer_list();
273 let c = ClientCertVerifier::new(allower.clone(), SUI_VALIDATOR_SERVER_NAME.to_string())
274 .rustls_server_config(
275 load_certs(&certificate_path),
276 load_private_key(&private_key_path),
277 )?;
278 Ok((c, Some(allower)))
279}