sui_node/
admin.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::SuiNode;
5use crate::db_shell::{handle_delete, handle_ls, handle_read};
6use axum::{
7    Router,
8    extract::{Query, State},
9    http::StatusCode,
10    routing::{delete, get, post},
11};
12use base64::Engine;
13use fastcrypto::encoding::{Encoding, Hex};
14use fastcrypto::traits::ToFromBytes;
15use humantime::parse_duration;
16use mysten_network::Multiaddr;
17use serde::Deserialize;
18use std::sync::Arc;
19use std::{
20    net::{IpAddr, Ipv4Addr, SocketAddr},
21    str::FromStr,
22};
23use sui_network::endpoint_manager::{AddressSource, EndpointId};
24use sui_types::{
25    base_types::AuthorityName,
26    crypto::{NetworkPublicKey, RandomnessPartialSignature, RandomnessRound, RandomnessSignature},
27    digests::TransactionDigest,
28    error::SuiErrorKind,
29    traffic_control::TrafficControlReconfigParams,
30};
31use telemetry_subscribers::TracingHandle;
32use tokio::sync::oneshot;
33use tracing::info;
34
35// Example commands:
36//
37// Set buffer stake for current epoch 2 to 1500 basis points:
38//
39//   $ curl -X POST 'http://127.0.0.1:1337/set-override-buffer-stake?buffer_bps=1500&epoch=2'
40//
41// Clear buffer stake override for current epoch 2, use
42// ProtocolConfig::buffer_stake_for_protocol_upgrade_bps:
43//
44//   $ curl -X POST 'http://127.0.0.1:1337/clear-override-buffer-stake?epoch=2'
45//
46// Vote to close epoch 2 early
47//
48//   $ curl -X POST 'http://127.0.0.1:1337/force-close-epoch?epoch=2'
49//
// View all capabilities this node has received from authorities for the current epoch:
51//
52//   $ curl 'http://127.0.0.1:1337/capabilities'
53//
54// View the node config (private keys will be masked):
55//
56//   $ curl 'http://127.0.0.1:1337/node-config'
57//
58// Set a time-limited tracing config. After the duration expires, tracing will be disabled
59// automatically.
60//
61//   $ curl -X POST 'http://127.0.0.1:1337/enable-tracing?filter=info&duration=10s'
62//
63// Reset tracing to the TRACE_FILTER env var.
64//
65//   $ curl -X POST 'http://127.0.0.1:1337/reset-tracing'
66//
67// Get the node's randomness partial signatures for round 123.
68//
69//  $ curl 'http://127.0.0.1:1337/randomness-partial-sigs?round=123'
70//
// Inject randomness partial signatures from another node, bypassing validity checks.
// `base64_sigs` is a BCS-encoded list of partial signatures, URL-safe base64 without padding.
//
//   $ curl -X POST 'http://127.0.0.1:1337/randomness-inject-partial-sigs?hex_authority_name=<hex_encoded_name>&round=123&base64_sigs=<base64_encoded_sigs>'
//
// Inject a full randomness signature from another node, bypassing validity checks.
//
//   $ curl -X POST 'http://127.0.0.1:1337/randomness-inject-full-sig?round=123&base64_sig=<base64_encoded_sig>'
78//
// Get the estimated cost of a transaction
//
//   $ curl 'http://127.0.0.1:1337/get-tx-cost?tx_digest=<tx_digest>'
//
// Reconfigure traffic control policy
//
//   $ curl -X POST 'http://127.0.0.1:1337/traffic-control?error_threshold=100&spam_threshold=100&dry_run=true'
85//
86// Update endpoint address(es) for a peer
87//
88//  $ curl -X POST 'http://127.0.0.1:1337/update-endpoint?endpoint_type=p2p&id=<hex_encoded_peer_id>&addresses=<multiaddr1>,<multiaddr2>'
89//  $ curl -X POST 'http://127.0.0.1:1337/update-endpoint?endpoint_type=consensus&id=<hex_encoded_network_pubkey>&addresses=<multiaddr1>,<multiaddr2>'
90
/// Error text returned by the logging/tracing routes when the node has no tracing handle.
const NO_TRACING_HANDLE: &str = "tracing handle not available";

