1use crate::config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig};
4use crate::handlers::publish_metrics;
5use crate::histogram_relay::HistogramRelay;
6use crate::middleware::{
7 expect_content_length, expect_mysten_proxy_header, expect_valid_public_key,
8};
9use crate::peers::{AllowedPeer, SuiNodeProvider};
10use crate::var;
11use anyhow::Error;
12use anyhow::Result;
13use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
14use fastcrypto::ed25519::{Ed25519KeyPair, Ed25519PublicKey};
15use fastcrypto::traits::{KeyPair, ToFromBytes};
16use std::fs;
17use std::io::BufReader;
18use std::net::SocketAddr;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_tls::SUI_VALIDATOR_SERVER_NAME;
22use sui_tls::{
23 AllowAll, ClientCertVerifier, SelfSignedCertificate, TlsAcceptor, rustls::ServerConfig,
24};
25use tokio::signal;
26use tower::ServiceBuilder;
27use tower_http::{
28 LatencyUnit,
29 timeout::TimeoutLayer,
30 trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
31};
32use tracing::{Level, info};
33
34pub async fn shutdown_signal(h: axum_server::Handle) {
36 let ctrl_c = async {
37 signal::ctrl_c()
38 .await
39 .expect("failed to install Ctrl+C handler");
40 };
41
42 #[cfg(unix)]
43 let terminate = async {
44 signal::unix::signal(signal::unix::SignalKind::terminate())
45 .expect("failed to install signal handler")
46 .recv()
47 .await;
48 };
49
50 #[cfg(not(unix))]
51 let terminate = std::future::pending::<()>();
52
53 tokio::select! {
54 _ = ctrl_c => {},
55 _ = terminate => {},
56 }
57
58 let grace = 30;
59 info!(
60 "signal received, starting graceful shutdown, grace period {} seconds, if needed",
61 &grace
62 );
63 h.graceful_shutdown(Some(Duration::from_secs(grace)))
64}
65
66#[derive(Clone)]
70pub struct ReqwestClient {
71 pub client: reqwest::Client,
72 pub settings: RemoteWriteConfig,
73}
74
75pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
76 ReqwestClient {
77 client: reqwest::Client::builder()
78 .user_agent(user_agent)
79 .pool_max_idle_per_host(settings.pool_max_idle_per_host)
80 .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
81 .build()
82 .expect("cannot create reqwest client"),
83 settings,
84 }
85}
86
87#[derive(Clone)]
89pub struct Labels {
90 pub network: String,
91 pub inventory_hostname: String,
92}
93
94pub fn app(
96 labels: Labels,
97 client: ReqwestClient,
98 relay: HistogramRelay,
99 allower: Option<SuiNodeProvider>,
100 timeout_secs: Option<u64>,
101) -> Router {
102 let mut router = Router::new()
104 .route("/publish/metrics", post(publish_metrics))
105 .route_layer(DefaultBodyLimit::max(var!(
106 "MAX_BODY_SIZE",
107 1024 * 1024 * 5
108 )))
109 .route_layer(middleware::from_fn(expect_mysten_proxy_header))
110 .route_layer(middleware::from_fn(expect_content_length));
111 if let Some(allower) = allower {
112 router = router
113 .route_layer(middleware::from_fn(expect_valid_public_key))
114 .layer(Extension(Arc::new(allower)));
115 }
116 router
117 .layer(TimeoutLayer::new(Duration::from_secs(
121 timeout_secs.unwrap_or(20),
122 )))
123 .layer(Extension(relay))
124 .layer(Extension(labels))
125 .layer(Extension(client))
126 .layer(
127 ServiceBuilder::new().layer(
128 TraceLayer::new_for_http()
129 .on_response(
130 DefaultOnResponse::new()
131 .level(Level::INFO)
132 .latency_unit(LatencyUnit::Seconds),
133 )
134 .on_failure(
135 DefaultOnFailure::new()
136 .level(Level::ERROR)
137 .latency_unit(LatencyUnit::Seconds),
138 ),
139 ),
140 )
141}
142
143pub async fn server(
145 listener: std::net::TcpListener,
146 app: Router,
147 acceptor: Option<TlsAcceptor>,
148) -> std::io::Result<()> {
149 let handle = axum_server::Handle::new();
151 tokio::spawn(shutdown_signal(handle.clone()));
153
154 if let Some(verify_peers) = acceptor {
155 axum_server::Server::from_tcp(listener)
156 .acceptor(verify_peers)
157 .handle(handle)
158 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
159 .await
160 } else {
161 axum_server::Server::from_tcp(listener)
162 .handle(handle)
163 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
164 .await
165 }
166}
167
168pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
170
171pub fn generate_self_cert(hostname: String) -> CertKeyPair {
173 let mut rng = rand::thread_rng();
174 let keypair = Ed25519KeyPair::generate(&mut rng);
175 CertKeyPair(
176 SelfSignedCertificate::new(keypair.copy().private(), &hostname),
177 keypair.public().to_owned(),
178 )
179}
180
181fn load_certs(filename: &str) -> Vec<rustls::pki_types::CertificateDer<'static>> {
183 let certfile = fs::File::open(filename)
184 .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e));
185 let mut reader = BufReader::new(certfile);
186 rustls_pemfile::certs(&mut reader)
187 .collect::<Result<Vec<_>, _>>()
188 .unwrap()
189}
190
191fn load_private_key(filename: &str) -> rustls::pki_types::PrivateKeyDer<'static> {
193 let keyfile = fs::File::open(filename)
194 .unwrap_or_else(|e| panic!("cannot open private key file {}; {}", filename, e));
195 let mut reader = BufReader::new(keyfile);
196
197 loop {
198 match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
199 Some(rustls_pemfile::Item::Pkcs1Key(key)) => return key.into(),
200 Some(rustls_pemfile::Item::Pkcs8Key(key)) => return key.into(),
201 Some(rustls_pemfile::Item::Sec1Key(key)) => return key.into(),
202 None => break,
203 _ => {}
204 }
205 }
206
207 panic!(
208 "no keys found in {:?} (encrypted keys not supported)",
209 filename
210 );
211}
212
213fn load_static_peers(
215 static_peers: Option<StaticPeerValidationConfig>,
216) -> Result<Vec<AllowedPeer>, Error> {
217 let Some(static_peers) = static_peers else {
218 return Ok(vec![]);
219 };
220 let static_keys = static_peers
221 .pub_keys
222 .into_iter()
223 .map(|spk| {
224 let peer_id = hex::decode(spk.peer_id).unwrap();
225 let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
226 let s = AllowedPeer {
227 name: spk.name.clone(),
228 public_key,
229 };
230 info!(
231 "loaded static peer: {} public key: {}",
232 &s.name, &s.public_key,
233 );
234 s
235 })
236 .collect();
237 Ok(static_keys)
238}
239
240pub fn create_server_cert_default_allow(
242 hostname: String,
243) -> Result<ServerConfig, sui_tls::rustls::Error> {
244 let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
245
246 ClientCertVerifier::new(AllowAll, SUI_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
247 vec![server_certificate.rustls_certificate()],
248 server_certificate.rustls_private_key(),
249 )
250}
251
252pub fn create_server_cert_enforce_peer(
255 dynamic_peers: DynamicPeerValidationConfig,
256 static_peers: Option<StaticPeerValidationConfig>,
257) -> Result<(ServerConfig, Option<SuiNodeProvider>), sui_tls::rustls::Error> {
258 let (Some(certificate_path), Some(private_key_path)) =
259 (dynamic_peers.certificate_file, dynamic_peers.private_key)
260 else {
261 return Err(sui_tls::rustls::Error::General(
262 "missing certs to initialize server".into(),
263 ));
264 };
265 let static_peers = load_static_peers(static_peers).map_err(|e| {
266 sui_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
267 })?;
268 let allower = SuiNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
269 allower.poll_peer_list();
270 let c = ClientCertVerifier::new(allower.clone(), SUI_VALIDATOR_SERVER_NAME.to_string())
271 .rustls_server_config(
272 load_certs(&certificate_path),
273 load_private_key(&private_key_path),
274 )?;
275 Ok((c, Some(allower)))
276}