sui_indexer_alt_jsonrpc/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::time::Duration;
7
8use anyhow::Context as _;
9use jsonrpsee::server::BatchRequestConfig;
10use jsonrpsee::server::RpcServiceBuilder;
11use jsonrpsee::server::ServerBuilder;
12use prometheus::Registry;
13use serde_json::json;
14use sui_futures::service::Service;
15use sui_indexer_alt_reader::bigtable_reader::BigtableArgs;
16use sui_indexer_alt_reader::consistent_reader::ConsistentReaderArgs;
17use sui_indexer_alt_reader::pg_reader::db::DbArgs;
18use sui_indexer_alt_reader::system_package_task::SystemPackageTask;
19use sui_indexer_alt_reader::system_package_task::SystemPackageTaskArgs;
20use sui_open_rpc::Project;
21use tower_http::catch_panic;
22use tower_layer::Identity;
23use tracing::info;
24use tracing::warn;
25use url::Url;
26
27use crate::api::checkpoints::Checkpoints;
28use crate::api::coin::Coins;
29use crate::api::dynamic_fields::DynamicFields;
30use crate::api::governance::DelegationGovernance;
31use crate::api::governance::Governance;
32use crate::api::move_utils::MoveUtils;
33use crate::api::name_service::NameService;
34use crate::api::objects::Objects;
35use crate::api::objects::QueryObjects;
36use crate::api::rpc_module::RpcModule;
37use crate::api::transactions::QueryTransactions;
38use crate::api::transactions::Transactions;
39use crate::api::write::Write;
40use crate::config::RpcConfig;
41use crate::context::Context;
42use crate::error::PanicHandler;
43use crate::metrics::RpcMetrics;
44use crate::metrics::middleware::MetricsLayer;
45use crate::timeout::TimeoutLayer;
46
47pub mod api;
48pub mod args;
49pub mod config;
50mod context;
51pub mod data;
52mod error;
53mod metrics;
54mod paginate;
55mod timeout;
56
// Command-line configuration for the JSON-RPC server itself (listen address, concurrency limit
// and request timing thresholds).
//
// NOTE(review): the `///` comments on the fields below are picked up by the `clap` derive as the
// help text of the corresponding flags, so rewording them changes `--help` output.
#[derive(clap::Args, Debug, Clone)]
pub struct RpcArgs {
    /// Address to listen to for incoming JSON-RPC connections.
    // Every flag takes its default from the `Default` impl of this struct, so the CLI default and
    // `RpcArgs::default()` cannot drift apart.
    #[clap(long, default_value_t = Self::default().rpc_listen_address)]
    pub rpc_listen_address: SocketAddr,

    /// The maximum number of concurrent requests to accept. If the service receives more than this
    /// many requests, it will start responding with 429.
    // Passed to `ServerBuilder::max_connections` when the service is constructed.
    #[clap(long, default_value_t = Self::default().max_in_flight_requests)]
    pub max_in_flight_requests: u32,

    /// Requests that take longer than this (in milliseconds) to respond to will be terminated, and
    /// the query itself will be logged as a warning.
    // Converted to a `Duration` by `RpcArgs::request_timeout`.
    #[clap(long, default_value_t = Self::default().request_timeout_ms)]
    pub request_timeout_ms: u64,

    /// Requests that take longer than this (in milliseconds) will be logged even if they succeed.
    /// This should be shorter than `request_timeout_ms`.
    // Converted to a `Duration` by `RpcArgs::slow_request_threshold`. Nothing visible here
    // enforces the "shorter than the timeout" relationship; it is a configuration expectation.
    #[clap(long, default_value_t = Self::default().slow_request_threshold_ms)]
    pub slow_request_threshold_ms: u64,
}
78
/// A JSON-RPC server that has been configured but not yet started.
///
/// Built by `RpcService::new`, extended with API modules through `add_module`, and finally
/// consumed by `run`, which binds the listen address and starts serving.
pub struct RpcService {
    /// The address that the server will start listening for requests on, when it is run.
    rpc_listen_address: SocketAddr,

    /// A partially built/configured JSON-RPC server.
    // Both type parameters are `Identity`: no middleware is attached until `run` is called.
    server: ServerBuilder<Identity, Identity>,

    /// Metrics for the RPC service.
    // Shared (`Arc`) so that callers can obtain a handle through `metrics()`.
    metrics: Arc<RpcMetrics>,

    /// Maximum time a request can take to complete.
    request_timeout: Duration,

