1use crate::SuiNode;
5use crate::db_shell::{handle_delete, handle_ls, handle_read};
6use axum::{
7 Router,
8 extract::{Query, State},
9 http::StatusCode,
10 routing::{delete, get, post},
11};
12use base64::Engine;
13use fastcrypto::encoding::{Encoding, Hex};
14use fastcrypto::traits::ToFromBytes;
15use humantime::parse_duration;
16use mysten_network::Multiaddr;
17use serde::Deserialize;
18use std::sync::Arc;
19use std::{
20 net::{IpAddr, Ipv4Addr, SocketAddr},
21 str::FromStr,
22};
23use sui_network::endpoint_manager::{AddressSource, EndpointId};
24use sui_types::{
25 base_types::AuthorityName,
26 crypto::{NetworkPublicKey, RandomnessPartialSignature, RandomnessRound, RandomnessSignature},
27 digests::TransactionDigest,
28 error::SuiErrorKind,
29 traffic_control::TrafficControlReconfigParams,
30};
31use telemetry_subscribers::TracingHandle;
32use tokio::sync::oneshot;
33use tracing::info;
34
/// Message returned by the logging/tracing handlers when the server was started without a
/// tracing handle; also used as the placeholder filter value in the startup log line.
const NO_TRACING_HANDLE: &str = "tracing handle not available";
// Paths of the routes registered on the admin router in `run_admin_server`.
// Log filter: GET reads the current filter, POST replaces it.
const LOGGING_ROUTE: &str = "/logging";
// Tracing controls (POST).
const TRACING_ROUTE: &str = "/enable-tracing";
const TRACING_RESET_ROUTE: &str = "/reset-tracing";
// Protocol upgrade buffer stake override (POST).
const SET_BUFFER_STAKE_ROUTE: &str = "/set-override-buffer-stake";
const CLEAR_BUFFER_STAKE_ROUTE: &str = "/clear-override-buffer-stake";
// Epoch control (POST).
const FORCE_CLOSE_EPOCH: &str = "/force-close-epoch";
// Read-only node introspection (GET).
const CAPABILITIES: &str = "/capabilities";
const NODE_CONFIG: &str = "/node-config";
// Randomness signatures: GET partial signatures, POST to inject partial/full signatures.
const RANDOMNESS_PARTIAL_SIGS_ROUTE: &str = "/randomness-partial-sigs";
const RANDOMNESS_INJECT_PARTIAL_SIGS_ROUTE: &str = "/randomness-inject-partial-sigs";
const RANDOMNESS_INJECT_FULL_SIG_ROUTE: &str = "/randomness-inject-full-sig";
// Transaction cost estimates (GET).
const GET_TX_COST_ROUTE: &str = "/get-tx-cost";
const DUMP_CONSENSUS_TX_COST_ESTIMATES_ROUTE: &str = "/dump-consensus-tx-cost-estimates";
// Traffic control reconfiguration (POST).
const TRAFFIC_CONTROL: &str = "/traffic-control";
// Endpoint address updates (POST).
const UPDATE_ENDPOINT: &str = "/update-endpoint";
// Database shell: GET for ls/read, DELETE for delete.
const DB_SHELL_LS: &str = "/db-shell/ls";
const DB_SHELL_READ: &str = "/db-shell/read";
const DB_SHELL_DELETE: &str = "/db-shell/delete";
110
/// State shared by all admin handlers; wrapped in an `Arc` and attached to the router
/// in `run_admin_server`.
pub(crate) struct AppState {
    /// The node that the admin endpoints inspect and control.
    pub(crate) node: Arc<SuiNode>,
    /// Handle used by the logging/tracing endpoints. When `None`, those endpoints reply
    /// with `NO_TRACING_HANDLE`.
    pub(crate) tracing_handle: Option<TracingHandle>,
}
115
116pub async fn run_admin_server(
117 node: Arc<SuiNode>,
118 port: u16,
119 tracing_handle: Option<TracingHandle>,
120) {
121 let filter = tracing_handle
122 .as_ref()
123 .and_then(|h| h.get_log().ok())
124 .unwrap_or_else(|| NO_TRACING_HANDLE.to_string());
125
126 let app_state = AppState {
127 node,
128 tracing_handle,
129 };
130
131 let app = Router::new()
132 .route(LOGGING_ROUTE, get(get_filter))
133 .route(CAPABILITIES, get(capabilities))
134 .route(NODE_CONFIG, get(node_config))
135 .route(LOGGING_ROUTE, post(set_filter))
136 .route(
137 SET_BUFFER_STAKE_ROUTE,
138 post(set_override_protocol_upgrade_buffer_stake),
139 )
140 .route(
141 CLEAR_BUFFER_STAKE_ROUTE,
142 post(clear_override_protocol_upgrade_buffer_stake),
143 )
144 .route(FORCE_CLOSE_EPOCH, post(force_close_epoch))
145 .route(TRACING_ROUTE, post(enable_tracing))
146 .route(TRACING_RESET_ROUTE, post(reset_tracing))
147 .route(RANDOMNESS_PARTIAL_SIGS_ROUTE, get(randomness_partial_sigs))
148 .route(
149 RANDOMNESS_INJECT_PARTIAL_SIGS_ROUTE,
150 post(randomness_inject_partial_sigs),
151 )
152 .route(
153 RANDOMNESS_INJECT_FULL_SIG_ROUTE,
154 post(randomness_inject_full_sig),
155 )
156 .route(GET_TX_COST_ROUTE, get(get_tx_cost))
157 .route(
158 DUMP_CONSENSUS_TX_COST_ESTIMATES_ROUTE,
159 get(dump_consensus_tx_cost_estimates),
160 )
161 .route(TRAFFIC_CONTROL, post(traffic_control))
162 .route(UPDATE_ENDPOINT, post(update_endpoint))
163 .route(DB_SHELL_LS, get(handle_ls))
164 .route(DB_SHELL_READ, get(handle_read))
165 .route(DB_SHELL_DELETE, delete(handle_delete))
166 .with_state(Arc::new(app_state));
167
168 let socket_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
169 info!(
170 filter =% filter,
171 address =% socket_address,
172 "starting admin server"
173 );
174
175 let listener = tokio::net::TcpListener::bind(&socket_address)
176 .await
177 .unwrap();
178 axum::serve(
179 listener,
180 app.into_make_service_with_connect_info::<SocketAddr>(),
181 )
182 .await
183 .unwrap();
184}
185
/// Query parameters accepted by the `enable_tracing` handler. Every field is optional.
#[derive(Deserialize)]
struct EnableTracing {
    /// Trace filter to install; when set, `duration` must also be supplied.
    filter: Option<String>,
    /// How long the new filter stays active, parsed with `humantime::parse_duration`.
    duration: Option<String>,

