1use mysten_network::callback::CallbackLayer;
5use reader::StateReader;
6use std::sync::Arc;
7use subscription::SubscriptionServiceHandle;
8use sui_types::storage::RpcStateReader;
9use sui_types::transaction_executor::TransactionExecutor;
10use tap::Pipe;
11
12pub mod client;
13mod config;
14mod error;
15pub mod grpc;
16mod metrics;
17mod reader;
18mod response;
19mod service;
20pub mod subscription;
21
22pub use client::Client;
23pub use config::Config;
24pub use error::{
25 CheckpointNotFoundError, ErrorDetails, ErrorReason, ObjectNotFoundError, Result, RpcError,
26};
27pub use metrics::{RpcMetrics, RpcMetricsMakeCallbackHandler};
28pub use reader::TransactionNotFoundError;
29pub use sui_rpc::proto;
30
/// Name and version of the binary that hosts the RPC service.
///
/// Both fields are `'static` strings, so values are cheap to clone and can be
/// built from compile-time constants.
#[derive(Clone, Debug)]
pub struct ServerVersion {
    /// Name of the binary.
    pub bin: &'static str,
    /// Version string of the binary.
    pub version: &'static str,
}
36
37impl ServerVersion {
38 pub fn new(bin: &'static str, version: &'static str) -> Self {
39 Self { bin, version }
40 }
41}
42
43impl std::fmt::Display for ServerVersion {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.write_str(self.bin)?;
46 f.write_str("/")?;
47 f.write_str(self.version)
48 }
49}
50
/// State shared by every RPC service implementation exposed by this crate.
///
/// The value is cloned into each gRPC service, the `/health` route and the
/// response middleware when the router is built.
#[derive(Clone)]
pub struct RpcService {
    /// Wrapper around the state reader supplied to `RpcService::new`.
    reader: StateReader,
    /// Transaction executor; `None` until `with_executor` is called.
    executor: Option<Arc<dyn TransactionExecutor>>,
    /// Handle to the subscription service; the subscription gRPC service is
    /// only registered by `into_router` when this is `Some`.
    subscription_service_handle: Option<SubscriptionServiceHandle>,
    /// Chain identifier read from the state reader at construction time.
    chain_id: sui_types::digests::ChainIdentifier,
    /// Binary name and version; `None` until `with_server_version` is called.
    server_version: Option<ServerVersion>,
    /// Metrics; when `Some`, `into_router` adds a metrics callback layer.
    metrics: Option<Arc<RpcMetrics>>,
    /// Service configuration; `Config::default()` unless replaced via
    /// `with_config`.
    config: Config,
}
61
62impl RpcService {
63 pub fn new(reader: Arc<dyn RpcStateReader>) -> Self {
64 let chain_id = reader.get_chain_identifier().unwrap();
65 Self {
66 reader: StateReader::new(reader),
67 executor: None,
68 subscription_service_handle: None,
69 chain_id,
70 server_version: None,
71 metrics: None,
72 config: Config::default(),
73 }
74 }
75
76 pub fn with_server_version(&mut self, server_version: ServerVersion) -> &mut Self {
77 self.server_version = Some(server_version);
78 self
79 }
80
81 pub fn with_config(&mut self, config: Config) {
82 self.config = config;
83 }
84
85 pub fn with_executor(&mut self, executor: Arc<dyn TransactionExecutor + Send + Sync>) {
86 self.executor = Some(executor);
87 }
88
89 pub fn with_subscription_service(
90 &mut self,
91 subscription_service_handle: SubscriptionServiceHandle,
92 ) {
93 self.subscription_service_handle = Some(subscription_service_handle);
94 }
95
96 pub fn with_metrics(&mut self, metrics: RpcMetrics) {
97 self.metrics = Some(Arc::new(metrics));
98 }
99
100 pub fn chain_id(&self) -> sui_types::digests::ChainIdentifier {
101 self.chain_id
102 }
103
104 pub fn server_version(&self) -> Option<&ServerVersion> {
105 self.server_version.as_ref()
106 }
107
108 pub async fn into_router(self) -> axum::Router {
109 let metrics = self.metrics.clone();
110
111 let router = {
112 let ledger_service =
113 sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(
114 self.clone(),
115 )
116 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
117 let transaction_execution_service = sui_rpc::proto::sui::rpc::v2::transaction_execution_service_server::TransactionExecutionServiceServer::new(self.clone())
118 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
119 let state_service =
120 sui_rpc::proto::sui::rpc::v2::state_service_server::StateServiceServer::new(
121 self.clone(),
122 )
123 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
124 let signature_verification_service = sui_rpc::proto::sui::rpc::v2::signature_verification_service_server::SignatureVerificationServiceServer::new(self.clone())
125 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
126 let move_package_service = sui_rpc::proto::sui::rpc::v2::move_package_service_server::MovePackageServiceServer::new(self.clone())
127 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
128 let name_service =
129 sui_rpc::proto::sui::rpc::v2::name_service_server::NameServiceServer::new(
130 self.clone(),
131 )
132 .send_compressed(tonic::codec::CompressionEncoding::Zstd);
133
134 let event_service_alpha =
135 crate::grpc::alpha::event_service_proto::event_service_server::EventServiceServer::new(
136 self.clone(),
137 );
138 let proof_service_alpha =
139 crate::grpc::alpha::proof_service_proto::proof_service_server::ProofServiceServer::new(
140 crate::grpc::alpha::proof_service::ProofServiceImpl::new(self.clone()),
141 );
142
143 let (health_reporter, health_service) = tonic_health::server::health_reporter();
144
145 let reflection_v1 = tonic_reflection::server::Builder::configure()
146 .register_encoded_file_descriptor_set(
147 crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
148 )
149 .register_encoded_file_descriptor_set(
150 crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
151 )
152 .register_encoded_file_descriptor_set(
153 sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
154 )
155 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
156 .build_v1()
157 .unwrap();
158
159 let reflection_v1alpha = tonic_reflection::server::Builder::configure()
160 .register_encoded_file_descriptor_set(
161 crate::proto::google::protobuf::FILE_DESCRIPTOR_SET,
162 )
163 .register_encoded_file_descriptor_set(
164 crate::proto::google::rpc::FILE_DESCRIPTOR_SET,
165 )
166 .register_encoded_file_descriptor_set(
167 sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET,
168 )
169 .register_encoded_file_descriptor_set(tonic_health::pb::FILE_DESCRIPTOR_SET)
170 .build_v1alpha()
171 .unwrap();
172
173 fn service_name<S: tonic::server::NamedService>(_service: &S) -> &'static str {
174 S::NAME
175 }
176
177 for service_name in [
178 service_name(&ledger_service),
179 service_name(&transaction_execution_service),
180 service_name(&state_service),
181 service_name(&signature_verification_service),
182 service_name(&move_package_service),
183 service_name(&name_service),
184 service_name(&event_service_alpha),
185 service_name(&proof_service_alpha),
186 service_name(&reflection_v1),
187 service_name(&reflection_v1alpha),
188 ] {
189 health_reporter
190 .set_service_status(service_name, tonic_health::ServingStatus::Serving)
191 .await;
192 }
193
194 let mut services = grpc::Services::new()
195 .add_service(ledger_service)
197 .add_service(transaction_execution_service)
198 .add_service(state_service)
199 .add_service(signature_verification_service)
200 .add_service(move_package_service)
201 .add_service(name_service)
202 .add_service(event_service_alpha)
204 .add_service(proof_service_alpha)
205 .add_service(reflection_v1)
207 .add_service(reflection_v1alpha);
208
209 if self.subscription_service_handle.is_some() {
210 let subscription_service =
211sui_rpc::proto::sui::rpc::v2::subscription_service_server::SubscriptionServiceServer::new(self.clone());
212 health_reporter
213 .set_service_status(
214 service_name(&subscription_service),
215 tonic_health::ServingStatus::Serving,
216 )
217 .await;
218
219 services = services.add_service(subscription_service);
220 }
221
222 services.add_service(health_service).into_router()
223 };
224
225 let health_endpoint = axum::Router::new()
226 .route("/health", axum::routing::get(service::health::health))
227 .with_state(self.clone());
228
229 router
230 .merge(health_endpoint)
231 .layer(axum::middleware::map_response_with_state(
232 self,
233 response::append_info_headers,
234 ))
235 .pipe(|router| {
236 if let Some(metrics) = metrics {
237 router.layer(CallbackLayer::new(
238 metrics::RpcMetricsMakeCallbackHandler::new(metrics),
239 ))
240 } else {
241 router
242 }
243 })
244 }
245
246 pub async fn start_service(self, socket_address: std::net::SocketAddr) {
247 let listener = tokio::net::TcpListener::bind(socket_address).await.unwrap();
248 axum::serve(listener, self.into_router().await)
249 .await
250 .unwrap();
251 }
252}
253
/// Ordering direction.
///
/// With `rename_all = "lowercase"`, serde represents the variants as
/// `"ascending"` and `"descending"`.
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Direction {
    /// Ascending order.
    Ascending,
    /// Descending order.
    Descending,
}
260
261impl Direction {
262 pub fn is_descending(self) -> bool {
263 matches!(self, Self::Descending)
264 }
265}