1use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use jsonrpsee::server::BatchRequestConfig;
10use jsonrpsee::server::RpcServiceBuilder;
11use jsonrpsee::server::ServerBuilder;
12use prometheus::Registry;
13use serde_json::json;
14use sui_futures::service::Service;
15use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
16use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
17use sui_indexer_alt_reader::fullnode_client::FullnodeClient;
18use sui_indexer_alt_reader::kv_loader::KvArgs;
19use sui_indexer_alt_reader::pg_reader::db::DbArgs;
20use sui_indexer_alt_reader::system_package_task::SystemPackageTask;
21use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
22use sui_open_rpc::Project;
23use tower_http::catch_panic;
24use tower_layer::Identity;
25use tracing::info;
26use tracing::warn;
27use url::Url;
28
29use crate::api::checkpoints::Checkpoints;
30use crate::api::coin::Coins;
31use crate::api::dynamic_fields::DynamicFields;
32use crate::api::governance::Governance;
33use crate::api::move_utils::MoveUtils;
34use crate::api::name_service::NameService;
35use crate::api::objects::Objects;
36use crate::api::objects::QueryObjects;
37use crate::api::rpc_module::RpcModule;
38use crate::api::transactions::QueryTransactions;
39use crate::api::transactions::Transactions;
40use crate::api::write::Write;
41use crate::config::RpcConfig;
42use crate::context::Context;
43use crate::error::PanicHandler;
44use crate::metrics::RpcMetrics;
45use crate::metrics::middleware::MetricsLayer;
46use crate::timeout::TimeoutLayer;
47
48pub mod api;
49pub mod args;
50pub mod config;
51mod context;
52pub mod data;
53mod error;
54mod metrics;
55mod paginate;
56mod timeout;
57
/// Command-line arguments controlling how the JSON-RPC service listens for and handles requests.
#[derive(clap::Args, Debug, Clone)]
pub struct RpcArgs {
    /// Address to listen on for incoming JSON-RPC requests.
    #[clap(long, default_value_t = Self::default().rpc_listen_address)]
    pub rpc_listen_address: SocketAddr,

    /// Maximum number of requests in flight at once (applied as the server's connection limit).
    #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
    pub max_in_flight_requests: u32,

    /// Per-request timeout, in milliseconds, handed to the timeout middleware.
    #[clap(long, default_value_t = Self::default().request_timeout_ms)]
    pub request_timeout_ms: u64,

    /// Slow-request threshold, in milliseconds, handed to the metrics middleware.
    #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
    pub slow_request_threshold_ms: u64,
}
79
/// A JSON-RPC service that has been configured but not yet started.
///
/// Modules are registered with `add_module`, and the server is bound and started by `run`.
pub struct RpcService {
    /// Address the server is bound to when the service is run.
    rpc_listen_address: SocketAddr,

    /// Partially configured server builder; middleware is attached and the server built in `run`.
    server: ServerBuilder<Identity, Identity>,

    /// Metrics shared with the metrics middleware and the panic handler.
    metrics: Arc<RpcMetrics>,

    /// Timeout handed to the `TimeoutLayer` RPC middleware.
    request_timeout: Duration,

    /// Threshold handed to the `MetricsLayer` RPC middleware.
    slow_request_threshold: Duration,

    /// All RPC methods registered with the service so far.
    modules: jsonrpsee::RpcModule<()>,

