sui_bridge/
metered_eth_provider.rs1use crate::metrics::BridgeMetrics;
5use crate::utils::EthProvider;
6use alloy::providers::RootProvider;
7use alloy::rpc::client::RpcClient;
8use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
9use alloy::transports::http::{Http, reqwest};
10use alloy::transports::{Transport, TransportError};
11use std::fmt::Debug;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17use tower::Service;
18use url::{ParseError, Url};
19
20#[derive(Debug, Clone)]
21pub struct MeteredHttpService<S> {
22 inner: S,
23 metrics: Arc<BridgeMetrics>,
24}
25
26impl<S> MeteredHttpService<S> {
27 pub fn new(inner: S, metrics: Arc<BridgeMetrics>) -> Self {
28 Self { inner, metrics }
29 }
30}
31
32impl<S> Service<RequestPacket> for MeteredHttpService<S>
33where
34 S: Transport + Clone,
35 S::Future: Send + 'static,
36{
37 type Response = ResponsePacket;
38 type Error = TransportError;
39 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
40
41 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42 self.inner.poll_ready(cx)
43 }
44
45 fn call(&mut self, req: RequestPacket) -> Self::Future {
46 let method_name = match &req {
47 RequestPacket::Single(req) => req.method().to_string(),
48 RequestPacket::Batch(_) => "batch".to_string(),
49 };
50
51 self.metrics
52 .eth_rpc_queries
53 .with_label_values(&[&method_name])
54 .inc();
55
56 let timer = self
57 .metrics
58 .eth_rpc_queries_latency
59 .with_label_values(&[&method_name])
60 .start_timer();
61
62 let future = self.inner.call(req);
63
64 Box::pin(async move {
66 let result = future.await;
67 drop(timer);
69 result
70 })
71 }
72}
73
74pub fn new_metered_eth_provider(
75 url: &str,
76 metrics: Arc<BridgeMetrics>,
77) -> Result<EthProvider, ParseError> {
78 let url: Url = url.parse()?;
79 let client = reqwest::Client::builder()
80 .timeout(std::time::Duration::from_secs(30))
81 .build()
82 .expect("Failed to create reqwest client");
83 let http_transport = Http::with_client(client, url);
84 let metered_transport = MeteredHttpService::new(http_transport, metrics);
85 let rpc_client =
86 RpcClient::new(metered_transport, false).with_poll_interval(Duration::from_millis(2000));
87 Ok(Arc::new(RootProvider::new(rpc_client)))
88}
89
90pub async fn new_metered_eth_multi_provider(
91 urls: Vec<String>,
92 quorum: usize,
93 health_check_interval_secs: u64,
94 metrics: Arc<BridgeMetrics>,
95) -> anyhow::Result<EthProvider> {
96 use alloy_multiprovider_strategy::{MultiProviderConfig, QuorumTransport};
97
98 let config = MultiProviderConfig::new(urls, quorum)
99 .with_health_check_interval(Duration::from_secs(health_check_interval_secs))
100 .with_request_timeout(Duration::from_secs(30))
101 .with_start_health_check_on_init(false);
102
103 let transport = QuorumTransport::new(config)
104 .map_err(|e| anyhow::anyhow!("Failed to create QuorumTransport: {}", e))?;
105
106 transport.run_health_check().await;
107 transport.start_health_check_task();
108
109 let metered_transport = MeteredHttpService::new(transport, metrics);
110 let rpc_client =
111 RpcClient::new(metered_transport, false).with_poll_interval(Duration::from_millis(2000));
112 Ok(Arc::new(RootProvider::new(rpc_client)))
113}
114
115#[cfg(test)]
116mod tests {
117 use super::*;
118 use alloy::providers::Provider;
119 use prometheus::Registry;
120
121 async fn test_provider(metrics: &BridgeMetrics, provider: &EthProvider) {
122 assert_eq!(
123 metrics
124 .eth_rpc_queries
125 .get_metric_with_label_values(&["eth_blockNumber"])
126 .unwrap()
127 .get(),
128 0
129 );
130 assert_eq!(
131 metrics
132 .eth_rpc_queries_latency
133 .get_metric_with_label_values(&["eth_blockNumber"])
134 .unwrap()
135 .get_sample_count(),
136 0
137 );
138
139 provider.get_block_number().await.unwrap_err(); assert_eq!(
142 metrics
143 .eth_rpc_queries
144 .get_metric_with_label_values(&["eth_blockNumber"])
145 .unwrap()
146 .get(),
147 1
148 );
149 assert_eq!(
150 metrics
151 .eth_rpc_queries_latency
152 .get_metric_with_label_values(&["eth_blockNumber"])
153 .unwrap()
154 .get_sample_count(),
155 1
156 );
157 }
158
159 #[tokio::test]
160 async fn test_metered_eth_multi_provider() {
161 let metrics = Arc::new(BridgeMetrics::new(&Registry::new()));
162 let provider = new_metered_eth_multi_provider(
163 vec!["http://localhost:9876".to_string()],
164 1,
165 300,
166 metrics.clone(),
167 )
168 .await
169 .unwrap();
170
171 test_provider(&metrics, &provider).await;
172 }
173
174 #[tokio::test]
175 async fn test_metered_eth_provider() {
176 let metrics = Arc::new(BridgeMetrics::new(&Registry::new()));
177 let provider = new_metered_eth_provider("http://localhost:9876", metrics.clone()).unwrap();
178
179 test_provider(&metrics, &provider).await;
180 }
181}