sui_bridge/
metered_eth_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::metrics::BridgeMetrics;
5use crate::utils::EthProvider;
6use alloy::providers::RootProvider;
7use alloy::rpc::client::RpcClient;
8use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
9use alloy::transports::http::{Http, reqwest};
10use alloy::transports::{Transport, TransportError};
11use std::fmt::Debug;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17use tower::Service;
18use url::{ParseError, Url};
19
20#[derive(Debug, Clone)]
21pub struct MeteredHttpService<S> {
22    inner: S,
23    metrics: Arc<BridgeMetrics>,
24}
25
26impl<S> MeteredHttpService<S> {
27    pub fn new(inner: S, metrics: Arc<BridgeMetrics>) -> Self {
28        Self { inner, metrics }
29    }
30}
31
32impl<S> Service<RequestPacket> for MeteredHttpService<S>
33where
34    S: Transport + Clone,
35    S::Future: Send + 'static,
36{
37    type Response = ResponsePacket;
38    type Error = TransportError;
39    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
40
41    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42        self.inner.poll_ready(cx)
43    }
44
45    fn call(&mut self, req: RequestPacket) -> Self::Future {
46        let method_name = match &req {
47            RequestPacket::Single(req) => req.method().to_string(),
48            RequestPacket::Batch(_) => "batch".to_string(),
49        };
50
51        self.metrics
52            .eth_rpc_queries
53            .with_label_values(&[&method_name])
54            .inc();
55
56        let timer = self
57            .metrics
58            .eth_rpc_queries_latency
59            .with_label_values(&[&method_name])
60            .start_timer();
61
62        let future = self.inner.call(req);
63
64        // Wrap the future to ensure the timer is dropped when the future completes
65        Box::pin(async move {
66            let result = future.await;
67            // Dropping the timer records the duration in the histogram
68            drop(timer);
69            result
70        })
71    }
72}
73
74pub fn new_metered_eth_provider(
75    url: &str,
76    metrics: Arc<BridgeMetrics>,
77) -> Result<EthProvider, ParseError> {
78    let url: Url = url.parse()?;
79    let client = reqwest::Client::builder()
80        .timeout(std::time::Duration::from_secs(30))
81        .build()
82        .expect("Failed to create reqwest client");
83    let http_transport = Http::with_client(client, url);
84    let metered_transport = MeteredHttpService::new(http_transport, metrics);
85    let rpc_client =
86        RpcClient::new(metered_transport, false).with_poll_interval(Duration::from_millis(2000));
87    Ok(Arc::new(RootProvider::new(rpc_client)))
88}
89
90pub async fn new_metered_eth_multi_provider(
91    urls: Vec<String>,
92    quorum: usize,
93    health_check_interval_secs: u64,
94    metrics: Arc<BridgeMetrics>,
95) -> anyhow::Result<EthProvider> {
96    use alloy_multiprovider_strategy::{MultiProviderConfig, QuorumTransport};
97
98    let config = MultiProviderConfig::new(urls, quorum)
99        .with_health_check_interval(Duration::from_secs(health_check_interval_secs))
100        .with_request_timeout(Duration::from_secs(30))
101        .with_start_health_check_on_init(false);
102
103    let transport = QuorumTransport::new(config)
104        .map_err(|e| anyhow::anyhow!("Failed to create QuorumTransport: {}", e))?;
105
106    transport.run_health_check().await;
107    transport.start_health_check_task();
108
109    let metered_transport = MeteredHttpService::new(transport, metrics);
110    let rpc_client =
111        RpcClient::new(metered_transport, false).with_poll_interval(Duration::from_millis(2000));
112    Ok(Arc::new(RootProvider::new(rpc_client)))
113}
114
115#[cfg(test)]
116mod tests {
117    use super::*;
118    use alloy::providers::Provider;
119    use prometheus::Registry;
120
121    async fn test_provider(metrics: &BridgeMetrics, provider: &EthProvider) {
122        assert_eq!(
123            metrics
124                .eth_rpc_queries
125                .get_metric_with_label_values(&["eth_blockNumber"])
126                .unwrap()
127                .get(),
128            0
129        );
130        assert_eq!(
131            metrics
132                .eth_rpc_queries_latency
133                .get_metric_with_label_values(&["eth_blockNumber"])
134                .unwrap()
135                .get_sample_count(),
136            0
137        );
138
139        provider.get_block_number().await.unwrap_err(); // the rpc call will fail but we don't care
140
141        assert_eq!(
142            metrics
143                .eth_rpc_queries
144                .get_metric_with_label_values(&["eth_blockNumber"])
145                .unwrap()
146                .get(),
147            1
148        );
149        assert_eq!(
150            metrics
151                .eth_rpc_queries_latency
152                .get_metric_with_label_values(&["eth_blockNumber"])
153                .unwrap()
154                .get_sample_count(),
155            1
156        );
157    }
158
159    #[tokio::test]
160    async fn test_metered_eth_multi_provider() {
161        let metrics = Arc::new(BridgeMetrics::new(&Registry::new()));
162        let provider = new_metered_eth_multi_provider(
163            vec!["http://localhost:9876".to_string()],
164            1,
165            300,
166            metrics.clone(),
167        )
168        .await
169        .unwrap();
170
171        test_provider(&metrics, &provider).await;
172    }
173
174    #[tokio::test]
175    async fn test_metered_eth_provider() {
176        let metrics = Arc::new(BridgeMetrics::new(&Registry::new()));
177        let provider = new_metered_eth_provider("http://localhost:9876", metrics.clone()).unwrap();
178
179        test_provider(&metrics, &provider).await;
180    }
181}