    /// Description of the registered modules, served by the `rpc.discover` method.
    schema: Project,
}
102
103impl RpcArgs {
104 fn request_timeout(&self) -> Duration {
106 Duration::from_millis(self.request_timeout_ms)
107 }
108
109 fn slow_request_threshold(&self) -> Duration {
112 Duration::from_millis(self.slow_request_threshold_ms)
113 }
114}
115
116impl RpcService {
117 pub fn new(rpc_args: RpcArgs, registry: &Registry) -> anyhow::Result<Self> {
120 let metrics = RpcMetrics::new(registry);
121
122 let server = ServerBuilder::new()
123 .http_only()
124 .max_connections(rpc_args.max_in_flight_requests)
127 .max_response_body_size(u32::MAX)
128 .set_batch_request_config(BatchRequestConfig::Disabled);
129
130 let schema = Project::new(
131 env!("CARGO_PKG_VERSION"),
132 "Sui JSON-RPC",
133 "A JSON-RPC API for interacting with the Sui blockchain.",
134 "Mysten Labs",
135 "https://mystenlabs.com",
136 "build@mystenlabs.com",
137 "Apache-2.0",
138 "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
139 );
140
141 Ok(Self {
142 rpc_listen_address: rpc_args.rpc_listen_address,
143 server,
144 metrics,
145 request_timeout: rpc_args.request_timeout(),
146 slow_request_threshold: rpc_args.slow_request_threshold(),
147 modules: jsonrpsee::RpcModule::new(()),
148 schema,
149 })
150 }
151
152 pub fn metrics(&self) -> Arc<RpcMetrics> {
154 self.metrics.clone()
155 }
156
157 pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
160 self.schema.add_module(module.schema());
161 self.modules
162 .merge(module.into_impl().remove_context())
163 .context("Failed to add module because of a name conflict")
164 }
165
166 pub async fn run(self) -> anyhow::Result<Service> {
169 let Self {
170 rpc_listen_address,
171 server,
172 metrics,
173 request_timeout,
174 slow_request_threshold,
175 mut modules,
176 schema,
177 } = self;
178
179 info!("Starting JSON-RPC service on {rpc_listen_address}",);
180 info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
181
182 modules
184 .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
185 .context("Failed to add schema discovery method")?;
186
187 let middleware = RpcServiceBuilder::new()
188 .layer(TimeoutLayer::new(request_timeout))
189 .layer(MetricsLayer::new(
190 metrics.clone(),
191 modules.method_names().map(|n| n.to_owned()).collect(),
192 slow_request_threshold,
193 ));
194
195 let handle = server
196 .set_rpc_middleware(middleware)
197 .set_http_middleware(
198 tower::builder::ServiceBuilder::new()
199 .layer(
200 tower_http::cors::CorsLayer::new()
201 .allow_methods([http::Method::GET, http::Method::POST])
202 .allow_origin(tower_http::cors::Any)
203 .allow_headers(tower_http::cors::Any),
204 )
205 .layer(catch_panic::CatchPanicLayer::custom(PanicHandler::new(
206 metrics,
207 ))),
208 )
209 .build(rpc_listen_address)
210 .await
211 .context("Failed to bind JSON-RPC service")?
212 .start(modules);
213
214 let signal = handle.clone();
215 Ok(Service::new()
216 .with_shutdown_signal(async move {
217 let _ = signal.stop();
218 })
219 .spawn(async move {
220 handle.stopped().await;
221 Ok(())
222 }))
223 }
224}
225
226impl Default for RpcArgs {
227 fn default() -> Self {
228 Self {
229 rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
230 max_in_flight_requests: 2000,
231 request_timeout_ms: 60_000,
232 slow_request_threshold_ms: 15_000,
233 }
234 }
235}
236
/// Command-line arguments describing the fullnode the JSON-RPC service talks to.
#[derive(clap::Args, Debug, Clone, Default)]
pub struct NodeArgs {
    /// URL of the fullnode's gRPC endpoint. It is parsed as a URL when the service starts.
    #[arg(long)]
    pub fullnode_grpc_url: Option<String>,
}
244
245pub async fn start_rpc(
261 database_url: Option<Url>,
262 db_args: DbArgs,
263 kv_args: KvArgs,
264 consistent_reader_args: ConsistentReaderArgs,
265 rpc_args: RpcArgs,
266 node_args: NodeArgs,
267 system_package_task_args: SystemPackageTaskArgs,
268 rpc_config: RpcConfig,
269 registry: &Registry,
270) -> anyhow::Result<Service> {
271 let mut rpc = RpcService::new(rpc_args, registry).context("Failed to create RPC service")?;
272
273 let fullnode_args = node_args
274 .fullnode_grpc_url
275 .as_deref()
276 .map(Url::parse)
277 .transpose()
278 .context("Invalid fullnode gRPC URL")?
279 .map(FullnodeArgs::new)
280 .unwrap_or_default();
281
282 let fullnode_client =
283 FullnodeClient::new(Some("jsonrpc_alt_fullnode"), fullnode_args, registry)
284 .await
285 .context("Failed to create fullnode gRPC client")?;
286
287 let context = Context::new(
288 database_url,
289 db_args,
290 kv_args,
291 consistent_reader_args,
292 fullnode_client.clone(),
293 rpc_config,
294 rpc.metrics(),
295 registry,
296 )
297 .await?;
298
299 let system_package_task = SystemPackageTask::new(
300 system_package_task_args,
301 context.pg_reader().clone(),
302 context.package_resolver().package_store().clone(),
303 );
304
305 rpc.add_module(Checkpoints(context.clone()))?;
306 rpc.add_module(Coins(context.clone()))?;
307 rpc.add_module(DynamicFields(context.clone()))?;
308 rpc.add_module(MoveUtils(context.clone()))?;
309 rpc.add_module(NameService(context.clone()))?;
310 rpc.add_module(Objects(context.clone()))?;
311 rpc.add_module(QueryObjects(context.clone()))?;
312 rpc.add_module(QueryTransactions(context.clone()))?;
313 rpc.add_module(Transactions(context.clone()))?;
314
315 if let Some(_fullnode_client) = fullnode_client {
316 rpc.add_module(Governance::new(context.clone()))?;
317 rpc.add_module(Write::new(context.clone()))?;
318 } else {
319 warn!("No fullnode grpc url provided, Write and Governance modules will not be added.");
320 }
321
322 let s_rpc = rpc.run().await.context("Failed to start RPC service")?;
323 let s_system_package_task = system_package_task.run();
324
325 Ok(s_rpc.attach(s_system_package_task))
326}
327
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::net::IpAddr;
    use std::net::Ipv4Addr;
    use std::net::SocketAddr;
    use std::time::Duration;

    use jsonrpsee::core::RpcResult;
    use jsonrpsee::proc_macros::rpc;
    use jsonrpsee::types::error::INTERNAL_ERROR_CODE;
    use jsonrpsee::types::error::METHOD_NOT_FOUND_CODE;
    use reqwest::Client;
    use serde_json::Value;
    use serde_json::json;
    use sui_open_rpc::Module;
    use sui_open_rpc_macros::open_rpc;
    use sui_pg_db::temp::get_available_port;

    use super::*;

    /// A module with a single method is registered under its namespaced name.
    #[tokio::test]
    async fn test_add_module() {
        let mut rpc = test_service().await;

        rpc.add_module(Foo).unwrap();

        let registered = BTreeSet::from_iter(rpc.modules.method_names());
        assert_eq!(registered, BTreeSet::from_iter(["test_bar"]))
    }

    /// Every method of a multi-method module is registered.
    #[tokio::test]
    async fn test_add_module_multiple_methods() {
        let mut rpc = test_service().await;

        rpc.add_module(Bar).unwrap();

        let registered = BTreeSet::from_iter(rpc.modules.method_names());
        assert_eq!(registered, BTreeSet::from_iter(["test_bar", "test_baz"]))
    }

    /// Methods from several non-overlapping modules are all registered.
    #[tokio::test]
    async fn test_add_multiple_modules() {
        let mut rpc = test_service().await;

        rpc.add_module(Foo).unwrap();
        rpc.add_module(Baz).unwrap();

        let registered = BTreeSet::from_iter(rpc.modules.method_names());
        assert_eq!(registered, BTreeSet::from_iter(["test_bar", "test_baz"]))
    }

    /// Adding a module that re-defines an existing method (`test_bar`) is rejected.
    #[tokio::test]
    async fn test_add_module_conflict() {
        let mut rpc = test_service().await;

        rpc.add_module(Foo).unwrap();

        let conflict = rpc.add_module(Bar);
        assert!(conflict.is_err())
    }

    /// A running service with no modules shuts down promptly.
    #[tokio::test]
    async fn test_graceful_shutdown() {
        let rpc = test_service().await;
        let svc = rpc.run().await.unwrap();

        shutdown(svc).await;
    }

    /// `rpc.discover` reports the service title and the docs of every registered method.
    #[tokio::test]
    async fn test_rpc_discovery() {
        let rpc_listen_address = test_listen_address();
        let mut rpc = service_at(rpc_listen_address).unwrap();

        rpc.add_module(Foo).unwrap();
        rpc.add_module(Baz).unwrap();

        let svc = rpc.run().await.unwrap();

        let url = format!("http://{rpc_listen_address}/");
        let client = Client::new();

        let resp: Value = call(&client, &url, "rpc.discover")
            .await
            .json()
            .await
            .expect("Deserialization should succeed");

        assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
        assert_eq!(
            resp["result"]["methods"],
            json!([u64_method_doc("test_bar"), u64_method_doc("test_baz")])
        );

        shutdown(svc).await;
    }

    /// A call to a registered method is counted as received and succeeded under its own name,
    /// while a call to a method that was never registered (`test_baz`) is counted under
    /// `<UNKNOWN>` as received and failed with the "method not found" code.
    #[tokio::test]
    async fn test_request_metrics() {
        let rpc_listen_address = test_listen_address();
        let mut rpc = service_at(rpc_listen_address).unwrap();

        rpc.add_module(Foo).unwrap();

        let metrics = rpc.metrics();
        let svc = rpc.run().await.unwrap();

        let url = format!("http://{rpc_listen_address}/");
        let client = Client::new();

        // Only delivery matters here; the response bodies are not inspected.
        call(&client, &url, "test_bar").await;
        call(&client, &url, "test_baz").await;

        let received = |method: &str| metrics.requests_received.with_label_values(&[method]).get();
        let succeeded =
            |method: &str| metrics.requests_succeeded.with_label_values(&[method]).get();

        assert_eq!(received("test_bar"), 1);
        assert_eq!(succeeded("test_bar"), 1);

        assert_eq!(received("<UNKNOWN>"), 1);
        assert_eq!(succeeded("<UNKNOWN>"), 0);

        let not_found = format!("{METHOD_NOT_FOUND_CODE}");
        assert_eq!(
            metrics
                .requests_failed
                .with_label_values(&["<UNKNOWN>", not_found.as_str()])
                .get(),
            1
        );

        shutdown(svc).await;
    }

    /// A panicking handler produces a JSON-RPC internal error carrying the panic message, and
    /// the panic is counted in the metrics.
    #[tokio::test]
    async fn test_panic_handling() {
        let rpc_listen_address = test_listen_address();
        let mut rpc = service_at(rpc_listen_address).unwrap();

        rpc.add_module(Panic).unwrap();

        let metrics = rpc.metrics();
        let svc = rpc.run().await.unwrap();

        let url = format!("http://{rpc_listen_address}/");
        let client = Client::new();

        let resp = call(&client, &url, "test_panic").await;
        let body: Value = resp.json().await.expect("Response should be JSON");

        assert_eq!(body["jsonrpc"], "2.0");
        assert_eq!(body["error"]["code"], INTERNAL_ERROR_CODE);

        let message = body["error"]["message"].as_str().unwrap();
        assert!(message.contains("Boom!"));

        assert_eq!(metrics.requests_panicked.get(), 1);

        shutdown(svc).await;
    }

    // Test APIs. All of them live in the `test` namespace, so their methods are exposed as
    // `test_<name>`.

    /// Exposes `test_bar`.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait FooApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;
    }

    /// Exposes `test_bar` and `test_baz`, overlapping with both `FooApi` and `BazApi`.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BarApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;

        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }

    /// Exposes `test_baz`.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BazApi {
        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }

    /// Exposes `test_panic`, whose implementation always panics.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait PanicApi {
        #[method(name = "panic")]
        fn panic(&self) -> RpcResult<u64>;
    }

    struct Foo;
    struct Bar;
    struct Baz;
    struct Panic;

    impl FooApiServer for Foo {
        fn bar(&self) -> RpcResult<u64> {
            Ok(42)
        }
    }

    impl BarApiServer for Bar {
        fn bar(&self) -> RpcResult<u64> {
            Ok(43)
        }

        fn baz(&self) -> RpcResult<u64> {
            Ok(44)
        }
    }

    impl BazApiServer for Baz {
        fn baz(&self) -> RpcResult<u64> {
            Ok(45)
        }
    }

    impl PanicApiServer for Panic {
        fn panic(&self) -> RpcResult<u64> {
            panic!("Boom!");
        }
    }

    impl RpcModule for Foo {
        fn schema(&self) -> Module {
            FooApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    impl RpcModule for Bar {
        fn schema(&self) -> Module {
            BarApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    impl RpcModule for Baz {
        fn schema(&self) -> Module {
            BazApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    impl RpcModule for Panic {
        fn schema(&self) -> Module {
            PanicApiOpenRpc::module_doc()
        }

        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }

    /// A localhost address on a port reported as available.
    fn test_listen_address() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), get_available_port())
    }

    /// Build an unstarted service listening on `rpc_listen_address`, with every other argument
    /// left at its default and a fresh metrics registry.
    fn service_at(rpc_listen_address: SocketAddr) -> anyhow::Result<RpcService> {
        RpcService::new(
            RpcArgs {
                rpc_listen_address,
                ..Default::default()
            },
            &Registry::new(),
        )
    }

    /// An unstarted service on a fresh localhost address.
    async fn test_service() -> RpcService {
        service_at(test_listen_address()).expect("Failed to create test JSON-RPC service")
    }

    /// POST a parameterless JSON-RPC 2.0 request for `method` (with id 1) to `url`.
    async fn call(client: &Client, url: &str, method: &str) -> reqwest::Response {
        client
            .post(url)
            .json(&json!({
                "jsonrpc": "2.0",
                "method": method,
                "id": 1,
            }))
            .send()
            .await
            .expect("Request should succeed")
    }

    /// The discovery entry expected for a parameterless test method called `name` that returns
    /// a `u64`.
    fn u64_method_doc(name: &str) -> Value {
        json!({
            "name": name,
            "tags": [{
                "name": "Test API"
            }],
            "params": [],
            "result": {
                "name": "u64",
                "required": true,
                "schema": {
                    "type": "integer",
                    "format": "uint64",
                    "minimum": 0.0
                }
            }
        })
    }

    /// Shut `svc` down, requiring it to finish successfully within 500ms.
    async fn shutdown(svc: Service) {
        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
            .await
            .expect("Shutdown should not timeout")
            .expect("Shutdown should succeed");
    }
}