    /// Trace file passed to `update_trace_file` on the tracing handle.
    trace_file: Option<String>,

    /// Sampling rate passed to `update_sampling_rate` on the tracing handle.
    sample_rate: Option<f64>,
}
198
199async fn enable_tracing(
200 State(state): State<Arc<AppState>>,
201 query: Query<EnableTracing>,
202) -> (StatusCode, String) {
203 let Some(tracing_handle) = &state.tracing_handle else {
204 return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
205 };
206
207 let Query(EnableTracing {
208 filter,
209 duration,
210 trace_file,
211 sample_rate,
212 }) = query;
213
214 let mut response = Vec::new();
215
216 if let Some(sample_rate) = sample_rate {
217 tracing_handle.update_sampling_rate(sample_rate);
218 response.push(format!("sample rate set to {:?}", sample_rate));
219 }
220
221 if let Some(trace_file) = trace_file {
222 if let Err(err) = tracing_handle.update_trace_file(&trace_file) {
223 response.push(format!("can't update trace file: {:?}", err));
224 return (StatusCode::BAD_REQUEST, response.join("\n"));
225 } else {
226 response.push(format!("trace file set to {:?}", trace_file));
227 }
228 }
229
230 let Some(filter) = filter else {
231 return (StatusCode::OK, response.join("\n"));
232 };
233
234 let Some(duration) = duration else {
236 response.push("can't update filter: missing duration".into());
237 return (StatusCode::BAD_REQUEST, response.join("\n"));
238 };
239
240 let Ok(duration) = parse_duration(&duration) else {
241 response.push("can't update filter: invalid duration".into());
242 return (StatusCode::BAD_REQUEST, response.join("\n"));
243 };
244
245 match tracing_handle.update_trace_filter(&filter, duration) {
246 Ok(()) => {
247 response.push(format!("filter set to {:?}", filter));
248 response.push(format!("filter will be reset after {:?}", duration));
249 (StatusCode::OK, response.join("\n"))
250 }
251 Err(err) => {
252 response.push(format!("can't update filter: {:?}", err));
253 (StatusCode::BAD_REQUEST, response.join("\n"))
254 }
255 }
256}
257
258async fn reset_tracing(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
259 let Some(tracing_handle) = &state.tracing_handle else {
260 return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
261 };
262 tracing_handle.reset_trace();
263 (
264 StatusCode::OK,
265 "tracing filter reset to TRACE_FILTER env var".into(),
266 )
267}
268
269async fn get_filter(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
270 let Some(tracing_handle) = &state.tracing_handle else {
271 return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
272 };
273 match tracing_handle.get_log() {
274 Ok(filter) => (StatusCode::OK, filter),
275 Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
276 }
277}
278
279async fn set_filter(
280 State(state): State<Arc<AppState>>,
281 new_filter: String,
282) -> (StatusCode, String) {
283 let Some(tracing_handle) = &state.tracing_handle else {
284 return (StatusCode::UNPROCESSABLE_ENTITY, NO_TRACING_HANDLE.into());
285 };
286 match tracing_handle.update_log(&new_filter) {
287 Ok(()) => {
288 info!(filter =% new_filter, "Log filter updated");
289 (StatusCode::OK, "".into())
290 }
291 Err(err) => (StatusCode::BAD_REQUEST, err.to_string()),
292 }
293}
294
295async fn capabilities(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
296 let epoch_store = state.node.state().load_epoch_store_one_call_per_task();
297
298 let capabilities = epoch_store.get_capabilities_v2();
299 let mut output = String::new();
300 for capability in capabilities.unwrap_or_default() {
301 output.push_str(&format!("{:?}\n", capability));
302 }
303
304 (StatusCode::OK, output)
305}
306
307async fn node_config(State(state): State<Arc<AppState>>) -> (StatusCode, String) {
308 let node_config = &state.node.config;
309
310 (StatusCode::OK, format!("{:#?}\n", node_config))
312}
313
/// Query parameters carrying a single epoch number.
#[derive(Deserialize)]
struct Epoch {
    /// Epoch the request applies to.
    epoch: u64,
}
318
319async fn clear_override_protocol_upgrade_buffer_stake(
320 State(state): State<Arc<AppState>>,
321 epoch: Query<Epoch>,
322) -> (StatusCode, String) {
323 let Query(Epoch { epoch }) = epoch;
324
325 match state
326 .node
327 .clear_override_protocol_upgrade_buffer_stake(epoch)
328 {
329 Ok(()) => (
330 StatusCode::OK,
331 "protocol upgrade buffer stake cleared\n".to_string(),
332 ),
333 Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
334 }
335}
336
/// Query parameters for overriding the protocol upgrade buffer stake.
#[derive(Deserialize)]
struct SetBufferStake {
    /// New buffer stake value — presumably in basis points, judging by the name; confirm
    /// against `SuiNode::set_override_protocol_upgrade_buffer_stake`.
    buffer_bps: u64,
    /// Epoch the override applies to.
    epoch: u64,
}
342
343async fn set_override_protocol_upgrade_buffer_stake(
344 State(state): State<Arc<AppState>>,
345 buffer_state: Query<SetBufferStake>,
346) -> (StatusCode, String) {
347 let Query(SetBufferStake { buffer_bps, epoch }) = buffer_state;
348
349 match state
350 .node
351 .set_override_protocol_upgrade_buffer_stake(epoch, buffer_bps)
352 {
353 Ok(()) => (
354 StatusCode::OK,
355 format!("protocol upgrade buffer stake set to '{}'\n", buffer_bps),
356 ),
357 Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
358 }
359}
360
361async fn force_close_epoch(
362 State(state): State<Arc<AppState>>,
363 epoch: Query<Epoch>,
364) -> (StatusCode, String) {
365 let Query(Epoch {
366 epoch: expected_epoch,
367 }) = epoch;
368 let epoch_store = state.node.state().load_epoch_store_one_call_per_task();
369 let actual_epoch = epoch_store.epoch();
370 if actual_epoch != expected_epoch {
371 let err = SuiErrorKind::WrongEpoch {
372 expected_epoch,
373 actual_epoch,
374 };
375 return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
376 }
377
378 match state.node.close_epoch(&epoch_store).await {
379 Ok(()) => (
380 StatusCode::OK,
381 "close_epoch() called successfully\n".to_string(),
382 ),
383 Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
384 }
385}
386
/// Query parameters carrying a single randomness round number.
#[derive(Deserialize)]
struct Round {
    /// Round number; wrapped in `RandomnessRound` by the handler.
    round: u64,
}
391
392async fn randomness_partial_sigs(
393 State(state): State<Arc<AppState>>,
394 round: Query<Round>,
395) -> (StatusCode, String) {
396 let Query(Round { round }) = round;
397
398 let (tx, rx) = oneshot::channel();
399 state
400 .node
401 .randomness_handle()
402 .admin_get_partial_signatures(RandomnessRound(round), tx);
403
404 let sigs = match rx.await {
405 Ok(sigs) => sigs,
406 Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
407 };
408
409 let output = format!(
410 "{}\n",
411 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(sigs)
412 );
413
414 (StatusCode::OK, output)
415}
416
/// Query parameters for injecting partial randomness signatures.
#[derive(Deserialize)]
struct PartialSigsToInject {
    /// Authority name, parsed with `AuthorityName::from_str`.
    hex_authority_name: String,
    /// Randomness round the signatures belong to.
    round: u64,
    /// URL-safe, unpadded base64 of a BCS-encoded `Vec<RandomnessPartialSignature>`.
    base64_sigs: String,
}
423
424async fn randomness_inject_partial_sigs(
425 State(state): State<Arc<AppState>>,
426 args: Query<PartialSigsToInject>,
427) -> (StatusCode, String) {
428 let Query(PartialSigsToInject {
429 hex_authority_name,
430 round,
431 base64_sigs,
432 }) = args;
433
434 let authority_name = match AuthorityName::from_str(hex_authority_name.as_str()) {
435 Ok(authority_name) => authority_name,
436 Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
437 };
438
439 let sigs: Vec<u8> = match base64::engine::general_purpose::URL_SAFE_NO_PAD.decode(base64_sigs) {
440 Ok(sigs) => sigs,
441 Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
442 };
443
444 let sigs: Vec<RandomnessPartialSignature> = match bcs::from_bytes(&sigs) {
445 Ok(sigs) => sigs,
446 Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
447 };
448
449 let (tx_result, rx_result) = oneshot::channel();
450 state
451 .node
452 .randomness_handle()
453 .admin_inject_partial_signatures(authority_name, RandomnessRound(round), sigs, tx_result);
454
455 match rx_result.await {
456 Ok(Ok(())) => (StatusCode::OK, "partial signatures injected\n".to_string()),
457 Ok(Err(e)) => (StatusCode::BAD_REQUEST, e.to_string()),
458 Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
459 }
460}
461
/// Query parameters for injecting a full randomness signature.
#[derive(Deserialize)]
struct FullSigToInject {
    /// Randomness round the signature belongs to.
    round: u64,
    /// URL-safe, unpadded base64 of a BCS-encoded `RandomnessSignature`.
    base64_sig: String,
}
467
468async fn randomness_inject_full_sig(
469 State(state): State<Arc<AppState>>,
470 args: Query<FullSigToInject>,
471) -> (StatusCode, String) {
472 let Query(FullSigToInject { round, base64_sig }) = args;
473
474 let sig: Vec<u8> = match base64::engine::general_purpose::URL_SAFE_NO_PAD.decode(base64_sig) {
475 Ok(sig) => sig,
476 Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
477 };
478
479 let sig: RandomnessSignature = match bcs::from_bytes(&sig) {
480 Ok(sig) => sig,
481 Err(err) => return (StatusCode::BAD_REQUEST, err.to_string()),
482 };
483
484 let (tx_result, rx_result) = oneshot::channel();
485 state.node.randomness_handle().admin_inject_full_signature(
486 RandomnessRound(round),
487 sig,
488 tx_result,
489 );
490
491 match rx_result.await {
492 Ok(Ok(())) => (StatusCode::OK, "full signature injected\n".to_string()),
493 Ok(Err(e)) => (StatusCode::BAD_REQUEST, e.to_string()),
494 Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
495 }
496}
497
/// Query parameters for looking up a transaction's estimated cost.
#[derive(Deserialize)]
struct GetTxCost {
    /// Transaction digest in the textual form accepted by `TransactionDigest::from_str`.
    tx_digest: String,
}
502
503async fn get_tx_cost(
504 State(state): State<Arc<AppState>>,
505 args: Query<GetTxCost>,
506) -> (StatusCode, String) {
507 let Query(GetTxCost { tx_digest }) = args;
508 let tx_digest = TransactionDigest::from_str(tx_digest.as_str()).unwrap();
509
510 let Some(transaction) = state
511 .node
512 .state()
513 .get_transaction_cache_reader()
514 .get_transaction_block(&tx_digest)
515 else {
516 return (StatusCode::BAD_REQUEST, "Transaction not found".to_string());
517 };
518
519 let Some(cost) = state
520 .node
521 .state()
522 .load_epoch_store_one_call_per_task()
523 .get_estimated_tx_cost(transaction.transaction_data())
524 .await
525 else {
526 return (StatusCode::BAD_REQUEST, "No estimate available".to_string());
527 };
528
529 (StatusCode::OK, cost.to_string())
530}
531
532async fn dump_consensus_tx_cost_estimates(
533 State(state): State<Arc<AppState>>,
534) -> (StatusCode, String) {
535 let epoch_store = state.node.state().load_epoch_store_one_call_per_task();
536 let estimates = epoch_store.get_consensus_tx_cost_estimates().await;
537 (StatusCode::OK, format!("{:#?}", estimates))
538}
539
540async fn traffic_control(
541 State(state): State<Arc<AppState>>,
542 args: Query<TrafficControlReconfigParams>,
543) -> (StatusCode, String) {
544 let Query(params) = args;
545 match state.node.state().reconfigure_traffic_control(params).await {
546 Ok(updated_state) => (
547 StatusCode::OK,
548 format!(
549 "Traffic control configured with:\n\
550 Error threshold: {:?}\n\
551 Spam threshold: {:?}\n\
552 Dry run: {:?}\n",
553 updated_state.error_threshold, updated_state.spam_threshold, updated_state.dry_run
554 ),
555 ),
556 Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
557 }
558}
559
/// Query parameters for the `update_endpoint` handler.
#[derive(Deserialize)]
struct UpdateEndpointArgs {
    /// Kind of endpoint to update: `"p2p"` or `"consensus"`.
    endpoint_type: String,
    /// Hex-encoded identifier: exactly 32 bytes for `p2p` (used as the peer id), or
    /// network public key bytes for `consensus`.
    id: String,
    /// Comma-separated list of multiaddrs; entries are trimmed and empty ones are skipped.
    addresses: String,
}
566
567async fn update_endpoint(
568 State(state): State<Arc<AppState>>,
569 args: Query<UpdateEndpointArgs>,
570) -> (StatusCode, String) {
571 let Query(UpdateEndpointArgs {
572 endpoint_type,
573 id,
574 addresses,
575 }) = args;
576
577 let endpoint_id = match endpoint_type.as_str() {
578 "p2p" => {
579 let peer_id_bytes = match Hex::decode(&id) {
580 Ok(bytes) => bytes,
581 Err(err) => {
582 return (
583 StatusCode::BAD_REQUEST,
584 format!("Invalid id hex encoding: {err}"),
585 );
586 }
587 };
588
589 let peer_id_bytes: [u8; 32] = match peer_id_bytes.try_into() {
590 Ok(bytes) => bytes,
591 Err(_) => {
592 return (
593 StatusCode::BAD_REQUEST,
594 "p2p id must be 32 bytes".to_string(),
595 );
596 }
597 };
598
599 EndpointId::P2p(anemo::PeerId(peer_id_bytes))
600 }
601 "consensus" => {
602 let network_pubkey_bytes = match Hex::decode(&id) {
603 Ok(bytes) => bytes,
604 Err(err) => {
605 return (
606 StatusCode::BAD_REQUEST,
607 format!("Invalid id hex encoding: {err}"),
608 );
609 }
610 };
611
612 let network_pubkey = match NetworkPublicKey::from_bytes(&network_pubkey_bytes) {
613 Ok(key) => key,
614 Err(err) => {
615 return (
616 StatusCode::BAD_REQUEST,
617 format!("Invalid network public key: {err:?}"),
618 );
619 }
620 };
621
622 EndpointId::Consensus(network_pubkey)
623 }
624 _ => {
625 return (
626 StatusCode::BAD_REQUEST,
627 format!("Unknown endpoint_type: {endpoint_type}"),
628 );
629 }
630 };
631
632 let mut parsed_addresses = Vec::new();
633 for addr_str in addresses.split(',') {
634 let addr_str = addr_str.trim();
635 if addr_str.is_empty() {
636 continue;
637 }
638 match addr_str.parse::<Multiaddr>() {
639 Ok(addr) => parsed_addresses.push(addr),
640 Err(err) => {
641 return (
642 StatusCode::BAD_REQUEST,
643 format!("Invalid address '{addr_str}': {err}"),
644 );
645 }
646 }
647 }
648
649 if let Err(e) = state.node.endpoint_manager().update_endpoint(
650 endpoint_id,
651 AddressSource::Admin,
652 parsed_addresses.clone(),
653 ) {
654 return (StatusCode::BAD_REQUEST, e.to_string());
655 }
656
657 (
658 StatusCode::OK,
659 format!(
660 "Endpoint updated for {endpoint_type} endpoint {id} with {} address(es)\n",
661 parsed_addresses.len(),
662 ),
663 )
664}