1use crate::config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig};
4use crate::handlers::publish_metrics;
5use crate::histogram_relay::HistogramRelay;
6use crate::middleware::{
7 expect_content_length, expect_mysten_proxy_header, expect_valid_public_key,
8};
9use crate::peers::{AllowedPeer, SuiNodeProvider};
10use crate::var;
11use anyhow::Error;
12use anyhow::Result;
13use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
14use fastcrypto::ed25519::{Ed25519KeyPair, Ed25519PublicKey};
15use fastcrypto::traits::{KeyPair, ToFromBytes};
16use rustls::pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject};
17use std::net::SocketAddr;
18use std::sync::Arc;
19use std::time::Duration;
20use sui_tls::SUI_VALIDATOR_SERVER_NAME;
21use sui_tls::{
22 AllowAll, ClientCertVerifier, SelfSignedCertificate, TlsAcceptor, rustls::ServerConfig,
23};
24use tokio::signal;
25use tower::ServiceBuilder;
26use tower_http::{
27 LatencyUnit,
28 timeout::TimeoutLayer,
29 trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
30};
31use tracing::{Level, info};
32
33pub async fn shutdown_signal(h: axum_server::Handle<SocketAddr>) {
35 let ctrl_c = async {
36 signal::ctrl_c()
37 .await
38 .expect("failed to install Ctrl+C handler");
39 };
40
41 #[cfg(unix)]
42 let terminate = async {
43 signal::unix::signal(signal::unix::SignalKind::terminate())
44 .expect("failed to install signal handler")
45 .recv()
46 .await;
47 };
48
49 #[cfg(not(unix))]
50 let terminate = std::future::pending::<()>();
51
52 tokio::select! {
53 _ = ctrl_c => {},
54 _ = terminate => {},
55 }
56
57 let grace = 30;
58 info!(
59 "signal received, starting graceful shutdown, grace period {} seconds, if needed",
60 &grace
61 );
62 h.graceful_shutdown(Some(Duration::from_secs(grace)))
63}
64
65#[derive(Clone)]
69pub struct ReqwestClient {
70 pub client: reqwest::Client,
71 pub settings: RemoteWriteConfig,
72}
73
74pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
75 ReqwestClient {
76 client: reqwest::Client::builder()
77 .user_agent(user_agent)
78 .pool_max_idle_per_host(settings.pool_max_idle_per_host)
79 .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
80 .build()
81 .expect("cannot create reqwest client"),
82 settings,
83 }
84}
85
/// Label values that `app` attaches to the router as a request extension.
///
/// Derives `Debug` in addition to `Clone` so the value can be formatted in
/// logs and assertion output.
#[derive(Clone, Debug)]
pub struct Labels {
    /// Network label value. NOTE(review): presumably the chain/network name — confirm with callers.
    pub network: String,
    /// Hostname label value. NOTE(review): presumably the inventory hostname of this proxy — confirm with callers.
    pub inventory_hostname: String,
}
92
93pub fn app(
95 labels: Labels,
96 client: ReqwestClient,
97 relay: HistogramRelay,
98 allower: Option<SuiNodeProvider>,
99 timeout_secs: Option<u64>,
100) -> Router {
101 let mut router = Router::new()
103 .route("/publish/metrics", post(publish_metrics))
104 .route_layer(DefaultBodyLimit::max(var!(
105 "MAX_BODY_SIZE",
106 1024 * 1024 * 5
107 )))
108 .route_layer(middleware::from_fn(expect_mysten_proxy_header))
109 .route_layer(middleware::from_fn(expect_content_length));
110 if let Some(allower) = allower {
111 router = router
112 .route_layer(middleware::from_fn(expect_valid_public_key))
113 .layer(Extension(Arc::new(allower)));
114 }
115 router
116 .layer(TimeoutLayer::new(Duration::from_secs(
120 timeout_secs.unwrap_or(20),
121 )))
122 .layer(Extension(relay))
123 .layer(Extension(labels))
124 .layer(Extension(client))
125 .layer(
126 ServiceBuilder::new().layer(
127 TraceLayer::new_for_http()
128 .on_response(
129 DefaultOnResponse::new()
130 .level(Level::INFO)
131 .latency_unit(LatencyUnit::Seconds),
132 )
133 .on_failure(
134 DefaultOnFailure::new()
135 .level(Level::ERROR)
136 .latency_unit(LatencyUnit::Seconds),
137 ),
138 ),
139 )
140}
141
142pub async fn server(
144 listener: std::net::TcpListener,
145 app: Router,
146 acceptor: Option<TlsAcceptor>,
147) -> std::io::Result<()> {
148 listener.set_nonblocking(true)?;
149 let listener = tokio::net::TcpListener::from_std(listener)?;
150
151 let handle = axum_server::Handle::new();
153 tokio::spawn(shutdown_signal(handle.clone()));
155
156 if let Some(verify_peers) = acceptor {
157 axum_server::Server::from_listener(listener)
158 .acceptor(verify_peers)
159 .handle(handle)
160 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
161 .await
162 } else {
163 axum_server::Server::from_listener(listener)
164 .handle(handle)
165 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
166 .await
167 }
168}
169
170pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
172
173pub fn generate_self_cert(hostname: String) -> CertKeyPair {
175 let mut rng = rand::thread_rng();
176 let keypair = Ed25519KeyPair::generate(&mut rng);
177 CertKeyPair(
178 SelfSignedCertificate::new(keypair.copy().private(), &hostname),
179 keypair.public().to_owned(),
180 )
181}
182
183fn load_certs(filename: &str) -> Vec<CertificateDer<'static>> {
185 CertificateDer::pem_file_iter(filename)
186 .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e))
187 .collect::<Result<Vec<_>, _>>()
188 .unwrap_or_else(|e| panic!("cannot parse certificate file: {}; {}", filename, e))
189}
190
191fn load_private_key(filename: &str) -> PrivateKeyDer<'static> {
193 PrivateKeyDer::from_pem_file(filename).unwrap_or_else(|e| {
194 panic!(
195 "cannot load private key from {} (encrypted keys not supported): {}",
196 filename, e
197 )
198 })
199}
200
201fn load_static_peers(
203 static_peers: Option<StaticPeerValidationConfig>,
204) -> Result<Vec<AllowedPeer>, Error> {
205 let Some(static_peers) = static_peers else {
206 return Ok(vec![]);
207 };
208 let static_keys = static_peers
209 .pub_keys
210 .into_iter()
211 .map(|spk| {
212 let peer_id = hex::decode(spk.peer_id).unwrap();
213 let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
214 let s = AllowedPeer {
215 name: spk.name.clone(),
216 public_key,
217 };
218 info!(
219 "loaded static peer: {} public key: {}",
220 &s.name, &s.public_key,
221 );
222 s
223 })
224 .collect();
225 Ok(static_keys)
226}
227
228pub fn create_server_cert_default_allow(
230 hostname: String,
231) -> Result<ServerConfig, sui_tls::rustls::Error> {
232 let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
233
234 ClientCertVerifier::new(AllowAll, SUI_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
235 vec![server_certificate.rustls_certificate()],
236 server_certificate.rustls_private_key(),
237 )
238}
239
240pub fn create_server_cert_enforce_peer(
243 dynamic_peers: DynamicPeerValidationConfig,
244 static_peers: Option<StaticPeerValidationConfig>,
245) -> Result<(ServerConfig, Option<SuiNodeProvider>), sui_tls::rustls::Error> {
246 let (Some(certificate_path), Some(private_key_path)) =
247 (dynamic_peers.certificate_file, dynamic_peers.private_key)
248 else {
249 return Err(sui_tls::rustls::Error::General(
250 "missing certs to initialize server".into(),
251 ));
252 };
253 let static_peers = load_static_peers(static_peers).map_err(|e| {
254 sui_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
255 })?;
256 let allower = SuiNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
257 allower.poll_peer_list();
258 let c = ClientCertVerifier::new(allower.clone(), SUI_VALIDATOR_SERVER_NAME.to_string())
259 .rustls_server_config(
260 load_certs(&certificate_path),
261 load_private_key(&private_key_path),
262 )?;
263 Ok((c, Some(allower)))
264}