    /// Threshold for logging slow requests.
    slow_request_threshold: Duration,

    /// All the methods added to the server so far.
    // Modules are merged in with their context removed, hence the `()` context type.
    modules: jsonrpsee::RpcModule<()>,

    /// Description of the schema served by this service.
    // Served to clients through the `rpc.discover` method that `run` registers.
    schema: Project,
}
101
102impl RpcArgs {
103    /// Requests that take longer than this are terminated and logged for debugging.
104    fn request_timeout(&self) -> Duration {
105        Duration::from_millis(self.request_timeout_ms)
106    }
107
108    /// Requests that take longer than this are logged for debugging even if they succeed.
109    /// This threshold should be lower than the request timeout threshold.
110    fn slow_request_threshold(&self) -> Duration {
111        Duration::from_millis(self.slow_request_threshold_ms)
112    }
113}
114
115impl RpcService {
116    /// Create a new instance of the JSON-RPC service, configured by `rpc_args`. The service will
117    /// not accept connections until [Self::run] is called.
118    pub fn new(rpc_args: RpcArgs, registry: &Registry) -> anyhow::Result<Self> {
119        let metrics = RpcMetrics::new(registry);
120
121        let server = ServerBuilder::new()
122            .http_only()
123            // `jsonrpsee` calls this a limit on connections, but it is implemented as a limit on
124            // requests.
125            .max_connections(rpc_args.max_in_flight_requests)
126            .max_response_body_size(u32::MAX)
127            .set_batch_request_config(BatchRequestConfig::Disabled);
128
129        let schema = Project::new(
130            env!("CARGO_PKG_VERSION"),
131            "Sui JSON-RPC",
132            "A JSON-RPC API for interacting with the Sui blockchain.",
133            "Mysten Labs",
134            "https://mystenlabs.com",
135            "build@mystenlabs.com",
136            "Apache-2.0",
137            "https://raw.githubusercontent.com/MystenLabs/sui/main/LICENSE",
138        );
139
140        Ok(Self {
141            rpc_listen_address: rpc_args.rpc_listen_address,
142            server,
143            metrics,
144            request_timeout: rpc_args.request_timeout(),
145            slow_request_threshold: rpc_args.slow_request_threshold(),
146            modules: jsonrpsee::RpcModule::new(()),
147            schema,
148        })
149    }
150
151    /// Return a copy of the metrics.
152    pub fn metrics(&self) -> Arc<RpcMetrics> {
153        self.metrics.clone()
154    }
155
156    /// Add an `RpcModule` to the service. The module's methods are combined with the existing
157    /// methods registered on the service, and the operation will fail if there is any overlap.
158    pub fn add_module(&mut self, module: impl RpcModule) -> anyhow::Result<()> {
159        self.schema.add_module(module.schema());
160        self.modules
161            .merge(module.into_impl().remove_context())
162            .context("Failed to add module because of a name conflict")
163    }
164
165    /// Start the service (it will accept connections) and return a handle that tracks the
166    /// lifecycle of the service.
167    pub async fn run(self) -> anyhow::Result<Service> {
168        let Self {
169            rpc_listen_address,
170            server,
171            metrics,
172            request_timeout,
173            slow_request_threshold,
174            mut modules,
175            schema,
176        } = self;
177
178        info!("Starting JSON-RPC service on {rpc_listen_address}",);
179        info!("Serving schema: {}", serde_json::to_string_pretty(&schema)?);
180
181        // Add a method to serve the schema to clients.
182        modules
183            .register_method("rpc.discover", move |_, _, _| json!(schema.clone()))
184            .context("Failed to add schema discovery method")?;
185
186        let middleware = RpcServiceBuilder::new()
187            .layer(TimeoutLayer::new(request_timeout))
188            .layer(MetricsLayer::new(
189                metrics.clone(),
190                modules.method_names().map(|n| n.to_owned()).collect(),
191                slow_request_threshold,
192            ));
193
194        let handle = server
195            .set_rpc_middleware(middleware)
196            .set_http_middleware(
197                tower::builder::ServiceBuilder::new()
198                    .layer(
199                        tower_http::cors::CorsLayer::new()
200                            .allow_methods([http::Method::GET, http::Method::POST])
201                            .allow_origin(tower_http::cors::Any)
202                            .allow_headers(tower_http::cors::Any),
203                    )
204                    .layer(catch_panic::CatchPanicLayer::custom(PanicHandler::new(
205                        metrics,
206                    ))),
207            )
208            .build(rpc_listen_address)
209            .await
210            .context("Failed to bind JSON-RPC service")?
211            .start(modules);
212
213        let signal = handle.clone();
214        Ok(Service::new()
215            .with_shutdown_signal(async move {
216                let _ = signal.stop();
217            })
218            .spawn(async move {
219                handle.stopped().await;
220                Ok(())
221            }))
222    }
223}
224
225impl Default for RpcArgs {
226    fn default() -> Self {
227        Self {
228            rpc_listen_address: "0.0.0.0:6000".parse().unwrap(),
229            max_in_flight_requests: 2000,
230            request_timeout_ms: 60_000,
231            slow_request_threshold_ms: 15_000,
232        }
233    }
234}
235
// Command-line configuration for the optional fullnode connection.
//
// NOTE(review): the `///` comment on the field is used by the `clap` derive as the flag's help
// text, so rewording it changes `--help` output.
#[derive(clap::Args, Debug, Clone, Default)]
pub struct NodeArgs {
    /// The URL of the fullnode RPC we connect to for transaction execution,
    /// dry-running, and delegation coin queries etc.
    // Optional: when it is `None`, `start_rpc` skips the `DelegationGovernance` and `Write`
    // modules and logs a warning instead.
    #[arg(long)]
    pub fullnode_rpc_url: Option<url::Url>,
}
243
244/// Set-up and run the RPC service, using the provided arguments (expected to be extracted from the
245/// command-line).
246///
247/// Access to most reads is controlled by the `database_url` -- if it is `None`, reads will not
248/// work. The only exception is the `DelegationGovernance` module, which is controlled by
249/// `node_args.fullnode_rpc_url`, which can be omitted to disable reads from this RPC.
250///
251/// KV queries can optionally be served by a Bigtable instance, if `bigtable_instance` is provided.
252/// Otherwise these requests are served by the database. If a `bigtable_instance` is provided, the
253/// `GOOGLE_APPLICATION_CREDENTIALS` environment variable must point to the credentials JSON file.
254///
255/// Access to writes (executing and dry-running transactions) is controlled by
256/// `node_args.fullnode_rpc_url`, which can be omitted to disable writes from this RPC.
257///
258/// The service may spin up auxiliary services (such as the system package task) to support itself,
259/// and will clean these up on shutdown as well.
260pub async fn start_rpc(
261    database_url: Option<Url>,
262    bigtable_instance: Option<String>,
263    db_args: DbArgs,
264    bigtable_args: BigtableArgs,
265    consistent_reader_args: ConsistentReaderArgs,
266    rpc_args: RpcArgs,
267    node_args: NodeArgs,
268    system_package_task_args: SystemPackageTaskArgs,
269    rpc_config: RpcConfig,
270    registry: &Registry,
271) -> anyhow::Result<Service> {
272    let mut rpc = RpcService::new(rpc_args, registry).context("Failed to create RPC service")?;
273
274    let context = Context::new(
275        database_url,
276        bigtable_instance,
277        db_args,
278        bigtable_args,
279        consistent_reader_args,
280        rpc_config,
281        rpc.metrics(),
282        registry,
283    )
284    .await?;
285
286    let system_package_task = SystemPackageTask::new(
287        system_package_task_args,
288        context.pg_reader().clone(),
289        context.package_resolver().package_store().clone(),
290    );
291
292    rpc.add_module(Checkpoints(context.clone()))?;
293    rpc.add_module(Coins(context.clone()))?;
294    rpc.add_module(DynamicFields(context.clone()))?;
295    rpc.add_module(Governance(context.clone()))?;
296    rpc.add_module(MoveUtils(context.clone()))?;
297    rpc.add_module(NameService(context.clone()))?;
298    rpc.add_module(Objects(context.clone()))?;
299    rpc.add_module(QueryObjects(context.clone()))?;
300    rpc.add_module(QueryTransactions(context.clone()))?;
301    rpc.add_module(Transactions(context.clone()))?;
302
303    if let Some(fullnode_rpc_url) = node_args.fullnode_rpc_url {
304        let client = context.config().node.client(fullnode_rpc_url)?;
305        rpc.add_module(DelegationGovernance::new(client.clone()))?;
306        rpc.add_module(Write::new(client))?;
307    } else {
308        warn!(
309            "No fullnode rpc url provided, DelegationGovernance and Write modules will not be added."
310        );
311    }
312
313    let s_rpc = rpc.run().await.context("Failed to start RPC service")?;
314    let s_system_package_task = system_package_task.run();
315
316    Ok(s_rpc.attach(s_system_package_task))
317}
318
319#[cfg(test)]
320mod tests {
321    use std::collections::BTreeSet;
322    use std::net::IpAddr;
323    use std::net::Ipv4Addr;
324    use std::net::SocketAddr;
325    use std::time::Duration;
326
327    use jsonrpsee::core::RpcResult;
328    use jsonrpsee::proc_macros::rpc;
329    use jsonrpsee::types::error::INTERNAL_ERROR_CODE;
330    use jsonrpsee::types::error::METHOD_NOT_FOUND_CODE;
331    use reqwest::Client;
332    use serde_json::Value;
333    use serde_json::json;
334    use sui_open_rpc::Module;
335    use sui_open_rpc_macros::open_rpc;
336    use sui_pg_db::temp::get_available_port;
337
338    use super::*;
339
340    #[tokio::test]
341    async fn test_add_module() {
342        let mut rpc = test_service().await;
343
344        rpc.add_module(Foo).unwrap();
345
346        assert_eq!(
347            BTreeSet::from_iter(rpc.modules.method_names()),
348            BTreeSet::from_iter(["test_bar"]),
349        )
350    }
351
352    #[tokio::test]
353    async fn test_add_module_multiple_methods() {
354        let mut rpc = test_service().await;
355
356        rpc.add_module(Bar).unwrap();
357
358        assert_eq!(
359            BTreeSet::from_iter(rpc.modules.method_names()),
360            BTreeSet::from_iter(["test_bar", "test_baz"]),
361        )
362    }
363
364    #[tokio::test]
365    async fn test_add_multiple_modules() {
366        let mut rpc = test_service().await;
367
368        rpc.add_module(Foo).unwrap();
369        rpc.add_module(Baz).unwrap();
370
371        assert_eq!(
372            BTreeSet::from_iter(rpc.modules.method_names()),
373            BTreeSet::from_iter(["test_bar", "test_baz"]),
374        )
375    }
376
377    #[tokio::test]
378    async fn test_add_module_conflict() {
379        let mut rpc = test_service().await;
380
381        rpc.add_module(Foo).unwrap();
382        assert!(rpc.add_module(Bar).is_err(),)
383    }
384
385    #[tokio::test]
386    async fn test_graceful_shutdown() {
387        let rpc = test_service().await;
388        let svc = rpc.run().await.unwrap();
389
390        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
391            .await
392            .expect("Shutdown should not timeout")
393            .expect("Shutdown should succeed");
394    }
395
396    #[tokio::test]
397    async fn test_rpc_discovery() {
398        let rpc_listen_address = test_listen_address();
399        let mut rpc = RpcService::new(
400            RpcArgs {
401                rpc_listen_address,
402                ..Default::default()
403            },
404            &Registry::new(),
405        )
406        .unwrap();
407
408        rpc.add_module(Foo).unwrap();
409        rpc.add_module(Baz).unwrap();
410
411        let svc = rpc.run().await.unwrap();
412
413        let url = format!("http://{rpc_listen_address}/");
414        let client = Client::new();
415
416        let resp: Value = client
417            .post(&url)
418            .json(&json!({
419                "jsonrpc": "2.0",
420                "method": "rpc.discover",
421                "id": 1,
422            }))
423            .send()
424            .await
425            .expect("Request should succeed")
426            .json()
427            .await
428            .expect("Deserialization should succeed");
429
430        assert_eq!(resp["result"]["info"]["title"], "Sui JSON-RPC");
431        assert_eq!(
432            resp["result"]["methods"],
433            json!([
434                {
435                    "name": "test_bar",
436                    "tags": [{
437                        "name": "Test API"
438                    }],
439                    "params": [],
440                    "result": {
441                        "name": "u64",
442                        "required": true,
443                        "schema": {
444                            "type": "integer",
445                            "format": "uint64",
446                            "minimum": 0.0
447                        }
448                    }
449                },
450                {
451                    "name": "test_baz",
452                    "tags": [{
453                        "name": "Test API"
454                    }],
455                    "params": [],
456                    "result": {
457                        "name": "u64",
458                        "required": true,
459                        "schema": {
460                            "type": "integer",
461                            "format": "uint64",
462                            "minimum": 0.0
463                        }
464                    }
465                }
466            ])
467        );
468
469        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
470            .await
471            .expect("Shutdown should not timeout")
472            .expect("Shutdown should succeed");
473    }
474
475    #[tokio::test]
476    async fn test_request_metrics() {
477        let rpc_listen_address = test_listen_address();
478        let mut rpc = RpcService::new(
479            RpcArgs {
480                rpc_listen_address,
481                ..Default::default()
482            },
483            &Registry::new(),
484        )
485        .unwrap();
486
487        rpc.add_module(Foo).unwrap();
488
489        let metrics = rpc.metrics();
490        let svc = rpc.run().await.unwrap();
491
492        let url = format!("http://{rpc_listen_address}/");
493        let client = Client::new();
494
495        client
496            .post(&url)
497            .json(&json!({
498                "jsonrpc": "2.0",
499                "method": "test_bar",
500                "id": 1,
501            }))
502            .send()
503            .await
504            .expect("Request should succeed");
505
506        client
507            .post(&url)
508            .json(&json!({
509                "jsonrpc": "2.0",
510                "method": "test_baz",
511                "id": 1,
512            }))
513            .send()
514            .await
515            .expect("Request should succeed");
516
517        assert_eq!(
518            metrics
519                .requests_received
520                .with_label_values(&["test_bar"])
521                .get(),
522            1
523        );
524
525        assert_eq!(
526            metrics
527                .requests_succeeded
528                .with_label_values(&["test_bar"])
529                .get(),
530            1
531        );
532
533        assert_eq!(
534            metrics
535                .requests_received
536                .with_label_values(&["<UNKNOWN>"])
537                .get(),
538            1
539        );
540
541        assert_eq!(
542            metrics
543                .requests_succeeded
544                .with_label_values(&["<UNKNOWN>"])
545                .get(),
546            0
547        );
548
549        assert_eq!(
550            metrics
551                .requests_failed
552                .with_label_values(&["<UNKNOWN>", &format!("{METHOD_NOT_FOUND_CODE}")])
553                .get(),
554            1
555        );
556
557        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
558            .await
559            .expect("Shutdown should not timeout")
560            .expect("Shutdown should succeed");
561    }
562
563    #[tokio::test]
564    async fn test_panic_handling() {
565        let rpc_listen_address = test_listen_address();
566        let mut rpc = RpcService::new(
567            RpcArgs {
568                rpc_listen_address,
569                ..Default::default()
570            },
571            &Registry::new(),
572        )
573        .unwrap();
574
575        rpc.add_module(Panic).unwrap();
576
577        let metrics = rpc.metrics();
578        let svc = rpc.run().await.unwrap();
579
580        let url = format!("http://{rpc_listen_address}/");
581        let client = Client::new();
582
583        let resp = client
584            .post(&url)
585            .json(&json!({
586                "jsonrpc": "2.0",
587                "method": "test_panic",
588                "id": 1,
589            }))
590            .send()
591            .await
592            .expect("Request should succeed");
593
594        let body: Value = resp.json().await.expect("Response should be JSON");
595
596        // Verify the response is a JSON-RPC error
597        assert_eq!(body["jsonrpc"], "2.0");
598        assert_eq!(body["error"]["code"], INTERNAL_ERROR_CODE);
599        assert!(body["error"]["message"].as_str().unwrap().contains("Boom!"));
600
601        // Verify the panic is recorded in metrics
602        assert_eq!(metrics.requests_panicked.get(), 1);
603
604        tokio::time::timeout(Duration::from_millis(500), svc.shutdown())
605            .await
606            .expect("Shutdown should not timeout")
607            .expect("Shutdown should succeed");
608    }
609
610    // Test Helpers
611
    // Test API with a single method. Combined with the `test` namespace it is registered as
    // `test_bar` (the name the tests above assert on).
    //
    // NOTE(review): plain `//` comments are used on purpose -- doc comments here may end up in
    // the generated schema that `test_rpc_discovery` compares exactly; confirm before converting.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait FooApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;
    }
