pull_grafana_logs/
pull_grafana_logs.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
//! This script pulls sampled JSON RPC read requests from Grafana, extracts their JSON bodies,
//! counts them per RPC "method", and writes them to a JSONL file for later replay and analysis.
6use reqwest::header::ACCEPT;
7use serde::Deserialize;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::env;
11use std::error::Error;
12use std::fs::File;
13use std::io::{BufWriter, Write};
14use std::process;
15use tracing::{debug, error, info};
16
/// Largest `limit` sent with a single query; Loki has a limit of 10000 logs per request.
const MAX_LOGS_PER_REQUEST: u64 = 10000;
19
/// Top-level shape of the JSON payload returned by the log query endpoint.
///
/// This struct and the `Loki*` structs it nests mirror only the parts of the response that this
/// script reads.
#[derive(Debug, Deserialize)]
struct LokiResponse {
    /// The `data` object of the response, which carries the query results.
    data: LokiData,
}
25
/// The `data` section of a [`LokiResponse`].
#[derive(Debug, Deserialize)]
struct LokiData {
    /// One element per returned result; each carries its own list of log values.
    result: Vec<LokiResult>,
}
30
/// A single result inside [`LokiData`].
#[derive(Debug, Deserialize)]
struct LokiResult {
    /// Pairs of strings; the second element is the log line that gets parsed.
    /// NOTE(review): the first element is presumably the entry timestamp as defined by the
    /// Loki API, but this script discards it — confirm against the API documentation.
    values: Vec<(String, String)>,
}
35
/// A single raw log line taken out of a query response.
#[derive(Debug, Deserialize)]
struct GrafanaLog {
    /// Full text of the log line, as handed to the `extract_from_*` parsers.
    message: String,
}
40
/// A JSON-RPC request recovered from one log line.
#[derive(Debug)]
struct LogEntry {
    /// First whitespace-delimited token of the log line.
    timestamp: String,
    /// Value of the request's `host` header, or a placeholder when it is not available.
    host: String,
    /// JSON-RPC method name of the request.
    method: String,
    /// JSON-RPC request body as JSON text; it is written verbatim (unquoted) into the output.
    body: String,
}
48
49/// One example message is:
50/// 2025-02-11T23:15:17.944697206Z stderr F 2025-02-11T23:15:17.944501Z  INFO sui_edge_proxy::handlers: Sampled read request headers={"host": "wallet-rpc.mainnet.sui.io", "client-sdk-type": "typescript", "client-sdk-version": "1.17.0", "client-target-api-version": "1.40.0", "client-request-method": "suix_getBalance", "content-type": "application/json", "content-length": "152", "accept-encoding": "gzip", "user-agent": "okhttp/4.9.2", "x-cloud-trace-context": "31caa7db658044d850a002ccf4ff15b1/8018737809747708392", "cookie": "_cfuvid=h0GD1bYot45Ln6kVCdL4qsFCCyw3h2cLw3caDNmhWNw-1739262948231-0.0.1.1-604800000", "via": "1.1 google", "x-forwarded-for": "171.236.184.3, 34.8.28.138", "x-forwarded-proto": "https", "connection": "Keep-Alive"} body=b"{\"jsonrpc\":\"2.0\",\"id\":189393,\"method\":\"suix_getBalance\",\"params\":[\"0x23cad599a375b9c2cedd62fa20112526c90a71764230425cb7f557c0c0b3b150\",\"0x2::sui::SUI\"]}" peer_type=Read
51fn extract_from_message(message: &str) -> Option<LogEntry> {
52    let timestamp = message.split_whitespace().next()?.to_string();
53
54    let headers_start = message.find("headers=")?;
55    let headers_str = &message[headers_start..];
56    let headers_json_str = headers_str
57        .trim_start_matches("headers=")
58        .split_once(" body=")?
59        .0;
60    let headers: Value = serde_json::from_str(headers_json_str).ok()?;
61    let host = headers
62        .get("host")
63        .and_then(|h| h.as_str())
64        .unwrap_or("unknown_host")
65        .to_string();
66
67    if let Some(body_start) = message.find("body=")
68        && let Some(peer_type_start) = message.find(" peer_type=")
69    {
70        let raw_body = &message[(body_start + 5)..peer_type_start].trim();
71        if raw_body.starts_with('b') {
72            let trimmed = raw_body.trim_start_matches('b').trim_matches('"');
73            let unescaped = trimmed.replace("\\\"", "\"");
74
75            if let Ok(parsed) = serde_json::from_str::<Value>(&unescaped) {
76                let method = parsed
77                    .get("method")
78                    .and_then(|m| m.as_str())
79                    .unwrap_or("unknown_method")
80                    .to_string();
81                return Some(LogEntry {
82                    timestamp,
83                    host,
84                    method,
85                    body: unescaped,
86                });
87            }
88        }
89    }
90    None
91}
92
93/// Example log format:
94/// 2025-08-10T00:05:45.423808Z  INFO sui_indexer_alt_jsonrpc_proxy::handlers: Request: Method: suix_getAllBalances, Params: ["0x10e8a57972082d89f8e2a31589a96da4a0ade2ac003e4f41a0f7b77dbfd752ba"] | Response body (UTF-8): {"jsonrpc":"2.0","id":47539,"result":[...]}
95fn extract_from_proxy_message(message: &str) -> Option<LogEntry> {
96    // Extract timestamp (first token in the message)
97    let timestamp = message.split_whitespace().next()?.to_string();
98
99    // Find the request info
100    let request_start = message.find("Request: ")?;
101    let request_str = &message[request_start..];
102    let request_split: Vec<&str> = request_str.split(" | Response body (UTF-8): ").collect();
103
104    if request_split.len() == 2 {
105        let request_details = request_split[0];
106        let response_body = request_split[1];
107
108        // Extract method from "Request: Method: suix_getAllBalances, Params: ..."
109        let method_start = request_details.find("Method: ")?;
110        let method_str = &request_details[method_start + 8..]; // Skip "Method: "
111        let method = method_str.split(',').next()?.trim().to_string();
112
113        // Extract params from "Params: [...]"
114        let params_start = request_details.find("Params: ")?;
115        let params_str = &request_details[params_start + 8..]; // Skip "Params: "
116        let params = params_str.trim();
117
118        // Extract the id from the response body to reconstruct the request
119        let id = if let Ok(response_json) = serde_json::from_str::<Value>(response_body) {
120            response_json.get("id").cloned().unwrap_or(Value::Null)
121        } else {
122            Value::Null
123        };
124
125        // Construct the JSON-RPC request body
126        let request_body = format!(
127            r#"{{"jsonrpc":"2.0","id":{},"method":"{}","params":{}}}"#,
128            id, method, params
129        );
130
131        // Host is not in this format, use a placeholder
132        let host = "unknown".to_string();
133
134        return Some(LogEntry {
135            timestamp,
136            host,
137            method,
138            body: request_body,
139        });
140    }
141
142    None
143}
144
145async fn fetch_logs(
146    client: &reqwest::Client,
147    url: &str,
148    query: &str,
149    start: &str,
150    end: &str,
151    limit: u64,
152    offset: Option<u64>,
153) -> Result<LokiResponse, Box<dyn Error>> {
154    let mut params = vec![
155        ("query".to_string(), query.to_string()),
156        ("start".to_string(), start.to_string()),
157        ("end".to_string(), end.to_string()),
158        ("limit".to_string(), limit.to_string()),
159    ];
160    if let Some(o) = offset {
161        params.push(("start_from".to_string(), o.to_string()));
162    }
163
164    info!("Fetching logs from {} with params: {:?}", url, params);
165
166    let resp = client
167        .get(url)
168        .header(ACCEPT, "application/json")
169        .header("X-Scope-OrgID", "sui-fleet")
170        .query(&params)
171        .send()
172        .await?;
173
174    let status = resp.status();
175    if !status.is_success() {
176        let error_body = resp.text().await?;
177        error!("Error response: {}", error_body);
178        return Err(format!("Request failed with status: {}", status).into());
179    }
180    Ok(resp.json().await?)
181}
182
183#[tokio::main]
184async fn main() {
185    let _guard = telemetry_subscribers::TelemetryConfig::new()
186        .with_env()
187        .init();
188    if let Err(e) = run().await {
189        error!("Error: {}", e);
190        process::exit(1);
191    }
192}
193
194async fn run() -> Result<(), Box<dyn Error>> {
195    let grafana_url = env::var("GRAFANA_LOGS_URL")
196        .unwrap_or_else(|_| "https://metrics.sui.io/loki/api/v1/query_range".to_string());
197    let net = env::var("NET").unwrap_or_else(|_| "mainnet".to_string());
198    let namespace = if net == "testnet" {
199        "rpc-testnet".to_string()
200    } else if net == "mainnet" {
201        "rpc-mainnet".to_string()
202    } else if net == "mainnet-proxy" {
203        "sui-indexer-alt-jsonrpc-proxy-mainnet".to_string()
204    } else {
205        "UNKNOWN_NET".to_string()
206    };
207
208    let default_substring = if net == "mainnet-proxy" {
209        "Request:"
210    } else {
211        "Sampled read request"
212    };
213    let substring = env::var("SUBSTRING").unwrap_or_else(|_| default_substring.to_string());
214    let query = format!(r#"{{namespace="{}"}} |= "{}""#, namespace, substring);
215    debug!("Query: {}", query);
216
217    let now = chrono::Utc::now();
218    let one_day_ago = now - chrono::Duration::days(1);
219    let start = env::var("START").unwrap_or(one_day_ago.format("%Y-%m-%dT%H:%M:%SZ").to_string());
220    let end = env::var("END").unwrap_or(now.format("%Y-%m-%dT%H:%M:%SZ").to_string());
221    let limit: Option<u64> = env::var("LIMIT").ok().and_then(|l| l.parse().ok());
222    let client = reqwest::Client::new();
223
224    let mut all_logs = Vec::new();
225    let mut offset = None;
226    loop {
227        let chunk_limit = match limit {
228            Some(l) => {
229                let fetched = all_logs.len() as u64;
230                if fetched >= l {
231                    break;
232                }
233                std::cmp::min(MAX_LOGS_PER_REQUEST, l - fetched)
234            }
235            None => MAX_LOGS_PER_REQUEST,
236        };
237
238        let response = fetch_logs(
239            &client,
240            &grafana_url,
241            &query,
242            &start,
243            &end,
244            chunk_limit,
245            offset,
246        )
247        .await?;
248
249        let batch: Vec<_> = response
250            .data
251            .result
252            .into_iter()
253            .flat_map(|result| {
254                result
255                    .values
256                    .into_iter()
257                    .map(|(_, message)| GrafanaLog { message })
258            })
259            .collect();
260        // If we have no logs, break
261        if batch.is_empty() {
262            break;
263        }
264
265        let batch_len = batch.len();
266        all_logs.extend(batch);
267        offset = Some(offset.unwrap_or(0) + batch_len as u64);
268    }
269    info!("Found {} logs.", all_logs.len());
270
271    // Gather method statistics
272    let mut method_map: HashMap<String, usize> = HashMap::new();
273    let mut asc_log_entries = Vec::new();
274    for log_entry in all_logs.into_iter().rev() {
275        if let Some(entry) = if net == "mainnet-proxy" {
276            extract_from_proxy_message(&log_entry.message)
277        } else {
278            extract_from_message(&log_entry.message)
279        } {
280            *method_map.entry(entry.method.clone()).or_default() += 1;
281            asc_log_entries.push(entry);
282        }
283    }
284    for (method, count) in &method_map {
285        info!("Found {} logs for method: {}", count, method);
286    }
287
288    let output_dir = env::var("OUTPUT_DIR").unwrap_or_else(|_| ".".to_string());
289    let output_file = format!("{}/sampled_read_requests.jsonl", output_dir);
290    if let Some(parent) = std::path::Path::new(&output_file).parent()
291        && !parent.exists()
292    {
293        std::fs::create_dir_all(parent)?;
294    }
295
296    let file = File::create(&output_file)?;
297    let mut writer = BufWriter::new(file);
298    for entry in asc_log_entries {
299        let line = format!(
300            r#"{{"timestamp":"{}", "host":"{}", "method":"{}", "body":{}}}"#,
301            entry.timestamp, entry.host, entry.method, entry.body
302        );
303        writer.write_all(line.as_bytes())?;
304        writer.write_all(b"\n")?;
305    }
306    writer.flush()?;
307    info!("Done! Wrote grouped logs to {}", output_file);
308    Ok(())
309}