sui_indexer_alt_jsonrpc/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use jsonrpsee::server::BatchRequestConfig;
10use jsonrpsee::server::RpcServiceBuilder;
11use jsonrpsee::server::ServerBuilder;
12use prometheus::Registry;
13use serde_json::json;
14use sui_futures::service::Service;
15use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
16use sui_indexer_alt_reader::fullnode_client::FullnodeArgs;
17use sui_indexer_alt_reader::fullnode_client::FullnodeClient;
18use sui_indexer_alt_reader::kv_loader::KvArgs;
19use sui_indexer_alt_reader::pg_reader::db::DbArgs;
20use sui_indexer_alt_reader::system_package_task::SystemPackageTask;
21use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
22use sui_open_rpc::Project;
23use tower_http::catch_panic;
24use tower_layer::Identity;
25use tracing::info;
26use tracing::warn;
27use url::Url;
28
29use crate::api::checkpoints::Checkpoints;
30use crate::api::coin::Coins;
31use crate::api::dynamic_fields::DynamicFields;
32use crate::api::governance::Governance;
33use crate::api::move_utils::MoveUtils;
34use crate::api::name_service::NameService;
35use crate::api::objects::Objects;
36use crate::api::objects::QueryObjects;
37use crate::api::rpc_module::RpcModule;
38use crate::api::transactions::QueryTransactions;
39use crate::api::transactions::Transactions;
40use crate::api::write::Write;
41use crate::config::RpcConfig;
42use crate::context::Context;
43use crate::error::PanicHandler;
44use crate::metrics::RpcMetrics;
45use crate::metrics::middleware::MetricsLayer;
46use crate::timeout::TimeoutLayer;
47
48pub mod api;
49pub mod args;
50pub mod config;
51mod context;
52pub mod data;
53mod error;
54mod metrics;
55mod paginate;
56mod timeout;
57
58#[derive(clap::Args, Debug, Clone)]
59pub struct RpcArgs {
60    /// Address to listen to for incoming JSON-RPC connections.
61    #[clap(long, default_value_t = Self::default().rpc_listen_address)]
62    pub rpc_listen_address: SocketAddr,
63
64    /// The maximum number of concurrent requests to accept. If the service receives more than this
65    /// many requests, it will start responding with 429.
66    #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
67    pub max_in_flight_requests: u32,
68
69    /// Requests that take longer than this (in milliseconds) to respond to will be terminated, and
70    /// the query itself will be logged as a warning.
71    #[clap(long, default_value_t = Self::default().request_timeout_ms)]
72    pub request_timeout_ms: u64,
73
74    /// Requests that take longer than this (in milliseconds) will be logged even if they succeed.
75    /// This should be shorter than `request_timeout_ms`.
76    #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
77    pub slow_request_threshold_ms: u64,
78}
79
80pub struct RpcService {
81    /// The address that the server will start listening for requests on, when it is run.
82    rpc_listen_address: SocketAddr,
83
84    /// A partially built/configured JSON-RPC server.
85    server: ServerBuilder<Identity, Identity>,
86
87    /// Metrics for the RPC service.
88    metrics: Arc<RpcMetrics>,
89
90    /// Maximum time a request can take to complete.
91    request_timeout: Duration,
92
93    /// Threshold for logging slow requests.
94    slow_request_threshold: Duration,
95
96    /// All the methods added to the server so far.
97    modules: jsonrpsee::RpcModule<()>,
98
99    /// Description of the schema served by this service.
100    schema: Project,
101}
102
103impl RpcArgs {
104    /// Requests that take longer than this are terminated and logged for debugging.
105    fn request_timeout(&self) -> Duration {
106        Duration::from_millis(self.request_timeout_ms)
107    }
108
109    /// Requests that take longer than this are logged for debugging even if they succeed.
110    /// This threshold should be lower than the request timeout threshold.
111    fn slow_request_threshold(&self) -> Duration {
112        Duration::from_millis(self.slow_request_threshold_ms)
113    }
114}
115
116impl RpcService {
117    /// Create a new instance of the JSON-RPC service, configured by `rpc_args`. The service will
118    /// not accept connections until [Self::run] is called.
119    pub fn new(rpc_args: RpcArgs, registry: &Registry) -> anyhow::Result<Self> {
120        let metrics = RpcMetrics::new(registry);
121
122        let server = ServerBuilder::new()
123            .http_only()
124            // `jsonrpsee` calls this a limit on connections, but it is implemented as a limit on
125            // requests.
126            .max_connections(rpc_args.max_in_flight_requests)
127            .max_response_body_size(u32::MAX)
128            .set_batch_request_config(BatchRequestConfig::Disabled);
129
130        let schema = Project::new(
131            env!("CARGO_PKG_VERSION"),
132            "Sui JSON-RPC",
133            "A JSON-RPC API for interacting with the Sui blockchain.",
134            "Mysten Labs",
135            "https://mystenlabs.com",
136            "build@mystenlabs.com",
137            "Apache-2.0",
138            "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
139        );
140
141        Ok(Self {
142            rpc_listen_address: rpc_args.rpc_listen_address,
143            server,
144            metrics,
145            request_timeout: rpc_args.request_timeout(),
146            slow_request_threshold: rpc_args.slow_request_threshold(),
147            modules: jsonrpsee::RpcModule::new(()),
148            schema,
149        })
150    }
151
152    /// Return a copy of the metrics.
153    pub fn metrics(&self) -> Arc<RpcMetrics> {
154        self.metrics.clone()
155    }
156
157    /// Add an `RpcModule` to the service. The module's methods are combined with the existing
158    /// methods registered on the service, and the operation will fail if there is any overlap.
159    pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
160        self.schema.add_module(module.schema());
161        self.modules
162            .merge(module.into_impl().remove_context())
163            .context("Failed to add module because of a name conflict")
164    }
165
166    /// Start the service (it will accept connections) and return a handle that tracks the
167    /// lifecycle of the service.
168    pub async fn run(self) -> anyhow::Result<Service> {
169        let Self {
170            rpc_listen_address,
171            server,
172            metrics,
173            request_timeout,
174            slow_request_threshold,
175            mut modules,
176            schema,
177        } = self;
178
179        info!("Starting JSON-RPC service on {rpc_listen_address}",);
180        info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
181
182        // Add a method to serve the schema to clients.
183        modules
184            .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
185            .context("Failed to add schema discovery method")?;
186
187        let middleware = RpcServiceBuilder::new()
188            .layer(TimeoutLayer::new(request_timeout))
189            .layer(MetricsLayer::new(
190                metrics.clone(),
191                modules.method_names().map(|n| n.to_owned()).collect(),
192                slow_request_threshold,
193            ));
194
195        let handle = server
196            .set_rpc_middleware(middleware)
197            .set_http_middleware(
198                tower::builder::ServiceBuilder::new()
199                    .layer(
200                        tower_http::cors::CorsLayer::new()
201                            .allow_methods([http::Method::GET, http::Method::POST])
202                            .allow_origin(tower_http::cors::Any)
203                            .allow_headers(tower_http::cors::Any),
204                    )
205                    .layer(catch_panic::CatchPanicLayer::custom(PanicHandler::new(
206                        metrics,
207                    ))),
208            )
209            .build(rpc_listen_address)
210            .await
211            .context("Failed to bind JSON-RPC service")?
212            .start(modules);
213
214        let signal = handle.clone();
215        Ok(Service::new()
216            .with_shutdown_signal(async move {
217                let _ = signal.stop();
218            })
219            .spawn(async move {
220                handle.stopped().await;
221                Ok(())
222            }))
223    }
224}
225
226impl Default for RpcArgs {
227    fn default() -> Self {
228        Self {
229            rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
230            max_in_flight_requests: 2000,
231            request_timeout_ms: 60_000,
232            slow_request_threshold_ms: 15_000,
233        }
234    }
235}
236
237/// Configuration for the fullnode RPC that this service will connect to.
238#[derive(clap::Args, Debug, Clone, Default)]
239pub struct NodeArgs {
240    /// The URL of the fullnode gRPC service, used for transaction execution and dry-running.
241    #[arg(long)]
242    pub fullnode_grpc_url: Option<String>,
243}
244
245/// Set-up and run the RPC service, using the provided arguments (expected to be extracted from the
246/// command-line).
247///
248/// Access to most reads is controlled by the `database_url` -- if it is `None`, reads will not
249/// work.
250///
251/// KV queries can optionally be served by a Bigtable instance, if `bigtable_instance` is provided.
252/// Otherwise these requests are served by the database. If a `bigtable_instance` is provided, the
253/// `GOOGLE_APPLICATION_CREDENTIALS` environment variable must point to the credentials JSON file.
254///
255/// Access to writes (executing and dry-running transactions) is controlled by
256/// `node_args.fullnode_grpc_url`, which can be omitted to disable writes from this RPC.
257///
258/// The service may spin up auxiliary services (such as the system package task) to support itself,
259/// and will clean these up on shutdown as well.
260pub async fn start_rpc(
261    database_url: Option<Url>,
262    db_args: DbArgs,
263    kv_args: KvArgs,
264    consistent_reader_args: ConsistentReaderArgs,
265    rpc_args: RpcArgs,
266    node_args: NodeArgs,
267    system_package_task_args: SystemPackageTaskArgs,
268    rpc_config: RpcConfig,
269    registry: &Registry,
270) -> anyhow::Result<Service> {
271    let mut rpc = RpcService::new(rpc_args, registry).context("Failed to create RPC service")?;
272
273    let fullnode_args = node_args
274        .fullnode_grpc_url
275        .as_deref()
276        .map(Url::parse)
277        .transpose()
278        .context("Invalid fullnode gRPC URL")?
279        .map(FullnodeArgs::new)
280        .unwrap_or_default();
281
282    let fullnode_client =
283        FullnodeClient::new(Some("jsonrpc_alt_fullnode"), fullnode_args, registry)
284            .await
285            .context("Failed to create fullnode gRPC client")?;
286
287    let context = Context::new(
288        database_url,
289        db_args,
290        kv_args,
291        consistent_reader_args,
292        fullnode_client.clone(),
293        rpc_config,
294        rpc.metrics(),
295        registry,
296    )
297    .await?;
298
299    let system_package_task = SystemPackageTask::new(
300        system_package_task_args,
301        context.pg_reader().clone(),
302        context.package_resolver().package_store().clone(),
303    );
304
305    rpc.add_module(Checkpoints(context.clone()))?;
306    rpc.add_module(Coins(context.clone()))?;
307    rpc.add_module(DynamicFields(context.clone()))?;
308    rpc.add_module(MoveUtils(context.clone()))?;
309    rpc.add_module(NameService(context.clone()))?;
310    rpc.add_module(Objects(context.clone()))?;
311    rpc.add_module(QueryObjects(context.clone()))?;
312    rpc.add_module(QueryTransactions(context.clone()))?;
313    rpc.add_module(Transactions(context.clone()))?;
314
315    if let Some(_fullnode_client) = fullnode_client {
316        rpc.add_module(Governance::new(context.clone()))?;
317        rpc.add_module(Write::new(context.clone()))?;
318    } else {
319        warn!("No fullnode grpc url provided, Write and Governance modules will not be added.");
320    }
321
322    let s_rpc = rpc.run().await.context("Failed to start RPC service")?;
323    let s_system_package_task = system_package_task.run();
324
325    Ok(s_rpc.attach(s_system_package_task))
326}
327
328#[cfg(test)]
329mod tests {
330    use std::collections::BTreeSet;
331    use std::net::IpAddr;
332    use std::net::Ipv4Addr;
333    use std::net::SocketAddr;
334    use std::time::Duration;
335
336    use jsonrpsee::core::RpcResult;
337    use jsonrpsee::proc_macros::rpc;
338    use jsonrpsee::types::error::INTERNAL_ERROR_CODE;
339    use jsonrpsee::types::error::METHOD_NOT_FOUND_CODE;
340    use reqwest::Client;
341    use serde_json::Value;
342    use serde_json::json;
343    use sui_open_rpc::Module;
344    use sui_open_rpc_macros::open_rpc;
345    use sui_pg_db::temp::get_available_port;
346
347    use super::*;
348
349    #[tokio::test]
350    async fn test_add_module() {
351        let mut rpc = test_service().await;
352
353        rpc.add_module(Foo).unwrap();
354
355        assert_eq!(
356            BTreeSet::from_iter(rpc.modules.method_names()),
357            BTreeSet::from_iter(["test_bar"]),
358        )
359    }
360
361    #[tokio::test]
362    async fn test_add_module_multiple_methods() {
363        let mut rpc = test_service().await;
364
365        rpc.add_module(Bar).unwrap();
366
367        assert_eq!(
368            BTreeSet::from_iter(rpc.modules.method_names()),
369            BTreeSet::from_iter(["test_bar", "test_baz"]),
370        )
371    }
372
373    #[tokio::test]
374    async fn test_add_multiple_modules() {
375        let mut rpc = test_service().await;
376
377        rpc.add_module(Foo).unwrap();
378        rpc.add_module(Baz).unwrap();
379
380        assert_eq!(
381            BTreeSet::from_iter(rpc.modules.method_names()),
382            BTreeSet::from_iter(["test_bar", "test_baz"]),
383        )
384    }
385
386    #[tokio::test]
387    async fn test_add_module_conflict() {
388        let mut rpc = test_service().await;
389
390        rpc.add_module(Foo).unwrap();
391        assert!(rpc.add_module(Bar).is_err(),)
392    }
393
394    #[tokio::test]
395    async fn test_graceful_shutdown() {
396        let rpc = test_service().await;
397        let svc = rpc.run().await.unwrap();
398
399        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
400            .await
401            .expect("Shutdown should not timeout")
402            .expect("Shutdown should succeed");
403    }
404
405    #[tokio::test]
406    async fn test_rpc_discovery() {
407        let rpc_listen_address = test_listen_address();
408        let mut rpc = RpcService::new(
409            RpcArgs {
410                rpc_listen_address,
411                ..Default::default()
412            },
413            &Registry::new(),
414        )
415        .unwrap();
416
417        rpc.add_module(Foo).unwrap();
418        rpc.add_module(Baz).unwrap();
419
420        let svc = rpc.run().await.unwrap();
421
422        let url = format!("http://{rpc_listen_address}/");
423        let client = Client::new();
424
425        let resp: Value = client
426            .post(&url)
427            .json(&json!({
428                "jsonrpc": "2.0",
429                "method": "rpc.discover",
430                "id": 1,
431            }))
432            .send()
433            .await
434            .expect("Request should succeed")
435            .json()
436            .await
437            .expect("Deserialization should succeed");
438
439        assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
440        assert_eq!(
441            resp["result"]["methods"],
442            json!([
443                {
444                    "name": "test_bar",
445                    "tags": [{
446                        "name": "Test API"
447                    }],
448                    "params": [],
449                    "result": {
450                        "name": "u64",
451                        "required": true,
452                        "schema": {
453                            "type": "integer",
454                            "format": "uint64",
455                            "minimum": 0.0
456                        }
457                    }
458                },
459                {
460                    "name": "test_baz",
461                    "tags": [{
462                        "name": "Test API"
463                    }],
464                    "params": [],
465                    "result": {
466                        "name": "u64",
467                        "required": true,
468                        "schema": {
469                            "type": "integer",
470                            "format": "uint64",
471                            "minimum": 0.0
472                        }
473                    }
474                }
475            ])
476        );
477
478        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
479            .await
480            .expect("Shutdown should not timeout")
481            .expect("Shutdown should succeed");
482    }
483
484    #[tokio::test]
485    async fn test_request_metrics() {
486        let rpc_listen_address = test_listen_address();
487        let mut rpc = RpcService::new(
488            RpcArgs {
489                rpc_listen_address,
490                ..Default::default()
491            },
492            &Registry::new(),
493        )
494        .unwrap();
495
496        rpc.add_module(Foo).unwrap();
497
498        let metrics = rpc.metrics();
499        let svc = rpc.run().await.unwrap();
500
501        let url = format!("http://{rpc_listen_address}/");
502        let client = Client::new();
503
504        client
505            .post(&url)
506            .json(&json!({
507                "jsonrpc": "2.0",
508                "method": "test_bar",
509                "id": 1,
510            }))
511            .send()
512            .await
513            .expect("Request should succeed");
514
515        client
516            .post(&url)
517            .json(&json!({
518                "jsonrpc": "2.0",
519                "method": "test_baz",
520                "id": 1,
521            }))
522            .send()
523            .await
524            .expect("Request should succeed");
525
526        assert_eq!(
527            metrics
528                .requests_received
529                .with_label_values(&["test_bar"])
530                .get(),
531            1
532        );
533
534        assert_eq!(
535            metrics
536                .requests_succeeded
537                .with_label_values(&["test_bar"])
538                .get(),
539            1
540        );
541
542        assert_eq!(
543            metrics
544                .requests_received
545                .with_label_values(&["<UNKNOWN>"])
546                .get(),
547            1
548        );
549
550        assert_eq!(
551            metrics
552                .requests_succeeded
553                .with_label_values(&["<UNKNOWN>"])
554                .get(),
555            0
556        );
557
558        assert_eq!(
559            metrics
560                .requests_failed
561                .with_label_values(&["<UNKNOWN>", &format!("{METHOD_NOT_FOUND_CODE}")])
562                .get(),
563            1
564        );
565
566        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
567            .await
568            .expect("Shutdown should not timeout")
569            .expect("Shutdown should succeed");
570    }
571
572    #[tokio::test]
573    async fn test_panic_handling() {
574        let rpc_listen_address = test_listen_address();
575        let mut rpc = RpcService::new(
576            RpcArgs {
577                rpc_listen_address,
578                ..Default::default()
579            },
580            &Registry::new(),
581        )
582        .unwrap();
583
584        rpc.add_module(Panic).unwrap();
585
586        let metrics = rpc.metrics();
587        let svc = rpc.run().await.unwrap();
588
589        let url = format!("http://{rpc_listen_address}/");
590        let client = Client::new();
591
592        let resp = client
593            .post(&url)
594            .json(&json!({
595                "jsonrpc": "2.0",
596                "method": "test_panic",
597                "id": 1,
598            }))
599            .send()
600            .await
601            .expect("Request should succeed");
602
603        let body: Value = resp.json().await.expect("Response should be JSON");
604
605        // Verify the response is a JSON-RPC error
606        assert_eq!(body["jsonrpc"], "2.0");
607        assert_eq!(body["error"]["code"], INTERNAL_ERROR_CODE);
608        assert!(body["error"]["message"].as_str().unwrap().contains("Boom!"));
609
610        // Verify the panic is recorded in metrics
611        assert_eq!(metrics.requests_panicked.get(), 1);
612
613        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
614            .await
615            .expect("Shutdown should not timeout")
616            .expect("Shutdown should succeed");
617    }
618
619    // Test Helpers
620
621    #[open_rpc(namespace = "test", tag = "Test API")]
622    #[rpc(server, namespace = "test")]
623    trait FooApi {
624        #[method(name = "bar")]
625        fn bar(&self) -> RpcResult<u64>;
626    }
627
628    #[open_rpc(namespace = "test", tag = "Test API")]
629    #[rpc(server, namespace = "test")]
630    trait BarApi {
631        #[method(name = "bar")]
632        fn bar(&self) -> RpcResult<u64>;
633
634        #[method(name = "baz")]
635        fn baz(&self) -> RpcResult<u64>;
636    }
637
638    #[open_rpc(namespace = "test", tag = "Test API")]
639    #[rpc(server, namespace = "test")]
640    trait BazApi {
641        #[method(name = "baz")]
642        fn baz(&self) -> RpcResult<u64>;
643    }
644
645    #[open_rpc(namespace = "test", tag = "Test API")]
646    #[rpc(server, namespace = "test")]
647    trait PanicApi {
648        #[method(name = "panic")]
649        fn panic(&self) -> RpcResult<u64>;
650    }
651
652    struct Foo;
653    struct Bar;
654    struct Baz;
655    struct Panic;
656
657    impl FooApiServer for Foo {
658        fn bar(&self) -> RpcResult<u64> {
659            Ok(42)
660        }
661    }
662
663    impl BarApiServer for Bar {
664        fn bar(&self) -> RpcResult<u64> {
665            Ok(43)
666        }
667
668        fn baz(&self) -> RpcResult<u64> {
669            Ok(44)
670        }
671    }
672
673    impl BazApiServer for Baz {
674        fn baz(&self) -> RpcResult<u64> {
675            Ok(45)
676        }
677    }
678
679    impl PanicApiServer for Panic {
680        fn panic(&self) -> RpcResult<u64> {
681            panic!("Boom!");
682        }
683    }
684
685    impl RpcModule for Foo {
686        fn schema(&self) -> Module {
687            FooApiOpenRpc::module_doc()
688        }
689
690        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
691            self.into_rpc()
692        }
693    }
694
695    impl RpcModule for Bar {
696        fn schema(&self) -> Module {
697            BarApiOpenRpc::module_doc()
698        }
699
700        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
701            self.into_rpc()
702        }
703    }
704
705    impl RpcModule for Baz {
706        fn schema(&self) -> Module {
707            BazApiOpenRpc::module_doc()
708        }
709
710        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
711            self.into_rpc()
712        }
713    }
714
715    impl RpcModule for Panic {
716        fn schema(&self) -> Module {
717            PanicApiOpenRpc::module_doc()
718        }
719
720        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
721            self.into_rpc()
722        }
723    }
724
725    fn test_listen_address() -> SocketAddr {
726        let port = get_available_port();
727        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
728    }
729
730    async fn test_service() -> RpcService {
731        RpcService::new(
732            RpcArgs {
733                rpc_listen_address: test_listen_address(),
734                ..Default::default()
735            },
736            &Registry::new(),
737        )
738        .expect("Failed to create test JSON-RPC service")
739    }
740}