618
    // Test API with two methods, registered as `test_bar` and `test_baz`. Its `bar` method shares
    // a name with `FooApi`'s, which is what `test_add_module_conflict` relies on.
    //
    // NOTE(review): plain `//` comments are used on purpose -- doc comments here may end up in
    // the generated schema; confirm before converting.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BarApi {
        #[method(name = "bar")]
        fn bar(&self) -> RpcResult<u64>;

        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }
628
    // Test API with a single method, registered as `test_baz`. It does not overlap with `FooApi`,
    // so the two can be added to the same service.
    //
    // NOTE(review): plain `//` comments are used on purpose -- doc comments here may end up in
    // the generated schema that `test_rpc_discovery` compares exactly; confirm before converting.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait BazApi {
        #[method(name = "baz")]
        fn baz(&self) -> RpcResult<u64>;
    }
635
    // Test API whose single method (called as `test_panic` by `test_panic_handling`) is
    // implemented to panic, to exercise the service's panic handling.
    //
    // NOTE(review): plain `//` comments are used on purpose -- doc comments here may end up in
    // the generated schema; confirm before converting.
    #[open_rpc(namespace = "test", tag = "Test API")]
    #[rpc(server, namespace = "test")]
    trait PanicApi {
        #[method(name = "panic")]
        fn panic(&self) -> RpcResult<u64>;
    }
