import_trace/
import-trace.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use bytes::Buf;
use bytes_varint::VarIntSupport;
use clap::*;
use opentelemetry_proto::tonic::{
    collector::trace::v1::{trace_service_client::TraceServiceClient, ExportTraceServiceRequest},
    common::v1::{any_value, AnyValue, KeyValue},
};
use prost::Message;
use std::io::{self, Cursor, Read};
use tonic::Request;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    #[arg(long)]
    trace_file: String,

    #[arg(long, default_value = "http://localhost:4317")]
    otlp_endpoint: String,

    #[arg(long)]
    service_name: Option<String>,

    #[arg(long)]
    dump_spans: bool,
}

#[tokio::main]
async fn main() {
    let args = Args::parse();
    let file = std::fs::File::open(args.trace_file).unwrap();

    let messages = decode_all_length_delimited::<_, ExportTraceServiceRequest>(file).unwrap();

    if args.dump_spans {
        for message in messages.iter() {
            for span in &message.resource_spans {
                println!("{:#?}", span);
            }
        }
        return;
    }

    let endpoint = format!("{}{}", args.otlp_endpoint, "/v1/traces");
    let mut trace_exporter = TraceServiceClient::connect(endpoint).await.unwrap();

    let service_name = args.service_name.unwrap_or_else(|| {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        format!("sui-node-{}", timestamp)
    });

    println!("importing trace with service name {:?}", service_name);

    for mut message in messages {
        let mut span_count = 0;

        // Rewrite the service name to separate the imported trace from other traces
        for resource_span in message.resource_spans.iter_mut() {
            for scope_span in resource_span.scope_spans.iter() {
                span_count += scope_span.spans.len();
            }

            if let Some(resource) = resource_span.resource.as_mut() {
                let mut service_name_found = false;
                for attr in resource.attributes.iter_mut() {
                    if attr.key == "service.name" {
                        service_name_found = true;
                        attr.value = Some(AnyValue {
                            value: Some(any_value::Value::StringValue(service_name.clone())),
                        });
                    }
                }
                if !service_name_found {
                    resource.attributes.push(KeyValue {
                        key: "service.name".to_string(),
                        value: Some(AnyValue {
                            value: Some(any_value::Value::StringValue(service_name.clone())),
                        }),
                    });
                }
            }
        }

        println!("sending {} spans to otlp collector", span_count);
        trace_exporter.export(Request::new(message)).await.unwrap();
    }
    println!("all spans imported");
}

fn decode_all_length_delimited<R, M>(mut reader: R) -> io::Result<Vec<M>>
where
    R: Read,
    M: Message + Default,
{
    let mut messages = Vec::new();
    let mut buffer = Vec::new();
    reader.read_to_end(&mut buffer)?;
    let mut cursor = Cursor::new(buffer);

    while cursor.has_remaining() {
        let len = cursor.get_u64_varint().unwrap() as usize;

        if cursor.remaining() < len {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "Incomplete message",
            ));
        }

        // Create a slice for just this message
        let msg_bytes = cursor
            .chunk()
            .get(..len)
            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Buffer underflow"))?;

        let msg = M::decode(msg_bytes).map_err(|e| {
            io::Error::new(io::ErrorKind::InvalidData, format!("Decode error: {}", e))
        })?;
        messages.push(msg);

        // Advance the cursor
        cursor.advance(len);
    }

    Ok(messages)
}