sui_bridge/
metered_eth_provider.rs1use crate::metrics::BridgeMetrics;
5use crate::utils::EthProvider;
6use alloy::providers::RootProvider;
7use alloy::rpc::client::RpcClient;
8use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
9use alloy::transports::http::{Http, reqwest};
10use alloy::transports::{Transport, TransportError};
11use std::fmt::Debug;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17use tower::Service;
18use url::{ParseError, Url};
19
20#[derive(Debug, Clone)]
21pub struct MeteredHttpService<S> {
22 inner: S,
23 metrics: Arc<BridgeMetrics>,
24}
25
26impl<S> MeteredHttpService<S> {
27 pub fn new(inner: S, metrics: Arc<BridgeMetrics>) -> Self {
28 Self { inner, metrics }
29 }
30}
31
32impl<S> Service<RequestPacket> for MeteredHttpService<S>
33where
34 S: Transport + Clone,
35 S::Future: Send + 'static,
36{
37 type Response = ResponsePacket;
38 type Error = TransportError;
39 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
40
41 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42 self.inner.poll_ready(cx)
43 }
44
45 fn call(&mut self, req: RequestPacket) -> Self::Future {
46 let method_name = match &req {
47 RequestPacket::Single(req) => req.method().to_string(),
48 RequestPacket::Batch(_) => "batch".to_string(),
49 };
50
51 self.metrics
52 .eth_rpc_queries
53 .with_label_values(&[&method_name])
54 .inc();
55
56 let timer = self
57 .metrics
58 .eth_rpc_queries_latency
59 .with_label_values(&[&method_name])
60 .start_timer();
61
62 let future = self.inner.call(req);
63
64 Box::pin(async move {
66 let result = future.await;
67 drop(timer);
69 result
70 })
71 }
72}
73
74pub fn new_metered_eth_provider(
75 url: &str,
76 metrics: Arc<BridgeMetrics>,
77) -> Result<EthProvider, ParseError> {
78 let url: Url = url.parse()?;
79 let client = reqwest::Client::builder()
80 .timeout(std::time::Duration::from_secs(30))
81 .build()
82 .expect("Failed to create reqwest client");
83 let http_transport = Http::with_client(client, url);
84 let metered_transport = MeteredHttpService::new(http_transport, metrics);
85 let rpc_client =
86 RpcClient::new(metered_transport, false).with_poll_interval(Duration::from_millis(2000));
87 Ok(Arc::new(RootProvider::new(rpc_client)))
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93 use alloy::providers::Provider;
94 use prometheus::Registry;
95
96 #[tokio::test]
97 async fn test_metered_eth_provider() {
98 let metrics = Arc::new(BridgeMetrics::new(&Registry::new()));
99 let provider = new_metered_eth_provider("http://localhost:9876", metrics.clone()).unwrap();
100
101 assert_eq!(
102 metrics
103 .eth_rpc_queries
104 .get_metric_with_label_values(&["eth_blockNumber"])
105 .unwrap()
106 .get(),
107 0
108 );
109 assert_eq!(
110 metrics
111 .eth_rpc_queries_latency
112 .get_metric_with_label_values(&["eth_blockNumber"])
113 .unwrap()
114 .get_sample_count(),
115 0
116 );
117
118 provider.get_block_number().await.unwrap_err(); assert_eq!(
121 metrics
122 .eth_rpc_queries
123 .get_metric_with_label_values(&["eth_blockNumber"])
124 .unwrap()
125 .get(),
126 1
127 );
128 assert_eq!(
129 metrics
130 .eth_rpc_queries_latency
131 .get_metric_with_label_values(&["eth_blockNumber"])
132 .unwrap()
133 .get_sample_count(),
134 1
135 );
136 }
137}