642
    // Stateless fixtures, one per test API above: each implements the matching `*ApiServer`
    // trait and `RpcModule`, so it can be handed to `RpcService::add_module`.
    struct Foo;
    struct Bar;
    struct Baz;
    struct Panic;
647
    // Server-side implementation of `FooApi` for the `Foo` fixture.
    impl FooApiServer for Foo {
        // Always succeeds with the constant 42.
        fn bar(&self) -> RpcResult<u64> {
            Ok(42)
        }
    }
653
    // Server-side implementation of `BarApi` for the `Bar` fixture.
    impl BarApiServer for Bar {
        // Always succeeds with the constant 43 (different from `Foo::bar`'s 42).
        fn bar(&self) -> RpcResult<u64> {
            Ok(43)
        }

        // Always succeeds with the constant 44.
        fn baz(&self) -> RpcResult<u64> {
            Ok(44)
        }
    }
663
    // Server-side implementation of `BazApi` for the `Baz` fixture.
    impl BazApiServer for Baz {
        // Always succeeds with the constant 45.
        fn baz(&self) -> RpcResult<u64> {
            Ok(45)
        }
    }
669
    // Server-side implementation of `PanicApi` for the `Panic` fixture.
    impl PanicApiServer for Panic {
        // Never returns: panics with "Boom!", the message `test_panic_handling` looks for in the
        // error response.
        fn panic(&self) -> RpcResult<u64> {
            panic!("Boom!");
        }
    }
