1use std::convert::Infallible;
5use std::sync::Arc;
6
7use mysten_network::callback::CallbackLayer;
8use reader::StateReader;
9use subscription::SubscriptionServiceHandle;
10use sui_types::storage::RpcStateReader;
11use sui_types::transaction_executor::TransactionExecutor;
12use tap::Pipe;
13use tonic::server::NamedService;
14use tower::Service;
15
16pub mod client;
17mod config;
18mod error;
19pub mod grpc;
20pub mod ledger_history;
21mod metrics;
22pub mod read_mask_defaults;
23mod reader;
24mod response;
25mod service;
26pub mod subscription;
27
28pub use client::Client;
29pub use config::Config;
30pub use error::{
31 CheckpointNotFoundError, ErrorDetails, ErrorReason, ObjectNotFoundError, Result, RpcError,
32};
33pub use metrics::{
34 GrpcMethodAllowlist, RpcMetrics, RpcMetricsMakeCallbackHandler,
35 grpc_method_paths_from_file_descriptor_sets,
36};
37pub use reader::TransactionNotFoundError;
38pub use sui_rpc::proto;
39
/// Identifies the binary serving the RPC API, as a `bin`/`version` pair.
///
/// Both parts are `&'static str`, so values are cheap to clone and can be
/// built in `const` contexts via [`ServerVersion::new`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerVersion {
    /// Name of the binary.
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}

impl ServerVersion {
    /// Creates a `ServerVersion` from a binary name and a version string.
    pub const fn new(bin: &'static str, version: &'static str) -> Self {
        Self { bin, version }
    }
}

