sui_proxy/
admin.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use crate::config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig};
4use crate::handlers::publish_metrics;
5use crate::histogram_relay::HistogramRelay;
6use crate::middleware::{
7    expect_content_length, expect_mysten_proxy_header, expect_valid_public_key,
8};
9use crate::peers::{AllowedPeer, SuiNodeProvider};
10use crate::var;
11use anyhow::Error;
12use anyhow::Result;
13use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
14use fastcrypto::ed25519::{Ed25519KeyPair, Ed25519PublicKey};
15use fastcrypto::traits::{KeyPair, ToFromBytes};
16use std::fs;
17use std::io::BufReader;
18use std::net::SocketAddr;
19use std::sync::Arc;
20use std::time::Duration;
21use sui_tls::SUI_VALIDATOR_SERVER_NAME;
22use sui_tls::{
23    AllowAll, ClientCertVerifier, SelfSignedCertificate, TlsAcceptor, rustls::ServerConfig,
24};
25use tokio::signal;
26use tower::ServiceBuilder;
27use tower_http::{
28    LatencyUnit,
29    timeout::TimeoutLayer,
30    trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
31};
32use tracing::{Level, info};
33
34/// Configure our graceful shutdown scenarios
35pub async fn shutdown_signal(h: axum_server::Handle) {
36    let ctrl_c = async {
37        signal::ctrl_c()
38            .await
39            .expect("failed to install Ctrl+C handler");
40    };
41
42    #[cfg(unix)]
43    let terminate = async {
44        signal::unix::signal(signal::unix::SignalKind::terminate())
45            .expect("failed to install signal handler")
46            .recv()
47            .await;
48    };
49
50    #[cfg(not(unix))]
51    let terminate = std::future::pending::<()>();
52
53    tokio::select! {
54        _ = ctrl_c => {},
55        _ = terminate => {},
56    }
57
58    let grace = 30;
59    info!(
60        "signal received, starting graceful shutdown, grace period {} seconds, if needed",
61        &grace
62    );
63    h.graceful_shutdown(Some(Duration::from_secs(grace)))
64}
65
66/// Reqwest client holds the global client for remote_push api calls
67/// it also holds the username and password.  The client has an underlying
68/// connection pool.  See reqwest documentation for details
69#[derive(Clone)]
70pub struct ReqwestClient {
71    pub client: reqwest::Client,
72    pub settings: RemoteWriteConfig,
73}
74
75pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
76    ReqwestClient {
77        client: reqwest::Client::builder()
78            .user_agent(user_agent)
79            .pool_max_idle_per_host(settings.pool_max_idle_per_host)
80            .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
81            .build()
82            .expect("cannot create reqwest client"),
83        settings,
84    }
85}
86
/// Ad hoc labels that are injected according to our config.
#[derive(Clone)]
pub struct Labels {
    /// Value for the `network` label (presumably the network name — confirm with config).
    pub network: String,
    /// Value for the `inventory_hostname` label (presumably this host's name — confirm).
    pub inventory_hostname: String,
}
93
94/// App will configure our routes. This fn is also used to instrument our tests
95pub fn app(
96    labels: Labels,
97    client: ReqwestClient,
98    relay: HistogramRelay,
99    allower: Option<SuiNodeProvider>,
100    timeout_secs: Option<u64>,
101) -> Router {
102    // build our application with a route and our sender mpsc
103    let mut router = Router::new()
104        .route("/publish/metrics", post(publish_metrics))
105        .route_layer(DefaultBodyLimit::max(var!(
106            "MAX_BODY_SIZE",
107            1024 * 1024 * 5
108        )))
109        .route_layer(middleware::from_fn(expect_mysten_proxy_header))
110        .route_layer(middleware::from_fn(expect_content_length));
111    if let Some(allower) = allower {
112        router = router
113            .route_layer(middleware::from_fn(expect_valid_public_key))
114            .layer(Extension(Arc::new(allower)));
115    }
116    router
117        // Enforce on all routes.
118        // If the request does not complete within the specified timeout it will be aborted
119        // and a 408 Request Timeout response will be sent.
120        .layer(TimeoutLayer::new(Duration::from_secs(
121            timeout_secs.unwrap_or(20),
122        )))
123        .layer(Extension(relay))
124        .layer(Extension(labels))
125        .layer(Extension(client))
126        .layer(
127            ServiceBuilder::new().layer(
128                TraceLayer::new_for_http()
129                    .on_response(
130                        DefaultOnResponse::new()
131                            .level(Level::INFO)
132                            .latency_unit(LatencyUnit::Seconds),
133                    )
134                    .on_failure(
135                        DefaultOnFailure::new()
136                            .level(Level::ERROR)
137                            .latency_unit(LatencyUnit::Seconds),
138                    ),
139            ),
140        )
141}
142
143/// Server creates our http/https server
144pub async fn server(
145    listener: std::net::TcpListener,
146    app: Router,
147    acceptor: Option<TlsAcceptor>,
148) -> std::io::Result<()> {
149    // setup our graceful shutdown
150    let handle = axum_server::Handle::new();
151    // Spawn a task to gracefully shutdown server.
152    tokio::spawn(shutdown_signal(handle.clone()));
153
154    if let Some(verify_peers) = acceptor {
155        axum_server::Server::from_tcp(listener)
156            .acceptor(verify_peers)
157            .handle(handle)
158            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
159            .await
160    } else {
161        axum_server::Server::from_tcp(listener)
162            .handle(handle)
163            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
164            .await
165    }
166}
167
168/// CertKeyPair wraps a self signed certificate and the corresponding public key
169pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
170
171/// Generate server certs for use with peer verification
172pub fn generate_self_cert(hostname: String) -> CertKeyPair {
173    let mut rng = rand::thread_rng();
174    let keypair = Ed25519KeyPair::generate(&mut rng);
175    CertKeyPair(
176        SelfSignedCertificate::new(keypair.copy().private(), &hostname),
177        keypair.public().to_owned(),
178    )
179}
180
181/// Load a certificate for use by the listening service
182fn load_certs(filename: &str) -> Vec<rustls::pki_types::CertificateDer<'static>> {
183    let certfile = fs::File::open(filename)
184        .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e));
185    let mut reader = BufReader::new(certfile);
186    rustls_pemfile::certs(&mut reader)
187        .collect::<Result<Vec<_>, _>>()
188        .unwrap()
189}
190
191/// Load a private key
192fn load_private_key(filename: &str) -> rustls::pki_types::PrivateKeyDer<'static> {
193    let keyfile = fs::File::open(filename)
194        .unwrap_or_else(|e| panic!("cannot open private key file {}; {}", filename, e));
195    let mut reader = BufReader::new(keyfile);
196
197    loop {
198        match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
199            Some(rustls_pemfile::Item::Pkcs1Key(key)) => return key.into(),
200            Some(rustls_pemfile::Item::Pkcs8Key(key)) => return key.into(),
201            Some(rustls_pemfile::Item::Sec1Key(key)) => return key.into(),
202            None => break,
203            _ => {}
204        }
205    }
206
207    panic!(
208        "no keys found in {:?} (encrypted keys not supported)",
209        filename
210    );
211}
212
213/// load the static keys we'll use to allow external non-validator nodes to push metrics
214fn load_static_peers(
215    static_peers: Option<StaticPeerValidationConfig>,
216) -> Result<Vec<AllowedPeer>, Error> {
217    let Some(static_peers) = static_peers else {
218        return Ok(vec![]);
219    };
220    let static_keys = static_peers
221        .pub_keys
222        .into_iter()
223        .map(|spk| {
224            let peer_id = hex::decode(spk.peer_id).unwrap();
225            let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
226            let s = AllowedPeer {
227                name: spk.name.clone(),
228                public_key,
229            };
230            info!(
231                "loaded static peer: {} public key: {}",
232                &s.name, &s.public_key,
233            );
234            s
235        })
236        .collect();
237    Ok(static_keys)
238}
239
240/// Default allow mode for server, we don't verify clients, everything is accepted
241pub fn create_server_cert_default_allow(
242    hostname: String,
243) -> Result<ServerConfig, sui_tls::rustls::Error> {
244    let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
245
246    ClientCertVerifier::new(AllowAll, SUI_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
247        vec![server_certificate.rustls_certificate()],
248        server_certificate.rustls_private_key(),
249    )
250}
251
252/// Verify clients against sui blockchain, clients that are not found in sui_getValidators
253/// will be rejected
254pub fn create_server_cert_enforce_peer(
255    dynamic_peers: DynamicPeerValidationConfig,
256    static_peers: Option<StaticPeerValidationConfig>,
257) -> Result<(ServerConfig, Option<SuiNodeProvider>), sui_tls::rustls::Error> {
258    let (Some(certificate_path), Some(private_key_path)) =
259        (dynamic_peers.certificate_file, dynamic_peers.private_key)
260    else {
261        return Err(sui_tls::rustls::Error::General(
262            "missing certs to initialize server".into(),
263        ));
264    };
265    let static_peers = load_static_peers(static_peers).map_err(|e| {
266        sui_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
267    })?;
268    let allower = SuiNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
269    allower.poll_peer_list();
270    let c = ClientCertVerifier::new(allower.clone(), SUI_VALIDATOR_SERVER_NAME.to_string())
271        .rustls_server_config(
272            load_certs(&certificate_path),
273            load_private_key(&private_key_path),
274        )?;
275    Ok((c, Some(allower)))
276}