1use std::convert::Infallible;
5use std::sync::Arc;
6
7use mysten_network::callback::CallbackLayer;
8use reader::StateReader;
9use subscription::SubscriptionServiceHandle;
10use sui_types::storage::RpcStateReader;
11use sui_types::transaction_executor::TransactionExecutor;
12use tap::Pipe;
13use tonic::server::NamedService;
14use tower::Service;
15
16pub mod client;
17mod config;
18mod error;
19pub mod grpc;
20pub mod ledger_history;
21mod metrics;
22pub mod read_mask_defaults;
23mod reader;
24mod response;
25mod service;
26pub mod subscription;
27
28pub use client::Client;
29pub use config::Config;
30pub use error::{
31 CheckpointNotFoundError, ErrorDetails, ErrorReason, ObjectNotFoundError, Result, RpcError,
32};
33pub use metrics::{
34 GrpcMethodAllowlist, RpcMetrics, RpcMetricsMakeCallbackHandler,
35 grpc_method_paths_from_file_descriptor_sets,
36};
37pub use reader::TransactionNotFoundError;
38pub use sui_rpc::proto;
39
/// Name and version of the binary hosting the RPC service.
///
/// Formats as `bin/version` through its [`std::fmt::Display`] implementation.
#[derive(Clone, Debug)]
pub struct ServerVersion {
    /// Name of the binary.
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}
45
46impl ServerVersion {
47 pub fn new(bin: &'static str, version: &'static str) -> Self {
48 Self { bin, version }
49 }
50}
51
52impl std::fmt::Display for ServerVersion {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.write_str(self.bin)?;
55 f.write_str("/")?;
56 f.write_str(self.version)
57 }
58}
59
60#[derive(Clone)]
61pub struct RpcService {
62 reader: StateReader,
63 executor: Option<Arc<dyn TransactionExecutor>>,
64 subscription_service_handle: Option<SubscriptionServiceHandle>,
65 chain_id: sui_types::digests::ChainIdentifier,
66 server_version: Option<ServerVersion>,
67 metrics: Option<Arc<RpcMetrics>>,
68 config: Config,
69 extra_routes: axum::Router,
70 extra_service_names: Vec<&'static str>,
71 extra_file_descriptor_sets: Vec<&'static [u8]>,
72}
73
74impl RpcService {
75 pub fn new(reader: Arc<dyn RpcStateReader>) -> Self {
76 let chain_id = reader.get_chain_identifier().unwrap();
77 Self {
78 reader: StateReader::new(reader),
79 executor: None,
80 subscription_service_handle: None,
81 chain_id,
82 server_version: None,
83 metrics: None,
84 config: Config::default(),
85 extra_routes: axum::Router::new(),
86 extra_service_names: Vec::new(),
87 extra_file_descriptor_sets: Vec::new(),
88 }
89 }
90
91 pub fn with_server_version(&mut self, server_version: ServerVersion) -> &mut Self {
92 self.server_version = Some(server_version);
93 self
94 }
95
96 pub fn with_config(&mut self, config: Config) {
97 self.config = config;
98 }
99
100 pub fn with_executor(&mut self, executor: Arc<dyn TransactionExecutor + Send + Sync>) {
101 self.executor = Some(executor);
102 }
103
104 pub fn with_subscription_service(
105 &mut self,
106 subscription_service_handle: SubscriptionServiceHandle,
107 ) {
108 self.subscription_service_handle = Some(subscription_service_handle);
109 }
110
111 pub fn with_metrics(&mut self, metrics: RpcMetrics) {
112 self.metrics = Some(Arc::new(metrics));
113 }
114
115 pub fn with_custom_service<S>(&mut self, svc: S)
116 where
117 S: Service<
118 axum::extract::Request,
119 Response: axum::response::IntoResponse,
120 Error = Infallible,
121 > + NamedService
122 + Clone
123 + Send
124 + Sync
125 + 'static,
126 S::Future: Send + 'static,
127 S::Error: Into<grpc::BoxError> + Send,
128 {
129 self.extra_service_names.push(S::NAME);
130 self.extra_routes = std::mem::take(&mut self.extra_routes)
131 .route_service(&format!("/{}/{{*rest}}", S::NAME), svc);
132 }
133
134 pub fn with_file_descriptor_set(&mut self, encoded_fds: &'static [u8]) {
135 self.extra_file_descriptor_sets.push(encoded_fds);
136 }
137
138 pub fn chain_id(&self) -> sui_types::digests::ChainIdentifier {
139 self.chain_id
140 }
141
142 pub fn server_version(&self) -> Option<&ServerVersion> {
143 self.server_version.as_ref()
144 }
145
146 pub async fn into_router(mut self) -> axum::Router {
147 let metrics = self.metrics.clone();
148 let extra_routes = std::mem::take(&mut self.extra_routes);
149 let extra_service_names = std::mem::take(&mut self.extra_service_names);
150
151 let file_descriptor_sets: Vec<&[u8]> = [
156 crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
157 crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
158 sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
159 sui_rpc::proto::sui::rpc::v2alpha::FILE_DESCRIPTOR_SET,
160 tonic_health::pb::FILE_DESCRIPTOR_SET,
161 ]
162 .into_iter()
163 .chain(std::mem::take(&mut self.extra_file_descriptor_sets))
164 .collect();
165
166 let grpc_method_allowlist = Arc::new(
169 metrics::grpc_method_paths_from_file_descriptor_sets(&file_descriptor_sets)
170 .expect("registered FileDescriptorSet bytes must be valid protobuf"),
171 );
172
173 let router = {
174 let ledger_service =
175 sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(
176 self.clone(),
177 )
178 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
179 let ledger_service_v2alpha =
180 sui_rpc::proto::sui::rpc::v2alpha::ledger_service_server::LedgerServiceServer::new(
181 self.clone(),
182 )
183 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
184 let transaction_execution_service = sui_rpc::proto::sui::rpc::v2::transaction_execution_service_server::TransactionExecutionServiceServer::new(self.clone())
185 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
186 let state_service =
187 sui_rpc::proto::sui::rpc::v2::state_service_server::StateServiceServer::new(
188 self.clone(),
189 )
190 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
191 let signature_verification_service = sui_rpc::proto::sui::rpc::v2::signature_verification_service_server::SignatureVerificationServiceServer::new(self.clone())
192 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
193 let move_package_service = sui_rpc::proto::sui::rpc::v2::move_package_service_server::MovePackageServiceServer::new(self.clone())
194 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
195 let name_service =
196 sui_rpc::proto::sui::rpc::v2::name_service_server::NameServiceServer::new(
197 self.clone(),
198 )
199 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
200
201 let event_service_alpha =
202 crate::grpc::alpha::event_service_proto::event_service_server::EventServiceServer::new(
203 self.clone(),
204 );
205 let proof_service_alpha =
206 crate::grpc::alpha::proof_service_proto::proof_service_server::ProofServiceServer::new(
207 crate::grpc::alpha::proof_service::ProofServiceImpl::new(self.clone()),
208 );
209
210 let (health_reporter, health_service) = tonic_health::server::health_reporter();
211
212 let mut reflection_v1_builder = tonic_reflection::server::Builder::configure();
213 let mut reflection_v1alpha_builder = tonic_reflection::server::Builder::configure();
214 for fds in &file_descriptor_sets {
215 reflection_v1_builder =
216 reflection_v1_builder.register_encoded_file_descriptor_set(fds);
217 reflection_v1alpha_builder =
218 reflection_v1alpha_builder.register_encoded_file_descriptor_set(fds);
219 }
220
221 let reflection_v1 = reflection_v1_builder.build_v1().unwrap();
222 let reflection_v1alpha = reflection_v1alpha_builder.build_v1alpha().unwrap();
223
224 fn service_name<S: tonic::server::NamedService>(_service: &S) -> &'static str {
225 S::NAME
226 }
227
228 for service_name in [
229 service_name(&ledger_service),
230 service_name(&transaction_execution_service),
231 service_name(&state_service),
232 service_name(&signature_verification_service),
233 service_name(&move_package_service),
234 service_name(&name_service),
235 service_name(&ledger_service_v2alpha),
236 service_name(&event_service_alpha),
237 service_name(&proof_service_alpha),
238 service_name(&reflection_v1),
239 service_name(&reflection_v1alpha),
240 ] {
241 health_reporter
242 .set_service_status(service_name, tonic_health::ServingStatus::Serving)
243 .await;
244 }
245
246 let mut services = grpc::Services::new()
247 .add_service(ledger_service)
249 .add_service(transaction_execution_service)
250 .add_service(state_service)
251 .add_service(signature_verification_service)
252 .add_service(move_package_service)
253 .add_service(name_service)
254 .add_service(ledger_service_v2alpha)
256 .add_service(event_service_alpha)
258 .add_service(proof_service_alpha)
259 .add_service(reflection_v1)
261 .add_service(reflection_v1alpha);
262
263 if self.subscription_service_handle.is_some() {
264 let subscription_service =
265sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionServiceServer::new(self.clone());
266 health_reporter
267 .set_service_status(
268 service_name(&subscription_service),
269 tonic_health::ServingStatus::Serving,
270 )
271 .await;
272
273 services = services.add_service(subscription_service);
274 }
275
276 for name in &extra_service_names {
277 health_reporter
278 .set_service_status(*name, tonic_health::ServingStatus::Serving)
279 .await;
280 }
281
282 services
283 .merge_router(extra_routes)
284 .add_service(health_service)
285 .into_router()
286 };
287
288 let health_endpoint = axum::Router::new()
289 .route("/health", axum::routing::get(service::health::health))
290 .with_state(self.clone());
291
292 router
293 .merge(health_endpoint)
294 .layer(axum::middleware::map_response_with_state(
295 self,
296 response::append_info_headers,
297 ))
298 .pipe(|router| {
299 if let Some(metrics) = metrics {
300 router.layer(CallbackLayer::new(
301 metrics::RpcMetricsMakeCallbackHandler::with_grpc_method_allowlist(
302 metrics,
303 grpc_method_allowlist,
304 ),
305 ))
306 } else {
307 router
308 }
309 })
310 }
311
312 pub async fn start_service(self, socket_address: std::net::SocketAddr) {
313 let listener = tokio::net::TcpListener::bind(socket_address).await.unwrap();
314 axum::serve(listener, self.into_router().await)
315 .await
316 .unwrap();
317 }
318}
319
320#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
321#[serde(rename_all = "lowercase")]
322pub enum Direction {
323 Ascending,
324 Descending,
325}
326
327impl Direction {
328 pub fn is_descending(self) -> bool {
329 matches!(self, Self::Descending)
330 }
331}