import_trace/
import-trace.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use bytes::Buf;
5use bytes_varint::VarIntSupport;
6use clap::*;
7use opentelemetry_proto::tonic::{
8    collector::trace::v1::{ExportTraceServiceRequest, trace_service_client::TraceServiceClient},
9    common::v1::{AnyValue, KeyValue, any_value},
10};
11use prost::Message;
12use std::io::{self, Cursor, Read};
13use tonic::Request;
14
15#[derive(Parser, Debug)]
16#[command(author, version, about, long_about = None)]
17struct Args {
18    #[arg(long)]
19    trace_file: String,
20
21    #[arg(long, default_value = "http://localhost:4317")]
22    otlp_endpoint: String,
23
24    #[arg(long)]
25    service_name: Option<String>,
26
27    #[arg(long)]
28    dump_spans: bool,
29}
30
31#[tokio::main]
32async fn main() {
33    let args = Args::parse();
34    let file = std::fs::File::open(args.trace_file).unwrap();
35
36    let messages = decode_all_length_delimited::<_, ExportTraceServiceRequest>(file).unwrap();
37
38    if args.dump_spans {
39        for message in messages.iter() {
40            for span in &message.resource_spans {
41                println!("{:#?}", span);
42            }
43        }
44        return;
45    }
46
47    let endpoint = format!("{}{}", args.otlp_endpoint, "/v1/traces");
48    let mut trace_exporter = TraceServiceClient::connect(endpoint).await.unwrap();
49
50    let service_name = args.service_name.unwrap_or_else(|| {
51        let timestamp = std::time::SystemTime::now()
52            .duration_since(std::time::UNIX_EPOCH)
53            .unwrap()
54            .as_secs();
55        format!("sui-node-{}", timestamp)
56    });
57
58    println!("importing trace with service name {:?}", service_name);
59
60    for mut message in messages {
61        let mut span_count = 0;
62
63        // Rewrite the service name to separate the imported trace from other traces
64        for resource_span in message.resource_spans.iter_mut() {
65            for scope_span in resource_span.scope_spans.iter() {
66                span_count += scope_span.spans.len();
67            }
68
69            if let Some(resource) = resource_span.resource.as_mut() {
70                let mut service_name_found = false;
71                for attr in resource.attributes.iter_mut() {
72                    if attr.key == "service.name" {
73                        service_name_found = true;
74                        attr.value = Some(AnyValue {
75                            value: Some(any_value::Value::StringValue(service_name.clone())),
76                        });
77                    }
78                }
79                if !service_name_found {
80                    resource.attributes.push(KeyValue {
81                        key: "service.name".to_string(),
82                        value: Some(AnyValue {
83                            value: Some(any_value::Value::StringValue(service_name.clone())),
84                        }),
85                    });
86                }
87            }
88        }
89
90        println!("sending {} spans to otlp collector", span_count);
91        trace_exporter.export(Request::new(message)).await.unwrap();
92    }
93    println!("all spans imported");
94}
95
96fn decode_all_length_delimited<R, M>(mut reader: R) -> io::Result<Vec<M>>
97where
98    R: Read,
99    M: Message + Default,
100{
101    let mut messages = Vec::new();
102    let mut buffer = Vec::new();
103    reader.read_to_end(&mut buffer)?;
104    let mut cursor = Cursor::new(buffer);
105
106    while cursor.has_remaining() {
107        let len = cursor.try_get_u64_varint().unwrap() as usize;
108
109        if cursor.remaining() < len {
110            return Err(io::Error::new(
111                io::ErrorKind::UnexpectedEof,
112                "Incomplete message",
113            ));
114        }
115
116        // Create a slice for just this message
117        let msg_bytes = cursor
118            .chunk()
119            .get(..len)
120            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Buffer underflow"))?;
121
122        let msg = M::decode(msg_bytes).map_err(|e| {
123            io::Error::new(io::ErrorKind::InvalidData, format!("Decode error: {}", e))
124        })?;
125        messages.push(msg);
126
127        // Advance the cursor
128        cursor.advance(len);
129    }
130
131    Ok(messages)
132}