// Logging and tracing control.
const LOGGING_ROUTE: &str = "/logging";
const TRACING_ROUTE: &str = "/enable-tracing";
const TRACING_RESET_ROUTE: &str = "/reset-tracing";

// Protocol-upgrade buffer stake overrides and epoch control.
const SET_BUFFER_STAKE_ROUTE: &str = "/set-override-buffer-stake";
const CLEAR_BUFFER_STAKE_ROUTE: &str = "/clear-override-buffer-stake";
const FORCE_CLOSE_EPOCH: &str = "/force-close-epoch";

// Node introspection.
const CAPABILITIES: &str = "/capabilities";
const NODE_CONFIG: &str = "/node-config";

// Randomness debugging.
const RANDOMNESS_PARTIAL_SIGS_ROUTE: &str = "/randomness-partial-sigs";
const RANDOMNESS_INJECT_PARTIAL_SIGS_ROUTE: &str = "/randomness-inject-partial-sigs";
const RANDOMNESS_INJECT_FULL_SIG_ROUTE: &str = "/randomness-inject-full-sig";

// Transaction cost estimation.
const GET_TX_COST_ROUTE: &str = "/get-tx-cost";
const DUMP_CONSENSUS_TX_COST_ESTIMATES_ROUTE: &str = "/dump-consensus-tx-cost-estimates";

// Network configuration.
const TRAFFIC_CONTROL: &str = "/traffic-control";
const UPDATE_ENDPOINT: &str = "/update-endpoint";

// Database shell.
const DB_SHELL_LS: &str = "/db-shell/ls";
const DB_SHELL_READ: &str = "/db-shell/read";
const DB_SHELL_DELETE: &str = "/db-shell/delete";
110
/// Shared state handed to every admin route handler through axum's `State` extractor.
pub(crate) struct AppState {
    /// The running node that the admin handlers inspect and control.
    pub(crate) node: Arc<SuiNode>,
    /// Handle for changing logging/tracing at runtime. `None` when the node was started
    /// without one, in which case the tracing routes respond with `NO_TRACING_HANDLE`.
    pub(crate) tracing_handle: Option<TracingHandle>,
}
115
116pub async fn run_admin_server(
117    node: Arc<SuiNode>,
118    port: u16,
119    tracing_handle: Option<TracingHandle>,
120) {
121    let filter = tracing_handle
122        .as_ref()
123        .and_then(|h| h.get_log().ok())
124        .unwrap_or_else(|| NO_TRACING_HANDLE.to_string());
125
126    let app_state = AppState {
127        node,
128        tracing_handle,
129    };
130
131    let app = Router::new()
132        .route(LOGGING_ROUTE, get(get_filter))
133        .route(CAPABILITIES, get(capabilities))
134        .route(NODE_CONFIG, get(node_config))
135        .route(LOGGING_ROUTE, post(set_filter))
136        .route(
137            SET_BUFFER_STAKE_ROUTE,
138            post(set_override_protocol_upgrade_buffer_stake),
139        )
140        .route(
141            CLEAR_BUFFER_STAKE_ROUTE,
142            post(clear_override_protocol_upgrade_buffer_stake),
143        )
144        .route(FORCE_CLOSE_EPOCH, post(force_close_epoch))
145        .route(TRACING_ROUTE, post(enable_tracing))
146        .route(TRACING_RESET_ROUTE, post(reset_tracing))
147        .route(RANDOMNESS_PARTIAL_SIGS_ROUTE, get(randomness_partial_sigs))
148        .route(
149            RANDOMNESS_INJECT_PARTIAL_SIGS_ROUTE,
150            post(randomness_inject_partial_sigs),
151        )
152        .route(
153            RANDOMNESS_INJECT_FULL_SIG_ROUTE,
154            post(randomness_inject_full_sig),
155        )
156        .route(GET_TX_COST_ROUTE, get(get_tx_cost))
157        .route(
158            DUMP_CONSENSUS_TX_COST_ESTIMATES_ROUTE,
159            get(dump_consensus_tx_cost_estimates),
160        )
161        .route(TRAFFIC_CONTROL, post(traffic_control))
162        .route(UPDATE_ENDPOINT, post(update_endpoint))
163        .route(DB_SHELL_LS, get(handle_ls))
164        .route(DB_SHELL_READ, get(handle_read))
165        .route(DB_SHELL_DELETE, delete(handle_delete))
166        .with_state(Arc::new(app_state));
167
168    let socket_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
169    info!(
170        filter =% filter,
171        address =% socket_address,
172        "starting admin server"
173    );
174
175    let listener = tokio::net::TcpListener::bind(&socket_address)
176        .await
177        .unwrap();
178    axum::serve(
179        listener,
180        app.into_make_service_with_connect_info::<SocketAddr>(),
181    )
182    .await
183    .unwrap();
184}
185
/// Query parameters accepted by the `/enable-tracing` route. Every field is optional; only the
/// settings that are present in the request are changed.
#[derive(Deserialize)]
struct EnableTracing {
    /// New tracing filter. Must be accompanied by `duration`; the filter is reset after that
    /// duration expires.
    filter: Option<String>,
    /// How long `filter` stays active, in `humantime` syntax such as `10s`.
    duration: Option<String>,