675
    // Lets `Foo` be added to an `RpcService`.
    impl RpcModule for Foo {
        // Schema describing `FooApi` (`FooApiOpenRpc` is presumably generated by `#[open_rpc]`).
        fn schema(&self) -> Module {
            FooApiOpenRpc::module_doc()
        }

        // Converts the fixture into its `jsonrpsee` module (`into_rpc` presumably comes from the
        // `#[rpc(server)]`-generated `FooApiServer` trait).
        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }
685
    // Lets `Bar` be added to an `RpcService`.
    impl RpcModule for Bar {
        // Schema describing `BarApi` (`BarApiOpenRpc` is presumably generated by `#[open_rpc]`).
        fn schema(&self) -> Module {
            BarApiOpenRpc::module_doc()
        }

        // Converts the fixture into its `jsonrpsee` module (`into_rpc` presumably comes from the
        // `#[rpc(server)]`-generated `BarApiServer` trait).
        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }
695
    // Lets `Baz` be added to an `RpcService`.
    impl RpcModule for Baz {
        // Schema describing `BazApi` (`BazApiOpenRpc` is presumably generated by `#[open_rpc]`).
        fn schema(&self) -> Module {
            BazApiOpenRpc::module_doc()
        }

        // Converts the fixture into its `jsonrpsee` module (`into_rpc` presumably comes from the
        // `#[rpc(server)]`-generated `BazApiServer` trait).
        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }
705
    // Lets `Panic` be added to an `RpcService`.
    impl RpcModule for Panic {
        // Schema describing `PanicApi` (`PanicApiOpenRpc` is presumably generated by
        // `#[open_rpc]`).
        fn schema(&self) -> Module {
            PanicApiOpenRpc::module_doc()
        }

        // Converts the fixture into its `jsonrpsee` module (`into_rpc` presumably comes from the
        // `#[rpc(server)]`-generated `PanicApiServer` trait).
        fn into_impl(self) -> jsonrpsee::RpcModule<Self> {
            self.into_rpc()
        }
    }
715
716    fn test_listen_address() -> SocketAddr {
717        let port = get_available_port();
718        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
719    }
720
721    async fn test_service() -> RpcService {
722        RpcService::new(
723            RpcArgs {
724                rpc_listen_address: test_listen_address(),
725                ..Default::default()
726            },
727            &Registry::new(),
728        )
729        .expect("Failed to create test JSON-RPC service")
730    }
731}