sui_rpc_api/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::convert::Infallible;
5use std::sync::Arc;
6
7use mysten_network::callback::CallbackLayer;
8use reader::StateReader;
9use subscription::SubscriptionServiceHandle;
10use sui_types::storage::RpcStateReader;
11use sui_types::transaction_executor::TransactionExecutor;
12use tap::Pipe;
13use tonic::server::NamedService;
14use tower::Service;
15
16pub mod client;
17mod config;
18mod error;
19pub mod grpc;
20mod metrics;
21mod reader;
22mod response;
23mod service;
24pub mod subscription;
25
26pub use client::Client;
27pub use config::Config;
28pub use error::{
29    CheckpointNotFoundError, ErrorDetails, ErrorReason, ObjectNotFoundError, Result, RpcError,
30};
31pub use metrics::{RpcMetrics, RpcMetricsMakeCallbackHandler};
32pub use reader::TransactionNotFoundError;
33pub use sui_rpc::proto;
34
35#[derive(Clone)]
36pub struct ServerVersion {
37    pub bin: &'static str,
38    pub version: &'static str,
39}
40
41impl ServerVersion {
42    pub fn new(bin: &'static str, version: &'static str) -> Self {
43        Self { bin, version }
44    }
45}
46
47impl std::fmt::Display for ServerVersion {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.write_str(self.bin)?;
50        f.write_str("/")?;
51        f.write_str(self.version)
52    }
53}
54
55#[derive(Clone)]
56pub struct RpcService {
57    reader: StateReader,
58    executor: Option<Arc<dyn TransactionExecutor>>,
59    subscription_service_handle: Option<SubscriptionServiceHandle>,
60    chain_id: sui_types::digests::ChainIdentifier,
61    server_version: Option<ServerVersion>,
62    metrics: Option<Arc<RpcMetrics>>,
63    config: Config,
64    extra_routes: axum::Router,
65    extra_service_names: Vec<&'static str>,
66    extra_file_descriptor_sets: Vec<&'static [u8]>,
67}
68
69impl RpcService {
70    pub fn new(reader: Arc<dyn RpcStateReader>) -> Self {
71        let chain_id = reader.get_chain_identifier().unwrap();
72        Self {
73            reader: StateReader::new(reader),
74            executor: None,
75            subscription_service_handle: None,
76            chain_id,
77            server_version: None,
78            metrics: None,
79            config: Config::default(),
80            extra_routes: axum::Router::new(),
81            extra_service_names: Vec::new(),
82            extra_file_descriptor_sets: Vec::new(),
83        }
84    }
85
86    pub fn with_server_version(&mut self, server_version: ServerVersion) -> &mut Self {
87        self.server_version = Some(server_version);
88        self
89    }
90
91    pub fn with_config(&mut self, config: Config) {
92        self.config = config;
93    }
94
95    pub fn with_executor(&mut self, executor: Arc<dyn TransactionExecutor + Send + Sync>) {
96        self.executor = Some(executor);
97    }
98
99    pub fn with_subscription_service(
100        &mut self,
101        subscription_service_handle: SubscriptionServiceHandle,
102    ) {
103        self.subscription_service_handle = Some(subscription_service_handle);
104    }
105
106    pub fn with_metrics(&mut self, metrics: RpcMetrics) {
107        self.metrics = Some(Arc::new(metrics));
108    }
109
110    pub fn with_custom_service<S>(&mut self, svc: S)
111    where
112        S: Service<
113                axum::extract::Request,
114                Response: axum::response::IntoResponse,
115                Error = Infallible,
116            > + NamedService
117            + Clone
118            + Send
119            + Sync
120            + 'static,
121        S::Future: Send + 'static,
122        S::Error: Into<grpc::BoxError> + Send,
123    {
124        self.extra_service_names.push(S::NAME);
125        self.extra_routes = std::mem::take(&mut self.extra_routes)
126            .route_service(&format!("/{}/{{*rest}}", S::NAME), svc);
127    }
128
129    pub fn with_file_descriptor_set(&mut self, encoded_fds: &'static [u8]) {
130        self.extra_file_descriptor_sets.push(encoded_fds);
131    }
132
133    pub fn chain_id(&self) -> sui_types::digests::ChainIdentifier {
134        self.chain_id
135    }
136
137    pub fn server_version(&self) -> Option<&ServerVersion> {
138        self.server_version.as_ref()
139    }
140
141    pub async fn into_router(mut self) -> axum::Router {
142        let metrics = self.metrics.clone();
143        let extra_routes = std::mem::take(&mut self.extra_routes);
144        let extra_service_names = std::mem::take(&mut self.extra_service_names);
145        let extra_file_descriptor_sets = std::mem::take(&mut self.extra_file_descriptor_sets);
146
147        let router = {
148            let ledger_service =
149                sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(
150                    self.clone(),
151                )
152                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
153            let transaction_execution_service = sui_rpc::proto::sui::rpc::v2::transaction_execution_service_server::TransactionExecutionServiceServer::new(self.clone())
154                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
155            let state_service =
156                sui_rpc::proto::sui::rpc::v2::state_service_server::StateServiceServer::new(
157                    self.clone(),
158                )
159                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
160            let signature_verification_service = sui_rpc::proto::sui::rpc::v2::signature_verification_service_server::SignatureVerificationServiceServer::new(self.clone())
161                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
162            let move_package_service = sui_rpc::proto::sui::rpc::v2::move_package_service_server::MovePackageServiceServer::new(self.clone())
163                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
164            let name_service =
165                sui_rpc::proto::sui::rpc::v2::name_service_server::NameServiceServer::new(
166                    self.clone(),
167                )
168                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
169
170            let event_service_alpha =
171                crate::grpc::alpha::event_service_proto::event_service_server::EventServiceServer::new(
172                    self.clone(),
173                );
174            let proof_service_alpha =
175                crate::grpc::alpha::proof_service_proto::proof_service_server::ProofServiceServer::new(
176                    crate::grpc::alpha::proof_service::ProofServiceImpl::new(self.clone()),
177                );
178
179            let (health_reporter, health_service) = tonic_health::server::health_reporter();
180
181            let mut reflection_v1_builder = tonic_reflection::server::Builder::configure()
182                .register_encoded_file_descriptor_set(
183                    crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
184                )
185                .register_encoded_file_descriptor_set(
186                    crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
187                )
188                .register_encoded_file_descriptor_set(
189                    sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
190                )
191                .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET);
192
193            let mut reflection_v1alpha_builder = tonic_reflection::server::Builder::configure()
194                .register_encoded_file_descriptor_set(
195                    crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
196                )
197                .register_encoded_file_descriptor_set(
198                    crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
199                )
200                .register_encoded_file_descriptor_set(
201                    sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
202                )
203                .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET);
204
205            for fds in &extra_file_descriptor_sets {
206                reflection_v1_builder =
207                    reflection_v1_builder.register_encoded_file_descriptor_set(fds);
208                reflection_v1alpha_builder =
209                    reflection_v1alpha_builder.register_encoded_file_descriptor_set(fds);
210            }
211
212            let reflection_v1 = reflection_v1_builder.build_v1().unwrap();
213            let reflection_v1alpha = reflection_v1alpha_builder.build_v1alpha().unwrap();
214
215            fn service_name<S: tonic::server::NamedService>(_service: &S) -> &'static str {
216                S::NAME
217            }
218
219            for service_name in [
220                service_name(&ledger_service),
221                service_name(&transaction_execution_service),
222                service_name(&state_service),
223                service_name(&signature_verification_service),
224                service_name(&move_package_service),
225                service_name(&name_service),
226                service_name(&event_service_alpha),
227                service_name(&proof_service_alpha),
228                service_name(&reflection_v1),
229                service_name(&reflection_v1alpha),
230            ] {
231                health_reporter
232                    .set_service_status(service_name, tonic_health::ServingStatus::Serving)
233                    .await;
234            }
235
236            let mut services = grpc::Services::new()
237                // V2
238                .add_service(ledger_service)
239                .add_service(transaction_execution_service)
240                .add_service(state_service)
241                .add_service(signature_verification_service)
242                .add_service(move_package_service)
243                .add_service(name_service)
244                // alpha
245                .add_service(event_service_alpha)
246                .add_service(proof_service_alpha)
247                // Reflection
248                .add_service(reflection_v1)
249                .add_service(reflection_v1alpha);
250
251            if self.subscription_service_handle.is_some() {
252                let subscription_service =
253sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionServiceServer::new(self.clone());
254                health_reporter
255                    .set_service_status(
256                        service_name(&subscription_service),
257                        tonic_health::ServingStatus::Serving,
258                    )
259                    .await;
260
261                services = services.add_service(subscription_service);
262            }
263
264            for name in &extra_service_names {
265                health_reporter
266                    .set_service_status(*name, tonic_health::ServingStatus::Serving)
267                    .await;
268            }
269
270            services
271                .merge_router(extra_routes)
272                .add_service(health_service)
273                .into_router()
274        };
275
276        let health_endpoint = axum::Router::new()
277            .route("/health", axum::routing::get(service::health::health))
278            .with_state(self.clone());
279
280        router
281            .merge(health_endpoint)
282            .layer(axum::middleware::map_response_with_state(
283                self,
284                response::append_info_headers,
285            ))
286            .pipe(|router| {
287                if let Some(metrics) = metrics {
288                    router.layer(CallbackLayer::new(
289                        metrics::RpcMetricsMakeCallbackHandler::new(metrics),
290                    ))
291                } else {
292                    router
293                }
294            })
295    }
296
297    pub async fn start_service(self, socket_address: std::net::SocketAddr) {
298        let listener = tokio::net::TcpListener::bind(socket_address).await.unwrap();
299        axum::serve(listener, self.into_router().await)
300            .await
301            .unwrap();
302    }
303}
304
305#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
306#[serde(rename_all = "lowercase")]
307pub enum Direction {
308    Ascending,
309    Descending,
310}
311
312impl Direction {
313    pub fn is_descending(self) -> bool {
314        matches!(self, Self::Descending)
315    }
316}