pull_grafana_logs/
pull_grafana_logs.rs1use reqwest::header::ACCEPT;
7use serde::Deserialize;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::env;
11use std::error::Error;
12use std::fs::File;
13use std::io::{BufWriter, Write};
14use std::process;
15use tracing::{debug, error, info};
16
/// Largest `limit` that `run` will ask for in a single `fetch_logs` call;
/// bigger result sets are pulled in several requests of at most this size.
const MAX_LOGS_PER_REQUEST: u64 = 10_000;
19
20#[derive(Debug, Deserialize)]
22struct LokiResponse {
23 data: LokiData,
24}
25
26#[derive(Debug, Deserialize)]
27struct LokiData {
28 result: Vec<LokiResult>,
29}
30
31#[derive(Debug, Deserialize)]
32struct LokiResult {
33 values: Vec<(String, String)>,
34}
35
36#[derive(Debug, Deserialize)]
37struct GrafanaLog {
38 message: String,
39}
40
/// Request details recovered from one log line by `extract_from_message` or
/// `extract_from_proxy_message`.
#[derive(Debug)]
struct LogEntry {
    /// First whitespace-delimited token of the log line.
    timestamp: String,
    /// Host the request was addressed to, or a placeholder when the log line
    /// does not carry one.
    host: String,
    /// Name of the requested method.
    method: String,
    /// Request body as JSON text; `run` splices it unquoted into each output
    /// line.
    body: String,
}
48
49fn extract_from_message(message: &str) -> Option<LogEntry> {
52 let timestamp = message.split_whitespace().next()?.to_string();
53
54 let headers_start = message.find("headers=")?;
55 let headers_str = &message[headers_start..];
56 let headers_json_str = headers_str
57 .trim_start_matches("headers=")
58 .split_once(" body=")?
59 .0;
60 let headers: Value = serde_json::from_str(headers_json_str).ok()?;
61 let host = headers
62 .get("host")
63 .and_then(|h| h.as_str())
64 .unwrap_or("unknown_host")
65 .to_string();
66
67 if let Some(body_start) = message.find("body=")
68 && let Some(peer_type_start) = message.find(" peer_type=")
69 {
70 let raw_body = &message[(body_start + 5)..peer_type_start].trim();
71 if raw_body.starts_with('b') {
72 let trimmed = raw_body.trim_start_matches('b').trim_matches('"');
73 let unescaped = trimmed.replace("\\\"", "\"");
74
75 if let Ok(parsed) = serde_json::from_str::<Value>(&unescaped) {
76 let method = parsed
77 .get("method")
78 .and_then(|m| m.as_str())
79 .unwrap_or("unknown_method")
80 .to_string();
81 return Some(LogEntry {
82 timestamp,
83 host,
84 method,
85 body: unescaped,
86 });
87 }
88 }
89 }
90 None
91}
92
93fn extract_from_proxy_message(message: &str) -> Option<LogEntry> {
96 let timestamp = message.split_whitespace().next()?.to_string();
98
99 let request_start = message.find("Request: ")?;
101 let request_str = &message[request_start..];
102 let request_split: Vec<&str> = request_str.split(" | Response body (UTF-8): ").collect();
103
104 if request_split.len() == 2 {
105 let request_details = request_split[0];
106 let response_body = request_split[1];
107
108 let method_start = request_details.find("Method: ")?;
110 let method_str = &request_details[method_start + 8..]; let method = method_str.split(',').next()?.trim().to_string();
112
113 let params_start = request_details.find("Params: ")?;
115 let params_str = &request_details[params_start + 8..]; let params = params_str.trim();
117
118 let id = if let Ok(response_json) = serde_json::from_str::<Value>(response_body) {
120 response_json.get("id").cloned().unwrap_or(Value::Null)
121 } else {
122 Value::Null
123 };
124
125 let request_body = format!(
127 r#"{{"jsonrpc":"2.0","id":{},"method":"{}","params":{}}}"#,
128 id, method, params
129 );
130
131 let host = "unknown".to_string();
133
134 return Some(LogEntry {
135 timestamp,
136 host,
137 method,
138 body: request_body,
139 });
140 }
141
142 None
143}
144
145async fn fetch_logs(
146 client: &reqwest::Client,
147 url: &str,
148 query: &str,
149 start: &str,
150 end: &str,
151 limit: u64,
152 offset: Option<u64>,
153) -> Result<LokiResponse, Box<dyn Error>> {
154 let mut params = vec![
155 ("query".to_string(), query.to_string()),
156 ("start".to_string(), start.to_string()),
157 ("end".to_string(), end.to_string()),
158 ("limit".to_string(), limit.to_string()),
159 ];
160 if let Some(o) = offset {
161 params.push(("start_from".to_string(), o.to_string()));
162 }
163
164 info!("Fetching logs from {} with params: {:?}", url, params);
165
166 let resp = client
167 .get(url)
168 .header(ACCEPT, "application/json")
169 .header("X-Scope-OrgID", "sui-fleet")
170 .query(¶ms)
171 .send()
172 .await?;
173
174 let status = resp.status();
175 if !status.is_success() {
176 let error_body = resp.text().await?;
177 error!("Error response: {}", error_body);
178 return Err(format!("Request failed with status: {}", status).into());
179 }
180 Ok(resp.json().await?)
181}
182
183#[tokio::main]
184async fn main() {
185 let _guard = telemetry_subscribers::TelemetryConfig::new()
186 .with_env()
187 .init();
188 if let Err(e) = run().await {
189 error!("Error: {}", e);
190 process::exit(1);
191 }
192}
193
194async fn run() -> Result<(), Box<dyn Error>> {
195 let grafana_url = env::var("GRAFANA_LOGS_URL")
196 .unwrap_or_else(|_| "https://metrics.sui.io/loki/api/v1/query_range".to_string());
197 let net = env::var("NET").unwrap_or_else(|_| "mainnet".to_string());
198 let namespace = if net == "testnet" {
199 "rpc-testnet".to_string()
200 } else if net == "mainnet" {
201 "rpc-mainnet".to_string()
202 } else if net == "mainnet-proxy" {
203 "sui-indexer-alt-jsonrpc-proxy-mainnet".to_string()
204 } else {
205 "UNKNOWN_NET".to_string()
206 };
207
208 let default_substring = if net == "mainnet-proxy" {
209 "Request:"
210 } else {
211 "Sampled read request"
212 };
213 let substring = env::var("SUBSTRING").unwrap_or_else(|_| default_substring.to_string());
214 let query = format!(r#"{{namespace="{}"}} |= "{}""#, namespace, substring);
215 debug!("Query: {}", query);
216
217 let now = chrono::Utc::now();
218 let one_day_ago = now - chrono::Duration::days(1);
219 let start = env::var("START").unwrap_or(one_day_ago.format("%Y-%m-%dT%H:%M:%SZ").to_string());
220 let end = env::var("END").unwrap_or(now.format("%Y-%m-%dT%H:%M:%SZ").to_string());
221 let limit: Option<u64> = env::var("LIMIT").ok().and_then(|l| l.parse().ok());
222 let client = reqwest::Client::new();
223
224 let mut all_logs = Vec::new();
225 let mut offset = None;
226 loop {
227 let chunk_limit = match limit {
228 Some(l) => {
229 let fetched = all_logs.len() as u64;
230 if fetched >= l {
231 break;
232 }
233 std::cmp::min(MAX_LOGS_PER_REQUEST, l - fetched)
234 }
235 None => MAX_LOGS_PER_REQUEST,
236 };
237
238 let response = fetch_logs(
239 &client,
240 &grafana_url,
241 &query,
242 &start,
243 &end,
244 chunk_limit,
245 offset,
246 )
247 .await?;
248
249 let batch: Vec<_> = response
250 .data
251 .result
252 .into_iter()
253 .flat_map(|result| {
254 result
255 .values
256 .into_iter()
257 .map(|(_, message)| GrafanaLog { message })
258 })
259 .collect();
260 if batch.is_empty() {
262 break;
263 }
264
265 let batch_len = batch.len();
266 all_logs.extend(batch);
267 offset = Some(offset.unwrap_or(0) + batch_len as u64);
268 }
269 info!("Found {} logs.", all_logs.len());
270
271 let mut method_map: HashMap<String, usize> = HashMap::new();
273 let mut asc_log_entries = Vec::new();
274 for log_entry in all_logs.into_iter().rev() {
275 if let Some(entry) = if net == "mainnet-proxy" {
276 extract_from_proxy_message(&log_entry.message)
277 } else {
278 extract_from_message(&log_entry.message)
279 } {
280 *method_map.entry(entry.method.clone()).or_default() += 1;
281 asc_log_entries.push(entry);
282 }
283 }
284 for (method, count) in &method_map {
285 info!("Found {} logs for method: {}", count, method);
286 }
287
288 let output_dir = env::var("OUTPUT_DIR").unwrap_or_else(|_| ".".to_string());
289 let output_file = format!("{}/sampled_read_requests.jsonl", output_dir);
290 if let Some(parent) = std::path::Path::new(&output_file).parent()
291 && !parent.exists()
292 {
293 std::fs::create_dir_all(parent)?;
294 }
295
296 let file = File::create(&output_file)?;
297 let mut writer = BufWriter::new(file);
298 for entry in asc_log_entries {
299 let line = format!(
300 r#"{{"timestamp":"{}", "host":"{}", "method":"{}", "body":{}}}"#,
301 entry.timestamp, entry.host, entry.method, entry.body
302 );
303 writer.write_all(line.as_bytes())?;
304 writer.write_all(b"\n")?;
305 }
306 writer.flush()?;
307 info!("Done! Wrote grouped logs to {}", output_file);
308 Ok(())
309}