/// Formats the version as `<bin>/<version>`.
impl std::fmt::Display for ServerVersion {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.bin, self.version)
    }
}
59
60#[derive(Clone)]
61pub struct RpcService {
62 reader: StateReader,
63 executor: Option<Arc<dyn TransactionExecutor>>,
64 subscription_service_handle: Option<SubscriptionServiceHandle>,
65 chain_id: sui_types::digests::ChainIdentifier,
66 server_version: Option<ServerVersion>,
67 metrics: Option<Arc<RpcMetrics>>,
68 config: Config,
69 extra_routes: axum::Router,
70 extra_service_names: Vec<&'static str>,
71 extra_file_descriptor_sets: Vec<&'static [u8]>,
72}
73
74impl RpcService {
75 pub fn new(reader: Arc<dyn RpcStateReader>) -> Self {
76 let chain_id = reader.get_chain_identifier().unwrap();
77 Self {
78 reader: StateReader::new(reader),
79 executor: None,
80 subscription_service_handle: None,
81 chain_id,
82 server_version: None,
83 metrics: None,
84 config: Config::default(),
85 extra_routes: axum::Router::new(),
86 extra_service_names: Vec::new(),
87 extra_file_descriptor_sets: Vec::new(),
88 }
89 }
90
91 pub fn with_server_version(&mut self, server_version: ServerVersion) -> &mut Self {
92 self.server_version = Some(server_version);
93 self
94 }
95
96 pub fn with_config(&mut self, config: Config) {
97 self.config = config;
98 }
99
100 pub fn with_executor(&mut self, executor: Arc<dyn TransactionExecutor + Send + Sync>) {
101 self.executor = Some(executor);
102 }
103
104 pub fn with_subscription_service(
105 &mut self,
106 subscription_service_handle: SubscriptionServiceHandle,
107 ) {
108 self.subscription_service_handle = Some(subscription_service_handle);
109 }
110
111 pub fn with_metrics(&mut self, metrics: RpcMetrics) {
112 self.metrics = Some(Arc::new(metrics));
113 }
114
115 pub fn with_custom_service<S>(&mut self, svc: S)
116 where
117 S: Service<
118 axum::extract::Request,
119 Response: axum::response::IntoResponse,
120 Error = Infallible,
121 > + NamedService
122 + Clone
123 + Send
124 + Sync
125 + 'static,
126 S::Future: Send + 'static,
127 S::Error: Into<grpc::BoxError> + Send,
128 {
129 self.extra_service_names.push(S::NAME);
130 self.extra_routes = std::mem::take(&mut self.extra_routes)
131 .route_service(&format!("/{}/{{*rest}}", S::NAME), svc);
132 }
133
134 pub fn with_file_descriptor_set(&mut self, encoded_fds: &'static [u8]) {
135 self.extra_file_descriptor_sets.push(encoded_fds);
136 }
137
138 pub fn chain_id(&self) -> sui_types::digests::ChainIdentifier {
139 self.chain_id
140 }
141
142 pub fn server_version(&self) -> Option<&ServerVersion> {
143 self.server_version.as_ref()
144 }
145
146 pub async fn into_router(mut self) -> axum::Router {
147 let metrics = self.metrics.clone();
148 let extra_routes = std::mem::take(&mut self.extra_routes);
149 let extra_service_names = std::mem::take(&mut self.extra_service_names);
150
151 let file_descriptor_sets: Vec<&[u8]> = [
156 sui_rpc::proto::google::protobuf::FILE_DESCRIPTOR_SET,
157 sui_rpc::proto::google::rpc::FILE_DESCRIPTOR_SET,
158 sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
159 sui_rpc::proto::sui::rpc::v2alpha::FILE_DESCRIPTOR_SET,
160 tonic_health::pb::FILE_DESCRIPTOR_SET,
161 ]
162 .into_iter()
163 .chain(std::mem::take(&mut self.extra_file_descriptor_sets))
164 .collect();
165
166 let grpc_method_allowlist = Arc::new(
169 metrics::grpc_method_paths_from_file_descriptor_sets(&file_descriptor_sets)
170 .expect("registered FileDescriptorSet bytes must be valid protobuf"),
171 );
172
173 let router = {
174 let ledger_service =
175 sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(
176 self.clone(),
177 )
178 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
179 let ledger_service_v2alpha =
180 sui_rpc::proto::sui::rpc::v2alpha::ledger_service_server::LedgerServiceServer::new(
181 self.clone(),
182 )
183 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
184 let proof_service_v2alpha =
185 sui_rpc::proto::sui::rpc::v2alpha::proof_service_server::ProofServiceServer::new(
186 self.clone(),
187 )
188 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
189 let transaction_execution_service = sui_rpc::proto::sui::rpc::v2::transaction_execution_service_server::TransactionExecutionServiceServer::new(self.clone())
190 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
191 let state_service =
192 sui_rpc::proto::sui::rpc::v2::state_service_server::StateServiceServer::new(
193 self.clone(),
194 )
195 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
196 let signature_verification_service = sui_rpc::proto::sui::rpc::v2::signature_verification_service_server::SignatureVerificationServiceServer::new(self.clone())
197 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
198 let move_package_service = sui_rpc::proto::sui::rpc::v2::move_package_service_server::MovePackageServiceServer::new(self.clone())
199 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
200 let name_service =
201 sui_rpc::proto::sui::rpc::v2::name_service_server::NameServiceServer::new(
202 self.clone(),
203 )
204 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
205
206 let (health_reporter, health_service) = tonic_health::server::health_reporter();
207
208 let mut reflection_v1_builder = tonic_reflection::server::Builder::configure();
209 let mut reflection_v1alpha_builder = tonic_reflection::server::Builder::configure();
210 for fds in &file_descriptor_sets {
211 reflection_v1_builder =
212 reflection_v1_builder.register_encoded_file_descriptor_set(fds);
213 reflection_v1alpha_builder =
214 reflection_v1alpha_builder.register_encoded_file_descriptor_set(fds);
215 }
216
217 let reflection_v1 = reflection_v1_builder.build_v1().unwrap();
218 let reflection_v1alpha = reflection_v1alpha_builder.build_v1alpha().unwrap();
219
220 fn service_name<S: tonic::server::NamedService>(_service: &S) -> &'static str {
221 S::NAME
222 }
223
224 for service_name in [
225 service_name(&ledger_service),
226 service_name(&transaction_execution_service),
227 service_name(&state_service),
228 service_name(&signature_verification_service),
229 service_name(&move_package_service),
230 service_name(&name_service),
231 service_name(&ledger_service_v2alpha),
232 service_name(&proof_service_v2alpha),
233 service_name(&reflection_v1),
234 service_name(&reflection_v1alpha),
235 ] {
236 health_reporter
237 .set_service_status(service_name, tonic_health::ServingStatus::Serving)
238 .await;
239 }
240
241 let mut services = grpc::Services::new()
242 .add_service(ledger_service)
244 .add_service(transaction_execution_service)
245 .add_service(state_service)
246 .add_service(signature_verification_service)
247 .add_service(move_package_service)
248 .add_service(name_service)
249 .add_service(ledger_service_v2alpha)
251 .add_service(proof_service_v2alpha)
252 .add_service(reflection_v1)
254 .add_service(reflection_v1alpha);
255
256 if self.subscription_service_handle.is_some() {
257 let subscription_service =
258sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionServiceServer::new(self.clone());
259 health_reporter
260 .set_service_status(
261 service_name(&subscription_service),
262 tonic_health::ServingStatus::Serving,
263 )
264 .await;
265
266 services = services.add_service(subscription_service);
267 }
268
269 for name in &extra_service_names {
270 health_reporter
271 .set_service_status(*name, tonic_health::ServingStatus::Serving)
272 .await;
273 }
274
275 services
276 .merge_router(extra_routes)
277 .add_service(health_service)
278 .into_router()
279 };
280
281 let health_endpoint = axum::Router::new()
282 .route("/health", axum::routing::get(service::health::health))
283 .with_state(self.clone());
284
285 router
286 .merge(health_endpoint)
287 .layer(axum::middleware::map_response_with_state(
288 self,
289 response::append_info_headers,
290 ))
291 .pipe(|router| {
292 if let Some(metrics) = metrics {
293 router.layer(CallbackLayer::new(
294 metrics::RpcMetricsMakeCallbackHandler::with_grpc_method_allowlist(
295 metrics,
296 grpc_method_allowlist,
297 ),
298 ))
299 } else {
300 router
301 }
302 })
303 }
304
305 pub async fn start_service(self, socket_address: std::net::SocketAddr) {
306 let listener = tokio::net::TcpListener::bind(socket_address).await.unwrap();
307 axum::serve(listener, self.into_router().await)
308 .await
309 .unwrap();
310 }
311}
312
313#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
314#[serde(rename_all = "lowercase")]
315pub enum Direction {
316 Ascending,
317 Descending,
318}
319
320impl Direction {
321 pub fn is_descending(self) -> bool {
322 matches!(self, Self::Descending)
323 }
324}