sui_proxy/
admin.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use crate::config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig};
4use crate::handlers::publish_metrics;
5use crate::histogram_relay::HistogramRelay;
6use crate::middleware::{
7    expect_content_length, expect_mysten_proxy_header, expect_valid_public_key,
8};
9use crate::peers::{AllowedPeer, SuiNodeProvider};
10use crate::var;
11use anyhow::Error;
12use anyhow::Result;
13use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
14use fastcrypto::ed25519::{Ed25519KeyPair, Ed25519PublicKey};
15use fastcrypto::traits::{KeyPair, ToFromBytes};
16use std::fs;
17use std::io::BufReader;
18use std::net::SocketAddr;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_tls::SUI_VALIDATOR_SERVER_NAME;
22use sui_tls::{
23    AllowAll, ClientCertVerifier, SelfSignedCertificate, TlsAcceptor, rustls::ServerConfig,
24};
25use tokio::signal;
26use tower::ServiceBuilder;
27use tower_http::{
28    LatencyUnit,
29    timeout::TimeoutLayer,
30    trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
31};
32use tracing::{Level, info};
33
34/// Configure our graceful shutdown scenarios
35pub async fn shutdown_signal(h: axum_server::Handle<SocketAddr>) {
36    let ctrl_c = async {
37        signal::ctrl_c()
38            .await
39            .expect("failed to install Ctrl+C handler");
40    };
41
42    #[cfg(unix)]
43    let terminate = async {
44        signal::unix::signal(signal::unix::SignalKind::terminate())
45            .expect("failed to install signal handler")
46            .recv()
47            .await;
48    };
49
50    #[cfg(not(unix))]
51    let terminate = std::future::pending::<()>();
52
53    tokio::select! {
54        _ = ctrl_c => {},
55        _ = terminate => {},
56    }
57
58    let grace = 30;
59    info!(
60        "signal received, starting graceful shutdown, grace period {} seconds, if needed",
61        &grace
62    );
63    h.graceful_shutdown(Some(Duration::from_secs(grace)))
64}
65
/// Bundles the shared `reqwest::Client` used for remote_push api calls with the
/// remote-write settings it was built from.
///
/// The type is `Clone`; `app` installs it on the router as an `Extension`.
/// The client has an underlying connection pool, see the reqwest documentation
/// for details.
///
/// NOTE(review): the previous comment stated that this also holds the username
/// and password; presumably those live in `RemoteWriteConfig` — confirm there.
#[derive(Clone)]
pub struct ReqwestClient {
    /// HTTP client used for the outgoing calls.
    pub client: reqwest::Client,
    /// Remote-write configuration kept alongside the client.
    pub settings: RemoteWriteConfig,
}
74
75pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
76    ReqwestClient {
77        client: reqwest::Client::builder()
78            .user_agent(user_agent)
79            .pool_max_idle_per_host(settings.pool_max_idle_per_host)
80            .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
81            .build()
82            .expect("cannot create reqwest client"),
83        settings,
84    }
85}
86
/// Adhoc labels we will inject per our config.
///
/// `app` installs a value of this type on the router as an `Extension`.
/// Derives `Debug` so the labels can be included in logs and assertion output.
#[derive(Debug, Clone)]
pub struct Labels {
    /// Value of the `network` label.
    pub network: String,
    /// Value of the `inventory_hostname` label.
    pub inventory_hostname: String,
}
93
94/// App will configure our routes. This fn is also used to instrument our tests
95pub fn app(
96    labels: Labels,
97    client: ReqwestClient,
98    relay: HistogramRelay,
99    allower: Option<SuiNodeProvider>,
100    timeout_secs: Option<u64>,
101) -> Router {
102    // build our application with a route and our sender mpsc
103    let mut router = Router::new()
104        .route("/publish/metrics", post(publish_metrics))
105        .route_layer(DefaultBodyLimit::max(var!(
106            "MAX_BODY_SIZE",
107            1024 * 1024 * 5
108        )))
109        .route_layer(middleware::from_fn(expect_mysten_proxy_header))
110        .route_layer(middleware::from_fn(expect_content_length));
111    if let Some(allower) = allower {
112        router = router
113            .route_layer(middleware::from_fn(expect_valid_public_key))
114            .layer(Extension(Arc::new(allower)));
115    }
116    router
117        // Enforce on all routes.
118        // If the request does not complete within the specified timeout it will be aborted
119        // and a 408 Request Timeout response will be sent.
120        .layer(TimeoutLayer::new(Duration::from_secs(
121            timeout_secs.unwrap_or(20),
122        )))
123        .layer(Extension(relay))
124        .layer(Extension(labels))
125        .layer(Extension(client))
126        .layer(
127            ServiceBuilder::new().layer(
128                TraceLayer::new_for_http()
129                    .on_response(
130                        DefaultOnResponse::new()
131                            .level(Level::INFO)
132                            .latency_unit(LatencyUnit::Seconds),
133                    )
134                    .on_failure(
135                        DefaultOnFailure::new()
136                            .level(Level::ERROR)
137                            .latency_unit(LatencyUnit::Seconds),
138                    ),
139            ),
140        )
141}
142
143/// Server creates our http/https server
144pub async fn server(
145    listener: std::net::TcpListener,
146    app: Router,
147    acceptor: Option<TlsAcceptor>,
148) -> std::io::Result<()> {
149    listener.set_nonblocking(true)?;
150    let listener = tokio::net::TcpListener::from_std(listener)?;
151
152    // setup our graceful shutdown
153    let handle = axum_server::Handle::new();
154    // Spawn a task to gracefully shutdown server.
155    tokio::spawn(shutdown_signal(handle.clone()));
156
157    if let Some(verify_peers) = acceptor {
158        axum_server::Server::from_listener(listener)
159            .acceptor(verify_peers)
160            .handle(handle)
161            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
162            .await
163    } else {
164        axum_server::Server::from_listener(listener)
165            .handle(handle)
166            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
167            .await
168    }
169}
170
/// CertKeyPair wraps a self signed certificate and the corresponding public key.
///
/// Field `0` is the self-signed certificate and field `1` is the Ed25519 public
/// key. `generate_self_cert` fills both from the same freshly generated key pair.
pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
173
174/// Generate server certs for use with peer verification
175pub fn generate_self_cert(hostname: String) -> CertKeyPair {
176    let mut rng = rand::thread_rng();
177    let keypair = Ed25519KeyPair::generate(&mut rng);
178    CertKeyPair(
179        SelfSignedCertificate::new(keypair.copy().private(), &hostname),
180        keypair.public().to_owned(),
181    )
182}
183
184/// Load a certificate for use by the listening service
185fn load_certs(filename: &str) -> Vec<rustls::pki_types::CertificateDer<'static>> {
186    let certfile = fs::File::open(filename)
187        .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e));
188    let mut reader = BufReader::new(certfile);
189    rustls_pemfile::certs(&mut reader)
190        .collect::<Result<Vec<_>, _>>()
191        .unwrap()
192}
193
194/// Load a private key
195fn load_private_key(filename: &str) -> rustls::pki_types::PrivateKeyDer<'static> {
196    let keyfile = fs::File::open(filename)
197        .unwrap_or_else(|e| panic!("cannot open private key file {}; {}", filename, e));
198    let mut reader = BufReader::new(keyfile);
199
200    loop {
201        match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
202            Some(rustls_pemfile::Item::Pkcs1Key(key)) => return key.into(),
203            Some(rustls_pemfile::Item::Pkcs8Key(key)) => return key.into(),
204            Some(rustls_pemfile::Item::Sec1Key(key)) => return key.into(),
205            None => break,
206            _ => {}
207        }
208    }
209
210    panic!(
211        "no keys found in {:?} (encrypted keys not supported)",
212        filename
213    );
214}
215
216/// load the static keys we'll use to allow external non-validator nodes to push metrics
217fn load_static_peers(
218    static_peers: Option<StaticPeerValidationConfig>,
219) -> Result<Vec<AllowedPeer>, Error> {
220    let Some(static_peers) = static_peers else {
221        return Ok(vec![]);
222    };
223    let static_keys = static_peers
224        .pub_keys
225        .into_iter()
226        .map(|spk| {
227            let peer_id = hex::decode(spk.peer_id).unwrap();
228            let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
229            let s = AllowedPeer {
230                name: spk.name.clone(),
231                public_key,
232            };
233            info!(
234                "loaded static peer: {} public key: {}",
235                &s.name, &s.public_key,
236            );
237            s
238        })
239        .collect();
240    Ok(static_keys)
241}
242
243/// Default allow mode for server, we don't verify clients, everything is accepted
244pub fn create_server_cert_default_allow(
245    hostname: String,
246) -> Result<ServerConfig, sui_tls::rustls::Error> {
247    let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
248
249    ClientCertVerifier::new(AllowAll, SUI_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
250        vec![server_certificate.rustls_certificate()],
251        server_certificate.rustls_private_key(),
252    )
253}
254
255/// Verify clients against sui blockchain, clients that are not found in sui_getValidators
256/// will be rejected
257pub fn create_server_cert_enforce_peer(
258    dynamic_peers: DynamicPeerValidationConfig,
259    static_peers: Option<StaticPeerValidationConfig>,
260) -> Result<(ServerConfig, Option<SuiNodeProvider>), sui_tls::rustls::Error> {
261    let (Some(certificate_path), Some(private_key_path)) =
262        (dynamic_peers.certificate_file, dynamic_peers.private_key)
263    else {
264        return Err(sui_tls::rustls::Error::General(
265            "missing certs to initialize server".into(),
266        ));
267    };
268    let static_peers = load_static_peers(static_peers).map_err(|e| {
269        sui_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
270    })?;
271    let allower = SuiNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
272    allower.poll_peer_list();
273    let c = ClientCertVerifier::new(allower.clone(), SUI_VALIDATOR_SERVER_NAME.to_string())
274        .rustls_server_config(
275            load_certs(&certificate_path),
276            load_private_key(&private_key_path),
277        )?;
278    Ok((c, Some(allower)))
279}