import_trace/
import-trace.rs1use bytes::Buf;
5use bytes_varint::VarIntSupport;
6use clap::*;
7use opentelemetry_proto::tonic::{
8 collector::trace::v1::{ExportTraceServiceRequest, trace_service_client::TraceServiceClient},
9 common::v1::{AnyValue, KeyValue, any_value},
10};
11use prost::Message;
12use std::io::{self, Cursor, Read};
13use tonic::Request;
14
15#[derive(Parser, Debug)]
16#[command(author, version, about, long_about = None)]
17struct Args {
18 #[arg(long)]
19 trace_file: String,
20
21 #[arg(long, default_value = "http://localhost:4317")]
22 otlp_endpoint: String,
23
24 #[arg(long)]
25 service_name: Option<String>,
26
27 #[arg(long)]
28 dump_spans: bool,
29}
30
31#[tokio::main]
32async fn main() {
33 let args = Args::parse();
34 let file = std::fs::File::open(args.trace_file).unwrap();
35
36 let messages = decode_all_length_delimited::<_, ExportTraceServiceRequest>(file).unwrap();
37
38 if args.dump_spans {
39 for message in messages.iter() {
40 for span in &message.resource_spans {
41 println!("{:#?}", span);
42 }
43 }
44 return;
45 }
46
47 let endpoint = format!("{}{}", args.otlp_endpoint, "/v1/traces");
48 let mut trace_exporter = TraceServiceClient::connect(endpoint).await.unwrap();
49
50 let service_name = args.service_name.unwrap_or_else(|| {
51 let timestamp = std::time::SystemTime::now()
52 .duration_since(std::time::UNIX_EPOCH)
53 .unwrap()
54 .as_secs();
55 format!("sui-node-{}", timestamp)
56 });
57
58 println!("importing trace with service name {:?}", service_name);
59
60 for mut message in messages {
61 let mut span_count = 0;
62
63 for resource_span in message.resource_spans.iter_mut() {
65 for scope_span in resource_span.scope_spans.iter() {
66 span_count += scope_span.spans.len();
67 }
68
69 if let Some(resource) = resource_span.resource.as_mut() {
70 let mut service_name_found = false;
71 for attr in resource.attributes.iter_mut() {
72 if attr.key == "service.name" {
73 service_name_found = true;
74 attr.value = Some(AnyValue {
75 value: Some(any_value::Value::StringValue(service_name.clone())),
76 });
77 }
78 }
79 if !service_name_found {
80 resource.attributes.push(KeyValue {
81 key: "service.name".to_string(),
82 value: Some(AnyValue {
83 value: Some(any_value::Value::StringValue(service_name.clone())),
84 }),
85 });
86 }
87 }
88 }
89
90 println!("sending {} spans to otlp collector", span_count);
91 trace_exporter.export(Request::new(message)).await.unwrap();
92 }
93 println!("all spans imported");
94}
95
96fn decode_all_length_delimited<R, M>(mut reader: R) -> io::Result<Vec<M>>
97where
98 R: Read,
99 M: Message + Default,
100{
101 let mut messages = Vec::new();
102 let mut buffer = Vec::new();
103 reader.read_to_end(&mut buffer)?;
104 let mut cursor = Cursor::new(buffer);
105
106 while cursor.has_remaining() {
107 let len = cursor.try_get_u64_varint().unwrap() as usize;
108
109 if cursor.remaining() < len {
110 return Err(io::Error::new(
111 io::ErrorKind::UnexpectedEof,
112 "Incomplete message",
113 ));
114 }
115
116 let msg_bytes = cursor
118 .chunk()
119 .get(..len)
120 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Buffer underflow"))?;
121
122 let msg = M::decode(msg_bytes).map_err(|e| {
123 io::Error::new(io::ErrorKind::InvalidData, format!("Decode error: {}", e))
124 })?;
125 messages.push(msg);
126
127 cursor.advance(len);
129 }
130
131 Ok(messages)
132}