1use std::convert::Infallible;
5use std::sync::Arc;
6
7use mysten_network::callback::CallbackLayer;
8use reader::StateReader;
9use subscription::SubscriptionServiceHandle;
10use sui_types::storage::RpcStateReader;
11use sui_types::transaction_executor::TransactionExecutor;
12use tap::Pipe;
13use tonic::server::NamedService;
14use tower::Service;
15
16pub mod client;
17mod config;
18mod error;
19pub mod grpc;
20mod metrics;
21mod reader;
22mod response;
23mod service;
24pub mod subscription;
25
26pub use client::Client;
27pub use config::Config;
28pub use error::{
29 CheckpointNotFoundError, ErrorDetails, ErrorReason, ObjectNotFoundError, Result, RpcError,
30};
31pub use metrics::{RpcMetrics, RpcMetricsMakeCallbackHandler};
32pub use reader::TransactionNotFoundError;
33pub use sui_rpc::proto;
34
35#[derive(Clone)]
36pub struct ServerVersion {
37 pub bin: &'static str,
38 pub version: &'static str,
39}
40
41impl ServerVersion {
42 pub fn new(bin: &'static str, version: &'static str) -> Self {
43 Self { bin, version }
44 }
45}
46
47impl std::fmt::Display for ServerVersion {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.write_str(self.bin)?;
50 f.write_str("/")?;
51 f.write_str(self.version)
52 }
53}
54
55#[derive(Clone)]
56pub struct RpcService {
57 reader: StateReader,
58 executor: Option<Arc<dyn TransactionExecutor>>,
59 subscription_service_handle: Option<SubscriptionServiceHandle>,
60 chain_id: sui_types::digests::ChainIdentifier,
61 server_version: Option<ServerVersion>,
62 metrics: Option<Arc<RpcMetrics>>,
63 config: Config,
64 extra_routes: axum::Router,
65 extra_service_names: Vec<&'static str>,
66 extra_file_descriptor_sets: Vec<&'static [u8]>,
67}
68
69impl RpcService {
70 pub fn new(reader: Arc<dyn RpcStateReader>) -> Self {
71 let chain_id = reader.get_chain_identifier().unwrap();
72 Self {
73 reader: StateReader::new(reader),
74 executor: None,
75 subscription_service_handle: None,
76 chain_id,
77 server_version: None,
78 metrics: None,
79 config: Config::default(),
80 extra_routes: axum::Router::new(),
81 extra_service_names: Vec::new(),
82 extra_file_descriptor_sets: Vec::new(),
83 }
84 }
85
86 pub fn with_server_version(&mut self, server_version: ServerVersion) -> &mut Self {
87 self.server_version = Some(server_version);
88 self
89 }
90
91 pub fn with_config(&mut self, config: Config) {
92 self.config = config;
93 }
94
95 pub fn with_executor(&mut self, executor: Arc<dyn TransactionExecutor + Send + Sync>) {
96 self.executor = Some(executor);
97 }
98
99 pub fn with_subscription_service(
100 &mut self,
101 subscription_service_handle: SubscriptionServiceHandle,
102 ) {
103 self.subscription_service_handle = Some(subscription_service_handle);
104 }
105
106 pub fn with_metrics(&mut self, metrics: RpcMetrics) {
107 self.metrics = Some(Arc::new(metrics));
108 }
109
110 pub fn with_custom_service<S>(&mut self, svc: S)
111 where
112 S: Service<
113 axum::extract::Request,
114 Response: axum::response::IntoResponse,
115 Error = Infallible,
116 > + NamedService
117 + Clone
118 + Send
119 + Sync
120 + 'static,
121 S::Future: Send + 'static,
122 S::Error: Into<grpc::BoxError> + Send,
123 {
124 self.extra_service_names.push(S::NAME);
125 self.extra_routes = std::mem::take(&mut self.extra_routes)
126 .route_service(&format!("/{}/{{*rest}}", S::NAME), svc);
127 }
128
129 pub fn with_file_descriptor_set(&mut self, encoded_fds: &'static [u8]) {
130 self.extra_file_descriptor_sets.push(encoded_fds);
131 }
132
133 pub fn chain_id(&self) -> sui_types::digests::ChainIdentifier {
134 self.chain_id
135 }
136
137 pub fn server_version(&self) -> Option<&ServerVersion> {
138 self.server_version.as_ref()
139 }
140
141 pub async fn into_router(mut self) -> axum::Router {
142 let metrics = self.metrics.clone();
143 let extra_routes = std::mem::take(&mut self.extra_routes);
144 let extra_service_names = std::mem::take(&mut self.extra_service_names);
145 let extra_file_descriptor_sets = std::mem::take(&mut self.extra_file_descriptor_sets);
146
147 let router = {
148 let ledger_service =
149 sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(
150 self.clone(),
151 )
152 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
153 let transaction_execution_service = sui_rpc::proto::sui::rpc::v2::transaction_execution_service_server::TransactionExecutionServiceServer::new(self.clone())
154 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
155 let state_service =
156 sui_rpc::proto::sui::rpc::v2::state_service_server::StateServiceServer::new(
157 self.clone(),
158 )
159 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
160 let signature_verification_service = sui_rpc::proto::sui::rpc::v2::signature_verification_service_server::SignatureVerificationServiceServer::new(self.clone())
161 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
162 let move_package_service = sui_rpc::proto::sui::rpc::v2::move_package_service_server::MovePackageServiceServer::new(self.clone())
163 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
164 let name_service =
165 sui_rpc::proto::sui::rpc::v2::name_service_server::NameServiceServer::new(
166 self.clone(),
167 )
168 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
169
170 let event_service_alpha =
171 crate::grpc::alpha::event_service_proto::event_service_server::EventServiceServer::new(
172 self.clone(),
173 );
174 let proof_service_alpha =
175 crate::grpc::alpha::proof_service_proto::proof_service_server::ProofServiceServer::new(
176 crate::grpc::alpha::proof_service::ProofServiceImpl::new(self.clone()),
177 );
178
179 let (health_reporter, health_service) = tonic_health::server::health_reporter();
180
181 let mut reflection_v1_builder = tonic_reflection::server::Builder::configure()
182 .register_encoded_file_descriptor_set(
183 crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
184 )
185 .register_encoded_file_descriptor_set(
186 crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
187 )
188 .register_encoded_file_descriptor_set(
189 sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
190 )
191 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET);
192
193 let mut reflection_v1alpha_builder = tonic_reflection::server::Builder::configure()
194 .register_encoded_file_descriptor_set(
195 crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
196 )
197 .register_encoded_file_descriptor_set(
198 crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
199 )
200 .register_encoded_file_descriptor_set(
201 sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
202 )
203 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET);
204
205 for fds in &extra_file_descriptor_sets {
206 reflection_v1_builder =
207 reflection_v1_builder.register_encoded_file_descriptor_set(fds);
208 reflection_v1alpha_builder =
209 reflection_v1alpha_builder.register_encoded_file_descriptor_set(fds);
210 }
211
212 let reflection_v1 = reflection_v1_builder.build_v1().unwrap();
213 let reflection_v1alpha = reflection_v1alpha_builder.build_v1alpha().unwrap();
214
215 fn service_name<S: tonic::server::NamedService>(_service: &S) -> &'static str {
216 S::NAME
217 }
218
219 for service_name in [
220 service_name(&ledger_service),
221 service_name(&transaction_execution_service),
222 service_name(&state_service),
223 service_name(&signature_verification_service),
224 service_name(&move_package_service),
225 service_name(&name_service),
226 service_name(&event_service_alpha),
227 service_name(&proof_service_alpha),
228 service_name(&reflection_v1),
229 service_name(&reflection_v1alpha),
230 ] {
231 health_reporter
232 .set_service_status(service_name, tonic_health::ServingStatus::Serving)
233 .await;
234 }
235
236 let mut services = grpc::Services::new()
237 .add_service(ledger_service)
239 .add_service(transaction_execution_service)
240 .add_service(state_service)
241 .add_service(signature_verification_service)
242 .add_service(move_package_service)
243 .add_service(name_service)
244 .add_service(event_service_alpha)
246 .add_service(proof_service_alpha)
247 .add_service(reflection_v1)
249 .add_service(reflection_v1alpha);
250
251 if self.subscription_service_handle.is_some() {
252 let subscription_service =
253sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionServiceServer::new(self.clone());
254 health_reporter
255 .set_service_status(
256 service_name(&subscription_service),
257 tonic_health::ServingStatus::Serving,
258 )
259 .await;
260
261 services = services.add_service(subscription_service);
262 }
263
264 for name in &extra_service_names {
265 health_reporter
266 .set_service_status(*name, tonic_health::ServingStatus::Serving)
267 .await;
268 }
269
270 services
271 .merge_router(extra_routes)
272 .add_service(health_service)
273 .into_router()
274 };
275
276 let health_endpoint = axum::Router::new()
277 .route("/health", axum::routing::get(service::health::health))
278 .with_state(self.clone());
279
280 router
281 .merge(health_endpoint)
282 .layer(axum::middleware::map_response_with_state(
283 self,
284 response::append_info_headers,
285 ))
286 .pipe(|router| {
287 if let Some(metrics) = metrics {
288 router.layer(CallbackLayer::new(
289 metrics::RpcMetricsMakeCallbackHandler::new(metrics),
290 ))
291 } else {
292 router
293 }
294 })
295 }
296
297 pub async fn start_service(self, socket_address: std::net::SocketAddr) {
298 let listener = tokio::net::TcpListener::bind(socket_address).await.unwrap();
299 axum::serve(listener, self.into_router().await)
300 .await
301 .unwrap();
302 }
303}
304
305#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
306#[serde(rename_all = "lowercase")]
307pub enum Direction {
308 Ascending,
309 Descending,
310}
311
312impl Direction {
313 pub fn is_descending(self) -> bool {
314 matches!(self, Self::Descending)
315 }
316}