sui_proxy/
admin.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3use crate::config::{DynamicPeerValidationConfig, RemoteWriteConfig, StaticPeerValidationConfig};
4use crate::handlers::publish_metrics;
5use crate::histogram_relay::HistogramRelay;
6use crate::middleware::{
7    expect_content_length, expect_mysten_proxy_header, expect_valid_public_key,
8};
9use crate::peers::{AllowedPeer, SuiNodeProvider};
10use crate::var;
11use anyhow::Error;
12use anyhow::Result;
13use axum::{Extension, Router, extract::DefaultBodyLimit, middleware, routing::post};
14use fastcrypto::ed25519::{Ed25519KeyPair, Ed25519PublicKey};
15use fastcrypto::traits::{KeyPair, ToFromBytes};
16use rustls::pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject};
17use std::net::SocketAddr;
18use std::sync::Arc;
19use std::time::Duration;
20use sui_tls::SUI_VALIDATOR_SERVER_NAME;
21use sui_tls::{
22    AllowAll, ClientCertVerifier, SelfSignedCertificate, TlsAcceptor, rustls::ServerConfig,
23};
24use tokio::signal;
25use tower::ServiceBuilder;
26use tower_http::{
27    LatencyUnit,
28    timeout::TimeoutLayer,
29    trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
30};
31use tracing::{Level, info};
32
33/// Configure our graceful shutdown scenarios
34pub async fn shutdown_signal(h: axum_server::Handle<SocketAddr>) {
35    let ctrl_c = async {
36        signal::ctrl_c()
37            .await
38            .expect("failed to install Ctrl+C handler");
39    };
40
41    #[cfg(unix)]
42    let terminate = async {
43        signal::unix::signal(signal::unix::SignalKind::terminate())
44            .expect("failed to install signal handler")
45            .recv()
46            .await;
47    };
48
49    #[cfg(not(unix))]
50    let terminate = std::future::pending::<()>();
51
52    tokio::select! {
53        _ = ctrl_c => {},
54        _ = terminate => {},
55    }
56
57    let grace = 30;
58    info!(
59        "signal received, starting graceful shutdown, grace period {} seconds, if needed",
60        &grace
61    );
62    h.graceful_shutdown(Some(Duration::from_secs(grace)))
63}
64
/// Shared HTTP client used for remote_push api calls, bundled with its settings.
///
/// The reqwest client has an underlying connection pool (see the reqwest
/// documentation for details), so this type is meant to be cloned and shared
/// rather than rebuilt per request.
#[derive(Clone)]
pub struct ReqwestClient {
    /// The pooled reqwest client used to issue requests.
    pub client: reqwest::Client,
    /// Remote write configuration; presumably carries the endpoint credentials
    /// (username and password) — confirm against `RemoteWriteConfig`.
    pub settings: RemoteWriteConfig,
}
73
74pub fn make_reqwest_client(settings: RemoteWriteConfig, user_agent: &str) -> ReqwestClient {
75    ReqwestClient {
76        client: reqwest::Client::builder()
77            .user_agent(user_agent)
78            .pool_max_idle_per_host(settings.pool_max_idle_per_host)
79            .timeout(Duration::from_secs(var!("MIMIR_CLIENT_TIMEOUT", 30)))
80            .build()
81            .expect("cannot create reqwest client"),
82        settings,
83    }
84}
85
/// Labels are adhoc labels we will inject per our config.
///
/// An instance is attached to the router as a request extension in `app`.
#[derive(Clone)]
pub struct Labels {
    /// Value for the `network` label.
    pub network: String,
    /// Value for the `inventory_hostname` label.
    pub inventory_hostname: String,
}
92
93/// App will configure our routes. This fn is also used to instrument our tests
94pub fn app(
95    labels: Labels,
96    client: ReqwestClient,
97    relay: HistogramRelay,
98    allower: Option<SuiNodeProvider>,
99    timeout_secs: Option<u64>,
100) -> Router {
101    // build our application with a route and our sender mpsc
102    let mut router = Router::new()
103        .route("/publish/metrics", post(publish_metrics))
104        .route_layer(DefaultBodyLimit::max(var!(
105            "MAX_BODY_SIZE",
106            1024 * 1024 * 5
107        )))
108        .route_layer(middleware::from_fn(expect_mysten_proxy_header))
109        .route_layer(middleware::from_fn(expect_content_length));
110    if let Some(allower) = allower {
111        router = router
112            .route_layer(middleware::from_fn(expect_valid_public_key))
113            .layer(Extension(Arc::new(allower)));
114    }
115    router
116        // Enforce on all routes.
117        // If the request does not complete within the specified timeout it will be aborted
118        // and a 408 Request Timeout response will be sent.
119        .layer(TimeoutLayer::new(Duration::from_secs(
120            timeout_secs.unwrap_or(20),
121        )))
122        .layer(Extension(relay))
123        .layer(Extension(labels))
124        .layer(Extension(client))
125        .layer(
126            ServiceBuilder::new().layer(
127                TraceLayer::new_for_http()
128                    .on_response(
129                        DefaultOnResponse::new()
130                            .level(Level::INFO)
131                            .latency_unit(LatencyUnit::Seconds),
132                    )
133                    .on_failure(
134                        DefaultOnFailure::new()
135                            .level(Level::ERROR)
136                            .latency_unit(LatencyUnit::Seconds),
137                    ),
138            ),
139        )
140}
141
142/// Server creates our http/https server
143pub async fn server(
144    listener: std::net::TcpListener,
145    app: Router,
146    acceptor: Option<TlsAcceptor>,
147) -> std::io::Result<()> {
148    listener.set_nonblocking(true)?;
149    let listener = tokio::net::TcpListener::from_std(listener)?;
150
151    // setup our graceful shutdown
152    let handle = axum_server::Handle::new();
153    // Spawn a task to gracefully shutdown server.
154    tokio::spawn(shutdown_signal(handle.clone()));
155
156    if let Some(verify_peers) = acceptor {
157        axum_server::Server::from_listener(listener)
158            .acceptor(verify_peers)
159            .handle(handle)
160            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
161            .await
162    } else {
163        axum_server::Server::from_listener(listener)
164            .handle(handle)
165            .serve(app.into_make_service_with_connect_info::<SocketAddr>())
166            .await
167    }
168}
169
/// CertKeyPair wraps a self signed certificate and the corresponding public key.
///
/// Field `0` is the self signed certificate; field `1` is the ed25519 public key
/// of the keypair the certificate was created from (see `generate_self_cert`).
pub struct CertKeyPair(pub SelfSignedCertificate, pub Ed25519PublicKey);
172
173/// Generate server certs for use with peer verification
174pub fn generate_self_cert(hostname: String) -> CertKeyPair {
175    let mut rng = rand::thread_rng();
176    let keypair = Ed25519KeyPair::generate(&mut rng);
177    CertKeyPair(
178        SelfSignedCertificate::new(keypair.copy().private(), &hostname),
179        keypair.public().to_owned(),
180    )
181}
182
183/// Load a certificate for use by the listening service
184fn load_certs(filename: &str) -> Vec<CertificateDer<'static>> {
185    CertificateDer::pem_file_iter(filename)
186        .unwrap_or_else(|e| panic!("cannot open certificate file: {}; {}", filename, e))
187        .collect::<Result<Vec<_>, _>>()
188        .unwrap_or_else(|e| panic!("cannot parse certificate file: {}; {}", filename, e))
189}
190
191/// Load a private key
192fn load_private_key(filename: &str) -> PrivateKeyDer<'static> {
193    PrivateKeyDer::from_pem_file(filename).unwrap_or_else(|e| {
194        panic!(
195            "cannot load private key from {} (encrypted keys not supported): {}",
196            filename, e
197        )
198    })
199}
200
201/// load the static keys we'll use to allow external non-validator nodes to push metrics
202fn load_static_peers(
203    static_peers: Option<StaticPeerValidationConfig>,
204) -> Result<Vec<AllowedPeer>, Error> {
205    let Some(static_peers) = static_peers else {
206        return Ok(vec![]);
207    };
208    let static_keys = static_peers
209        .pub_keys
210        .into_iter()
211        .map(|spk| {
212            let peer_id = hex::decode(spk.peer_id).unwrap();
213            let public_key = Ed25519PublicKey::from_bytes(peer_id.as_ref()).unwrap();
214            let s = AllowedPeer {
215                name: spk.name.clone(),
216                public_key,
217            };
218            info!(
219                "loaded static peer: {} public key: {}",
220                &s.name, &s.public_key,
221            );
222            s
223        })
224        .collect();
225    Ok(static_keys)
226}
227
228/// Default allow mode for server, we don't verify clients, everything is accepted
229pub fn create_server_cert_default_allow(
230    hostname: String,
231) -> Result<ServerConfig, sui_tls::rustls::Error> {
232    let CertKeyPair(server_certificate, _) = generate_self_cert(hostname);
233
234    ClientCertVerifier::new(AllowAll, SUI_VALIDATOR_SERVER_NAME.to_string()).rustls_server_config(
235        vec![server_certificate.rustls_certificate()],
236        server_certificate.rustls_private_key(),
237    )
238}
239
240/// Verify clients against sui blockchain, clients that are not found in sui_getValidators
241/// will be rejected
242pub fn create_server_cert_enforce_peer(
243    dynamic_peers: DynamicPeerValidationConfig,
244    static_peers: Option<StaticPeerValidationConfig>,
245) -> Result<(ServerConfig, Option<SuiNodeProvider>), sui_tls::rustls::Error> {
246    let (Some(certificate_path), Some(private_key_path)) =
247        (dynamic_peers.certificate_file, dynamic_peers.private_key)
248    else {
249        return Err(sui_tls::rustls::Error::General(
250            "missing certs to initialize server".into(),
251        ));
252    };
253    let static_peers = load_static_peers(static_peers).map_err(|e| {
254        sui_tls::rustls::Error::General(format!("unable to load static pub keys: {}", e))
255    })?;
256    let allower = SuiNodeProvider::new(dynamic_peers.url, dynamic_peers.interval, static_peers);
257    allower.poll_peer_list();
258    let c = ClientCertVerifier::new(allower.clone(), SUI_VALIDATOR_SERVER_NAME.to_string())
259        .rustls_server_config(
260            load_certs(&certificate_path),
261            load_private_key(&private_key_path),
262        )?;
263    Ok((c, Some(allower)))
264}