sui_rpc_api/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use mysten_network::callback::CallbackLayer;
5use reader::StateReader;
6use std::sync::Arc;
7use subscription::SubscriptionServiceHandle;
8use sui_types::storage::RpcStateReader;
9use sui_types::transaction_executor::TransactionExecutor;
10use tap::Pipe;
11
12pub mod client;
13mod config;
14mod error;
15pub mod grpc;
16mod metrics;
17mod reader;
18mod response;
19mod service;
20pub mod subscription;
21
22pub use client::Client;
23pub use config::Config;
24pub use error::{
25    CheckpointNotFoundError, ErrorDetails, ErrorReason, ObjectNotFoundError, Result, RpcError,
26};
27pub use metrics::{RpcMetrics, RpcMetricsMakeCallbackHandler};
28pub use reader::TransactionNotFoundError;
29pub use sui_rpc::proto;
30
31#[derive(Clone)]
32pub struct ServerVersion {
33    pub bin: &'static str,
34    pub version: &'static str,
35}
36
37impl ServerVersion {
38    pub fn new(bin: &'static str, version: &'static str) -> Self {
39        Self { bin, version }
40    }
41}
42
43impl std::fmt::Display for ServerVersion {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.write_str(self.bin)?;
46        f.write_str("/")?;
47        f.write_str(self.version)
48    }
49}
50
51#[derive(Clone)]
52pub struct RpcService {
53    reader: StateReader,
54    executor: Option<Arc<dyn TransactionExecutor>>,
55    subscription_service_handle: Option<SubscriptionServiceHandle>,
56    chain_id: sui_types::digests::ChainIdentifier,
57    server_version: Option<ServerVersion>,
58    metrics: Option<Arc<RpcMetrics>>,
59    config: Config,
60}
61
62impl RpcService {
63    pub fn new(reader: Arc<dyn RpcStateReader>) -> Self {
64        let chain_id = reader.get_chain_identifier().unwrap();
65        Self {
66            reader: StateReader::new(reader),
67            executor: None,
68            subscription_service_handle: None,
69            chain_id,
70            server_version: None,
71            metrics: None,
72            config: Config::default(),
73        }
74    }
75
76    pub fn with_server_version(&mut self, server_version: ServerVersion) -> &mut Self {
77        self.server_version = Some(server_version);
78        self
79    }
80
81    pub fn with_config(&mut self, config: Config) {
82        self.config = config;
83    }
84
85    pub fn with_executor(&mut self, executor: Arc<dyn TransactionExecutor + Send + Sync>) {
86        self.executor = Some(executor);
87    }
88
89    pub fn with_subscription_service(
90        &mut self,
91        subscription_service_handle: SubscriptionServiceHandle,
92    ) {
93        self.subscription_service_handle = Some(subscription_service_handle);
94    }
95
96    pub fn with_metrics(&mut self, metrics: RpcMetrics) {
97        self.metrics = Some(Arc::new(metrics));
98    }
99
100    pub fn chain_id(&self) -> sui_types::digests::ChainIdentifier {
101        self.chain_id
102    }
103
104    pub fn server_version(&self) -> Option<&ServerVersion> {
105        self.server_version.as_ref()
106    }
107
108    pub async fn into_router(self) -> axum::Router {
109        let metrics = self.metrics.clone();
110
111        let router = {
112            let ledger_service =
113                sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(
114                    self.clone(),
115                )
116                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
117            let transaction_execution_service = sui_rpc::proto::sui::rpc::v2::transaction_execution_service_server::TransactionExecutionServiceServer::new(self.clone())
118                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
119            let state_service =
120                sui_rpc::proto::sui::rpc::v2::state_service_server::StateServiceServer::new(
121                    self.clone(),
122                )
123                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
124            let signature_verification_service = sui_rpc::proto::sui::rpc::v2::signature_verification_service_server::SignatureVerificationServiceServer::new(self.clone())
125                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
126            let move_package_service = sui_rpc::proto::sui::rpc::v2::move_package_service_server::MovePackageServiceServer::new(self.clone())
127                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
128            let name_service =
129                sui_rpc::proto::sui::rpc::v2::name_service_server::NameServiceServer::new(
130                    self.clone(),
131                )
132                .send_compressed(tonic::codec::CompressionEncoding::Zstd);
133
134            let event_service_alpha =
135                crate::grpc::alpha::event_service_proto::event_service_server::EventServiceServer::new(
136                    self.clone(),
137                );
138            let proof_service_alpha =
139                crate::grpc::alpha::proof_service_proto::proof_service_server::ProofServiceServer::new(
140                    crate::grpc::alpha::proof_service::ProofServiceImpl::new(self.clone()),
141                );
142
143            let (health_reporter, health_service) = tonic_health::server::health_reporter();
144
145            let reflection_v1 = tonic_reflection::server::Builder::configure()
146                .register_encoded_file_descriptor_set(
147                    crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
148                )
149                .register_encoded_file_descriptor_set(
150                    crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
151                )
152                .register_encoded_file_descriptor_set(
153                    sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
154                )
155                .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
156                .build_v1()
157                .unwrap();
158
159            let reflection_v1alpha = tonic_reflection::server::Builder::configure()
160                .register_encoded_file_descriptor_set(
161                    crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
162                )
163                .register_encoded_file_descriptor_set(
164                    crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
165                )
166                .register_encoded_file_descriptor_set(
167                    sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
168                )
169                .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
170                .build_v1alpha()
171                .unwrap();
172
173            fn service_name<S: tonic::server::NamedService>(_service: &S) -> &'static str {
174                S::NAME
175            }
176
177            for service_name in [
178                service_name(&ledger_service),
179                service_name(&transaction_execution_service),
180                service_name(&state_service),
181                service_name(&signature_verification_service),
182                service_name(&move_package_service),
183                service_name(&name_service),
184                service_name(&event_service_alpha),
185                service_name(&proof_service_alpha),
186                service_name(&reflection_v1),
187                service_name(&reflection_v1alpha),
188            ] {
189                health_reporter
190                    .set_service_status(service_name, tonic_health::ServingStatus::Serving)
191                    .await;
192            }
193
194            let mut services = grpc::Services::new()
195                // V2
196                .add_service(ledger_service)
197                .add_service(transaction_execution_service)
198                .add_service(state_service)
199                .add_service(signature_verification_service)
200                .add_service(move_package_service)
201                .add_service(name_service)
202                // alpha
203                .add_service(event_service_alpha)
204                .add_service(proof_service_alpha)
205                // Reflection
206                .add_service(reflection_v1)
207                .add_service(reflection_v1alpha);
208
209            if self.subscription_service_handle.is_some() {
210                let subscription_service =
211sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionServiceServer::new(self.clone());
212                health_reporter
213                    .set_service_status(
214                        service_name(&subscription_service),
215                        tonic_health::ServingStatus::Serving,
216                    )
217                    .await;
218
219                services = services.add_service(subscription_service);
220            }
221
222            services.add_service(health_service).into_router()
223        };
224
225        let health_endpoint = axum::Router::new()
226            .route("/health", axum::routing::get(service::health::health))
227            .with_state(self.clone());
228
229        router
230            .merge(health_endpoint)
231            .layer(axum::middleware::map_response_with_state(
232                self,
233                response::append_info_headers,
234            ))
235            .pipe(|router| {
236                if let Some(metrics) = metrics {
237                    router.layer(CallbackLayer::new(
238                        metrics::RpcMetricsMakeCallbackHandler::new(metrics),
239                    ))
240                } else {
241                    router
242                }
243            })
244    }
245
246    pub async fn start_service(self, socket_address: std::net::SocketAddr) {
247        let listener = tokio::net::TcpListener::bind(socket_address).await.unwrap();
248        axum::serve(listener, self.into_router().await)
249            .await
250            .unwrap();
251    }
252}
253
254#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
255#[serde(rename_all = "lowercase")]
256pub enum Direction {
257    Ascending,
258    Descending,
259}
260
261impl Direction {
262    pub fn is_descending(self) -> bool {
263        matches!(self, Self::Descending)
264    }
265}