sui_rpc_api/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::convert::Infallible;
5use std::sync::Arc;
6
7use mysten_network::callback::CallbackLayer;
8use reader::StateReader;
9use subscription::SubscriptionServiceHandle;
10use sui_types::storage::RpcStateReader;
11use sui_types::transaction_executor::TransactionExecutor;
12use tap::Pipe;
13use tonic::server::NamedService;
14use tower::Service;
15
16pub mod client;
17mod config;
18mod error;
19pub mod grpc;
20pub mod ledger_history;
21mod metrics;
22pub mod read_mask_defaults;
23mod reader;
24mod response;
25mod service;
26pub mod subscription;
27
28pub use client::Client;
29pub use config::Config;
30pub use error::{
31    CheckpointNotFoundError, ErrorDetails, ErrorReason, ObjectNotFoundError, Result, RpcError,
32};
33pub use metrics::{
34    GrpcMethodAllowlist, RpcMetrics, RpcMetricsMakeCallbackHandler,
35    grpc_method_paths_from_file_descriptor_sets,
36};
37pub use reader::TransactionNotFoundError;
38pub use sui_rpc::proto;
39
40#[derive(Clone)]
41pub struct ServerVersion {
42    pub bin: &'static str,
43    pub version: &'static str,
44}
45
46impl ServerVersion {
47    pub fn new(bin: &'static str, version: &'static str) -> Self {
48        Self { bin, version }
49    }
50}
51
52impl std::fmt::Display for ServerVersion {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.write_str(self.bin)?;
55        f.write_str("/")?;
56        f.write_str(self.version)
57    }
58}
59
60#[derive(Clone)]
61pub struct RpcService {
62    reader: StateReader,
63    executor: Option<Arc<dyn TransactionExecutor>>,
64    subscription_service_handle: Option<SubscriptionServiceHandle>,
65    chain_id: sui_types::digests::ChainIdentifier,
66    server_version: Option<ServerVersion>,
67    metrics: Option<Arc<RpcMetrics>>,
68    config: Config,
69    extra_routes: axum::Router,
70    extra_service_names: Vec<&'static str>,
71    extra_file_descriptor_sets: Vec<&'static [u8]>,
72}
73
74impl RpcService {
75    pub fn new(reader: Arc<dyn RpcStateReader>) -> Self {
76        let chain_id = reader.get_chain_identifier().unwrap();
77        Self {
78            reader: StateReader::new(reader),
79            executor: None,
80            subscription_service_handle: None,
81            chain_id,
82            server_version: None,
83            metrics: None,
84            config: Config::default(),
85            extra_routes: axum::Router::new(),
86            extra_service_names: Vec::new(),
87            extra_file_descriptor_sets: Vec::new(),
88        }
89    }
90
91    pub fn with_server_version(&mut self, server_version: ServerVersion) -> &mut Self {
92        self.server_version = Some(server_version);
93        self
94    }
95
96    pub fn with_config(&mut self, config: Config) {
97        self.config = config;
98    }
99
100    pub fn with_executor(&mut self, executor: Arc<dyn TransactionExecutor + Send + Sync>) {
101        self.executor = Some(executor);
102    }
103
104    pub fn with_subscription_service(
105        &mut self,
106        subscription_service_handle: SubscriptionServiceHandle,
107    ) {
108        self.subscription_service_handle = Some(subscription_service_handle);
109    }
110
111    pub fn with_metrics(&mut self, metrics: RpcMetrics) {
112        self.metrics = Some(Arc::new(metrics));
113    }
114
115    pub fn with_custom_service<S>(&mut self, svc: S)
116    where
117        S: Service<
118                axum::extract::Request,
119                Response: axum::response::IntoResponse,
120                Error = Infallible,
121            > + NamedService
122            + Clone
123            + Send
124            + Sync
125            + 'static,
126        S::Future: Send + 'static,
127        S::Error: Into<grpc::BoxError> + Send,
128    {
129        self.extra_service_names.push(S::NAME);
130        self.extra_routes = std::mem::take(&mut self.extra_routes)
131            .route_service(&format!("/{}/{{*rest}}", S::NAME), svc);
132    }
133
134    pub fn with_file_descriptor_set(&mut self, encoded_fds: &'static [u8]) {
135        self.extra_file_descriptor_sets.push(encoded_fds);
136    }
137
138    pub fn chain_id(&self) -> sui_types::digests::ChainIdentifier {
139        self.chain_id
140    }
141
142    pub fn server_version(&self) -> Option<&ServerVersion> {
143        self.server_version.as_ref()
144    }
145
146    pub async fn into_router(mut self) -> axum::Router {
147        let metrics = self.metrics.clone();
148        let extra_routes = std::mem::take(&mut self.extra_routes);
149        let extra_service_names = std::mem::take(&mut self.extra_service_names);
150
151        // Single source of truth for every encoded FileDescriptorSet that
152        // backs a gRPC service mounted below. Consumed by both the
153        // reflection services and the metrics allowlist so they cannot drift
154        // out of sync.
155        let file_descriptor_sets: Vec<&[u8]> = [
156            crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
157            crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
158            sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
159            sui_rpc::proto::sui::rpc::v2alpha::FILE_DESCRIPTOR_SET,
160            tonic_health::pb::FILE_DESCRIPTOR_SET,
161        ]
162        .into_iter()
163        .chain(std::mem::take(&mut self.extra_file_descriptor_sets))
164        .collect();
165
166        // Allowlist of `/Service/Method` paths used by the metrics middleware
167        // to bound prometheus label cardinality.
168        let grpc_method_allowlist = Arc::new(
169            metrics::grpc_method_paths_from_file_descriptor_sets(&file_descriptor_sets)
170                .expect("registered FileDescriptorSet bytes must be valid protobuf"),
171        );
172
173        let router = {
174            let ledger_service =
175                sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(
176                    self.clone(),
177                )
178                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
179            let ledger_service_v2alpha =
180                sui_rpc::proto::sui::rpc::v2alpha::ledger_service_server::LedgerServiceServer::new(
181                    self.clone(),
182                )
183                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
184            let transaction_execution_service = sui_rpc::proto::sui::rpc::v2::transaction_execution_service_server::TransactionExecutionServiceServer::new(self.clone())
185                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
186            let state_service =
187                sui_rpc::proto::sui::rpc::v2::state_service_server::StateServiceServer::new(
188                    self.clone(),
189                )
190                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
191            let signature_verification_service = sui_rpc::proto::sui::rpc::v2::signature_verification_service_server::SignatureVerificationServiceServer::new(self.clone())
192                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
193            let move_package_service = sui_rpc::proto::sui::rpc::v2::move_package_service_server::MovePackageServiceServer::new(self.clone())
194                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
195            let name_service =
196                sui_rpc::proto::sui::rpc::v2::name_service_server::NameServiceServer::new(
197                    self.clone(),
198                )
199                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
200
201            let event_service_alpha =
202                crate::grpc::alpha::event_service_proto::event_service_server::EventServiceServer::new(
203                    self.clone(),
204                );
205            let proof_service_alpha =
206                crate::grpc::alpha::proof_service_proto::proof_service_server::ProofServiceServer::new(
207                    crate::grpc::alpha::proof_service::ProofServiceImpl::new(self.clone()),
208                );
209
210            let (health_reporter, health_service) = tonic_health::server::health_reporter();
211
212            let mut reflection_v1_builder = tonic_reflection::server::Builder::configure();
213            let mut reflection_v1alpha_builder = tonic_reflection::server::Builder::configure();
214            for fds in &file_descriptor_sets {
215                reflection_v1_builder =
216                    reflection_v1_builder.register_encoded_file_descriptor_set(fds);
217                reflection_v1alpha_builder =
218                    reflection_v1alpha_builder.register_encoded_file_descriptor_set(fds);
219            }
220
221            let reflection_v1 = reflection_v1_builder.build_v1().unwrap();
222            let reflection_v1alpha = reflection_v1alpha_builder.build_v1alpha().unwrap();
223
224            fn service_name<S: tonic::server::NamedService>(_service: &S) -> &'static str {
225                S::NAME
226            }
227
228            for service_name in [
229                service_name(&ledger_service),
230                service_name(&transaction_execution_service),
231                service_name(&state_service),
232                service_name(&signature_verification_service),
233                service_name(&move_package_service),
234                service_name(&name_service),
235                service_name(&ledger_service_v2alpha),
236                service_name(&event_service_alpha),
237                service_name(&proof_service_alpha),
238                service_name(&reflection_v1),
239                service_name(&reflection_v1alpha),
240            ] {
241                health_reporter
242                    .set_service_status(service_name, tonic_health::ServingStatus::Serving)
243                    .await;
244            }
245
246            let mut services = grpc::Services::new()
247                // V2
248                .add_service(ledger_service)
249                .add_service(transaction_execution_service)
250                .add_service(state_service)
251                .add_service(signature_verification_service)
252                .add_service(move_package_service)
253                .add_service(name_service)
254                // V2alpha
255                .add_service(ledger_service_v2alpha)
256                // alpha
257                .add_service(event_service_alpha)
258                .add_service(proof_service_alpha)
259                // Reflection
260                .add_service(reflection_v1)
261                .add_service(reflection_v1alpha);
262
263            if self.subscription_service_handle.is_some() {
264                let subscription_service =
265sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionServiceServer::new(self.clone());
266                health_reporter
267                    .set_service_status(
268                        service_name(&subscription_service),
269                        tonic_health::ServingStatus::Serving,
270                    )
271                    .await;
272
273                services = services.add_service(subscription_service);
274            }
275
276            for name in &extra_service_names {
277                health_reporter
278                    .set_service_status(*name, tonic_health::ServingStatus::Serving)
279                    .await;
280            }
281
282            services
283                .merge_router(extra_routes)
284                .add_service(health_service)
285                .into_router()
286        };
287
288        let health_endpoint = axum::Router::new()
289            .route("/health", axum::routing::get(service::health::health))
290            .with_state(self.clone());
291
292        router
293            .merge(health_endpoint)
294            .layer(axum::middleware::map_response_with_state(
295                self,
296                response::append_info_headers,
297            ))
298            .pipe(|router| {
299                if let Some(metrics) = metrics {
300                    router.layer(CallbackLayer::new(
301                        metrics::RpcMetricsMakeCallbackHandler::with_grpc_method_allowlist(
302                            metrics,
303                            grpc_method_allowlist,
304                        ),
305                    ))
306                } else {
307                    router
308                }
309            })
310    }
311
312    pub async fn start_service(self, socket_address: std::net::SocketAddr) {
313        let listener = tokio::net::TcpListener::bind(socket_address).await.unwrap();
314        axum::serve(listener, self.into_router().await)
315            .await
316            .unwrap();
317    }
318}
319
320#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
321#[serde(rename_all = "lowercase")]
322pub enum Direction {
323    Ascending,
324    Descending,
325}
326
327impl Direction {
328    pub fn is_descending(self) -> bool {
329        matches!(self, Self::Descending)
330    }
331}