    /// Change the trace output file (if file output was enabled at program start).
    trace_file: Option<String>,

    /// Change the tracing sample rate.
    sample_rate: Option<f64>,
}
198
199async fn enable_tracing(
200    State(state): State<Arc<AppState>>,
201    query: Query<EnableTracing>,
202) -> (StatusCode, String) {
203    let Some(tracing_handle) = &state.tracing_handle else {
204        return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
205    };
206
207    let Query(EnableTracing {
208        filter,
209        duration,
210        trace_file,
211        sample_rate,
212    }) = query;
213
214    let mut response = Vec::new();
215
216    if let Some(sample_rate) = sample_rate {
217        tracing_handle.update_sampling_rate(sample_rate);
218        response.push(format!("sample rate set to {:?}", sample_rate));
219    }
220
221    if let Some(trace_file) = trace_file {
222        if let Err(err) = tracing_handle.update_trace_file(&trace_file) {
223            response.push(format!("can't update trace file: {:?}", err));
224            return (StatusCode::BAD_REQUEST, response.join("\n"));
225        } else {
226            response.push(format!("trace file set to {:?}", trace_file));
227        }
228    }
229
230    let Some(filter) = filter else {
231        return (StatusCode::OK, response.join("\n"));
232    };
233
234    // Duration is required if filter is set
235    let Some(duration) = duration else {
236        response.push("can't update filter: missing duration".into());
237        return (StatusCode::BAD_REQUEST, response.join("\n"));
238    };
239
240    let Ok(duration) = parse_duration(&duration) else {
241        response.push("can't update filter: invalid duration".into());
242        return (StatusCode::BAD_REQUEST, response.join("\n"));
243    };
244
245    match tracing_handle.update_trace_filter(&filter, duration) {
246        Ok(()) => {
247            response.push(format!("filter set to {:?}", filter));
248            response.push(format!("filter will be reset after {:?}", duration));
249            (StatusCode::OK, response.join("\n"))
250        }
251        Err(err) => {
252            response.push(format!("can't update filter: {:?}", err));
253            (StatusCode::BAD_REQUEST, response.join("\n"))
254        }
255    }
256}
257
258async fn reset_tracing(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
259    let Some(tracing_handle) = &state.tracing_handle else {
260        return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
261    };
262    tracing_handle.reset_trace();
263    (
264        StatusCode::OK,
265        "tracing filter reset to TRACE_FILTER env var".into(),
266    )
267}
268
269async fn get_filter(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
270    let Some(tracing_handle) = &state.tracing_handle else {
271        return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
272    };
273    match tracing_handle.get_log() {
274        Ok(filter) => (StatusCode::OK, filter),
275        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
276    }
277}
278
279async fn set_filter(
280    State(state): State<Arc<AppState>>,
281    new_filter: String,
282) -> (StatusCode, String) {
283    let Some(tracing_handle) = &state.tracing_handle else {
284        return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
285    };
286    match tracing_handle.update_log(&new_filter) {
287        Ok(()) => {
288            info!(filter =% new_filter, "Log filter updated");
289            (StatusCode::OK, "".into())
290        }
291        Err(err) => (StatusCode::BAD_REQUEST, err.to_string()),
292    }
293}
294
295async fn capabilities(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
296    let epoch_store = state.node.state().load_epoch_store_one_call_per_task();
297
298    let capabilities = epoch_store.get_capabilities_v2();
299    let mut output = String::new();
300    for capability in capabilities.unwrap_or_default() {
301        output.push_str(&format!("{:?}\n", capability));
302    }
303
304    (StatusCode::OK, output)
305}
306
307async fn node_config(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
308    let node_config = &state.node.config;
309
310    // Note private keys will be masked
311    (StatusCode::OK, format!("{:#?}\n", node_config))
312}
313
/// Query parameters carrying a single epoch number (`?epoch=N`).
#[derive(Deserialize)]
struct Epoch {
    /// Epoch the request applies to.
    epoch: u64,
}
318
319async fn clear_override_protocol_upgrade_buffer_stake(
320    State(state): State<Arc<AppState>>,
321    epoch: Query<Epoch>,
322) -> (StatusCode, String) {
323    let Query(Epoch { epoch }) = epoch;
324
325    match state
326        .node
327        .clear_override_protocol_upgrade_buffer_stake(epoch)
328    {
329        Ok(()) => (
330            StatusCode::OK,
331            "protocol upgrade buffer stake cleared\n".to_string(),
332        ),
333        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
334    }
335}
336
/// Query parameters for `/set-override-buffer-stake`.
#[derive(Deserialize)]
struct SetBufferStake {
    /// Buffer stake to use, in basis points (e.g. `1500`).
    buffer_bps: u64,
    /// Epoch the override applies to.
    epoch: u64,
}
342
343async fn set_override_protocol_upgrade_buffer_stake(
344    State(state): State<Arc<AppState>>,
345    buffer_state: Query<SetBufferStake>,
346) -> (StatusCode, String) {
347    let Query(SetBufferStake { buffer_bps, epoch }) = buffer_state;
348
349    match state
350        .node
351        .set_override_protocol_upgrade_buffer_stake(epoch, buffer_bps)
352    {
353        Ok(()) => (
354            StatusCode::OK,
355            format!("protocol upgrade buffer stake set to '{}'\n", buffer_bps),
356        ),
357        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
358    }
359}
360
361async fn force_close_epoch(
362    State(state): State<Arc<AppState>>,
363    epoch: Query<Epoch>,
364) -> (StatusCode, String) {
365    let Query(Epoch {
366        epoch: expected_epoch,
367    }) = epoch;
368    let epoch_store = state.node.state().load_epoch_store_one_call_per_task();
369    let actual_epoch = epoch_store.epoch();
370    if actual_epoch != expected_epoch {
371        let err = SuiErrorKind::WrongEpoch {
372            expected_epoch,
373            actual_epoch,
374        };
375        return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
376    }
377
378    match state.node.close_epoch(&epoch_store).await {
379        Ok(()) => (
380            StatusCode::OK,
381            "close_epoch() called successfully\n".to_string(),
382        ),
383        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
384    }
385}
386
/// Query parameters carrying a randomness round number (`?round=N`).
#[derive(Deserialize)]
struct Round {
    /// Randomness round the request refers to.
    round: u64,
}
391
392async fn randomness_partial_sigs(
393    State(state): State<Arc<AppState>>,
394    round: Query<Round>,
395) -> (StatusCode, String) {
396    let Query(Round { round }) = round;
397
398    let (tx, rx) = oneshot::channel();
399    state
400        .node
401        .randomness_handle()
402        .admin_get_partial_signatures(RandomnessRound(round), tx);
403
404    let sigs = match rx.await {
405        Ok(sigs) => sigs,
406        Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
407    };
408
409    let output = format!(
410        "{}\n",
411        base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(sigs)
412    );
413
414    (StatusCode::OK, output)
415}
416
/// Query parameters for `/randomness-inject-partial-sigs`.
#[derive(Deserialize)]
struct PartialSigsToInject {
    /// Authority that produced the signatures; parsed with `AuthorityName::from_str`
    /// (hex-encoded, per the parameter name).
    hex_authority_name: String,
    /// Randomness round the signatures belong to.
    round: u64,
    /// BCS-encoded `Vec<RandomnessPartialSignature>`, as URL-safe base64 without padding.
    base64_sigs: String,
}
423
424async fn randomness_inject_partial_sigs(
425    State(state): State<Arc<AppState>>,
426    args: Query<PartialSigsToInject>,
427) -> (StatusCode, String) {
428    let Query(PartialSigsToInject {
429        hex_authority_name,
430        round,
431        base64_sigs,
432    }) = args;
433
434    let authority_name = match AuthorityName::from_str(hex_authority_name.as_str()) {
435        Ok(authority_name) => authority_name,
436        Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
437    };
438
439    let sigs: Vec<u8> = match base64::engine::general_purpose::URL_SAFE_NO_PAD.decode(base64_sigs) {
440        Ok(sigs) => sigs,
441        Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
442    };
443
444    let sigs: Vec<RandomnessPartialSignature> = match bcs::from_bytes(&sigs) {
445        Ok(sigs) => sigs,
446        Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
447    };
448
449    let (tx_result, rx_result) = oneshot::channel();
450    state
451        .node
452        .randomness_handle()
453        .admin_inject_partial_signatures(authority_name, RandomnessRound(round), sigs, tx_result);
454
455    match rx_result.await {
456        Ok(Ok(())) => (StatusCode::OK, "partial signatures injected\n".to_string()),
457        Ok(Err(e)) => (StatusCode::BAD_REQUEST, e.to_string()),
458        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
459    }
460}
461
/// Query parameters for `/randomness-inject-full-sig`.
#[derive(Deserialize)]
struct FullSigToInject {
    /// Randomness round the signature belongs to.
    round: u64,
    /// BCS-encoded `RandomnessSignature`, as URL-safe base64 without padding.
    base64_sig: String,
}
467
468async fn randomness_inject_full_sig(
469    State(state): State<Arc<AppState>>,
470    args: Query<FullSigToInject>,
471) -> (StatusCode, String) {
472    let Query(FullSigToInject { round, base64_sig }) = args;
473
474    let sig: Vec<u8> = match base64::engine::general_purpose::URL_SAFE_NO_PAD.decode(base64_sig) {
475        Ok(sig) => sig,
476        Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
477    };
478
479    let sig: RandomnessSignature = match bcs::from_bytes(&sig) {
480        Ok(sig) => sig,
481        Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
482    };
483
484    let (tx_result, rx_result) = oneshot::channel();
485    state.node.randomness_handle().admin_inject_full_signature(
486        RandomnessRound(round),
487        sig,
488        tx_result,
489    );
490
491    match rx_result.await {
492        Ok(Ok(())) => (StatusCode::OK, "full signature injected\n".to_string()),
493        Ok(Err(e)) => (StatusCode::BAD_REQUEST, e.to_string()),
494        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
495    }
496}
497
/// Query parameters for `/get-tx-cost`.
#[derive(Deserialize)]
struct GetTxCost {
    /// Transaction digest in the string form accepted by `TransactionDigest::from_str`.
    tx_digest: String,
}
502
503async fn get_tx_cost(
504    State(state): State<Arc<AppState>>,
505    args: Query<GetTxCost>,
506) -> (StatusCode, String) {
507    let Query(GetTxCost { tx_digest }) = args;
508    let tx_digest = TransactionDigest::from_str(tx_digest.as_str()).unwrap();
509
510    let Some(transaction) = state
511        .node
512        .state()
513        .get_transaction_cache_reader()
514        .get_transaction_block(&tx_digest)
515    else {
516        return (StatusCode::BAD_REQUEST, "Transaction not found".to_string());
517    };
518
519    let Some(cost) = state
520        .node
521        .state()
522        .load_epoch_store_one_call_per_task()
523        .get_estimated_tx_cost(transaction.transaction_data())
524        .await
525    else {
526        return (StatusCode::BAD_REQUEST, "No estimate available".to_string());
527    };
528
529    (StatusCode::OK, cost.to_string())
530}
531
532async fn dump_consensus_tx_cost_estimates(
533    State(state): State<Arc<AppState>>,
534) -> (StatusCode, String) {
535    let epoch_store = state.node.state().load_epoch_store_one_call_per_task();
536    let estimates = epoch_store.get_consensus_tx_cost_estimates().await;
537    (StatusCode::OK, format!("{:#?}", estimates))
538}
539
540async fn traffic_control(
541    State(state): State<Arc<AppState>>,
542    args: Query<TrafficControlReconfigParams>,
543) -> (StatusCode, String) {
544    let Query(params) = args;
545    match state.node.state().reconfigure_traffic_control(params).await {
546        Ok(updated_state) => (
547            StatusCode::OK,
548            format!(
549                "Traffic control configured with:\n\
550                 Error threshold: {:?}\n\
551                 Spam threshold: {:?}\n\
552                 Dry run: {:?}\n",
553                updated_state.error_threshold, updated_state.spam_threshold, updated_state.dry_run
554            ),
555        ),
556        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
557    }
558}
559
/// Query parameters for `/update-endpoint`.
#[derive(Deserialize)]
struct UpdateEndpointArgs {
    /// Either `p2p` or `consensus`; any other value is rejected.
    endpoint_type: String,
    /// Hex-encoded endpoint identity: a 32-byte peer id for `p2p`, or the network public key
    /// bytes for `consensus`.
    id: String,
    /// Comma-separated multiaddrs; surrounding whitespace and empty entries are ignored.
    addresses: String,
}
566
567async fn update_endpoint(
568    State(state): State<Arc<AppState>>,
569    args: Query<UpdateEndpointArgs>,
570) -> (StatusCode, String) {
571    let Query(UpdateEndpointArgs {
572        endpoint_type,
573        id,
574        addresses,
575    }) = args;
576
577    let endpoint_id = match endpoint_type.as_str() {
578        "p2p" => {
579            let peer_id_bytes = match Hex::decode(&id) {
580                Ok(bytes) => bytes,
581                Err(err) => {
582                    return (
583                        StatusCode::BAD_REQUEST,
584                        format!("Invalid id hex encoding: {err}"),
585                    );
586                }
587            };
588
589            let peer_id_bytes: [u8; 32] = match peer_id_bytes.try_into() {
590                Ok(bytes) => bytes,
591                Err(_) => {
592                    return (
593                        StatusCode::BAD_REQUEST,
594                        "p2p id must be 32 bytes".to_string(),
595                    );
596                }
597            };
598
599            EndpointId::P2p(anemo::PeerId(peer_id_bytes))
600        }
601        "consensus" => {
602            let network_pubkey_bytes = match Hex::decode(&id) {
603                Ok(bytes) => bytes,
604                Err(err) => {
605                    return (
606                        StatusCode::BAD_REQUEST,
607                        format!("Invalid id hex encoding: {err}"),
608                    );
609                }
610            };
611
612            let network_pubkey = match NetworkPublicKey::from_bytes(&network_pubkey_bytes) {
613                Ok(key) => key,
614                Err(err) => {
615                    return (
616                        StatusCode::BAD_REQUEST,
617                        format!("Invalid network public key: {err:?}"),
618                    );
619                }
620            };
621
622            EndpointId::Consensus(network_pubkey)
623        }
624        _ => {
625            return (
626                StatusCode::BAD_REQUEST,
627                format!("Unknown endpoint_type: {endpoint_type}"),
628            );
629        }
630    };
631
632    let mut parsed_addresses = Vec::new();
633    for addr_str in addresses.split(',') {
634        let addr_str = addr_str.trim();
635        if addr_str.is_empty() {
636            continue;
637        }
638        match addr_str.parse::<Multiaddr>() {
639            Ok(addr) => parsed_addresses.push(addr),
640            Err(err) => {
641                return (
642                    StatusCode::BAD_REQUEST,
643                    format!("Invalid address '{addr_str}': {err}"),
644                );
645            }
646        }
647    }
648
649    if let Err(e) = state.node.endpoint_manager().update_endpoint(
650        endpoint_id,
651        AddressSource::Admin,
652        parsed_addresses.clone(),
653    ) {
654        return (StatusCode::BAD_REQUEST, e.to_string());
655    }
656
657    (
658        StatusCode::OK,
659        format!(
660            "Endpoint updated for {endpoint_type} endpoint {id} with {} address(es)\n",
661            parsed_addresses.len(),
662        ),
663    )
664}