sui_bridge/
metered_eth_provider.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::metrics::BridgeMetrics;
5use crate::utils::EthProvider;
6use alloy::providers::RootProvider;
7use alloy::rpc::client::RpcClient;
8use alloy::rpc::json_rpc::{RequestPacket, ResponsePacket};
9use alloy::transports::http::{Http, reqwest};
10use alloy::transports::{Transport, TransportError};
11use std::fmt::Debug;
12use std::future::Future;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::{Context, Poll};
16use std::time::Duration;
17use tower::Service;
18use url::{ParseError, Url};
19
20#[derive(Debug, Clone)]
21pub struct MeteredHttpService<S> {
22    inner: S,
23    metrics: Arc<BridgeMetrics>,
24}
25
26impl<S> MeteredHttpService<S> {
27    pub fn new(inner: S, metrics: Arc<BridgeMetrics>) -> Self {
28        Self { inner, metrics }
29    }
30}
31
32impl<S> Service<RequestPacket> for MeteredHttpService<S>
33where
34    S: Transport + Clone,
35    S::Future: Send + 'static,
36{
37    type Response = ResponsePacket;
38    type Error = TransportError;
39    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
40
41    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
42        self.inner.poll_ready(cx)
43    }
44
45    fn call(&mut self, req: RequestPacket) -> Self::Future {
46        let method_name = match &req {
47            RequestPacket::Single(req) => req.method().to_string(),
48            RequestPacket::Batch(_) => "batch".to_string(),
49        };
50
51        self.metrics
52            .eth_rpc_queries
53            .with_label_values(&[&method_name])
54            .inc();
55
56        let timer = self
57            .metrics
58            .eth_rpc_queries_latency
59            .with_label_values(&[&method_name])
60            .start_timer();
61
62        let future = self.inner.call(req);
63
64        // Wrap the future to ensure the timer is dropped when the future completes
65        Box::pin(async move {
66            let result = future.await;
67            // Dropping the timer records the duration in the histogram
68            drop(timer);
69            result
70        })
71    }
72}
73
74pub fn new_metered_eth_provider(
75    url: &str,
76    metrics: Arc<BridgeMetrics>,
77) -> Result<EthProvider, ParseError> {
78    let url: Url = url.parse()?;
79    let client = reqwest::Client::builder()
80        .timeout(std::time::Duration::from_secs(30))
81        .build()
82        .expect("Failed to create reqwest client");
83    let http_transport = Http::with_client(client, url);
84    let metered_transport = MeteredHttpService::new(http_transport, metrics);
85    let rpc_client =
86        RpcClient::new(metered_transport, false).with_poll_interval(Duration::from_millis(2000));
87    Ok(Arc::new(RootProvider::new(rpc_client)))
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93    use alloy::providers::Provider;
94    use prometheus::Registry;
95
96    #[tokio::test]
97    async fn test_metered_eth_provider() {
98        let metrics = Arc::new(BridgeMetrics::new(&Registry::new()));
99        let provider = new_metered_eth_provider("http://localhost:9876", metrics.clone()).unwrap();
100
101        assert_eq!(
102            metrics
103                .eth_rpc_queries
104                .get_metric_with_label_values(&["eth_blockNumber"])
105                .unwrap()
106                .get(),
107            0
108        );
109        assert_eq!(
110            metrics
111                .eth_rpc_queries_latency
112                .get_metric_with_label_values(&["eth_blockNumber"])
113                .unwrap()
114                .get_sample_count(),
115            0
116        );
117
118        provider.get_block_number().await.unwrap_err(); // the rpc cal will fail but we don't care
119
120        assert_eq!(
121            metrics
122                .eth_rpc_queries
123                .get_metric_with_label_values(&["eth_blockNumber"])
124                .unwrap()
125                .get(),
126            1
127        );
128        assert_eq!(
129            metrics
130                .eth_rpc_queries_latency
131                .get_metric_with_label_values(&["eth_blockNumber"])
132                .unwrap()
133                .get_sample_count(),
134            1
135        );
136    }
137}