sui_rpc_api/grpc/alpha/
list_authenticated_events.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9    AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
15use sui_types::base_types::SuiAddress;
16
17const MAX_PAGE_SIZE: u32 = 1000;
18const DEFAULT_PAGE_SIZE: u32 = 1000;
19const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; // 512KiB
20
21#[derive(serde::Serialize, serde::Deserialize)]
22struct PageToken {
23    stream_id: SuiAddress,
24    next_checkpoint: u64,
25    next_transaction_idx: u32,
26    next_event_idx: u32,
27}
28
29fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
30    let mut bcs = Bcs::default();
31    bcs.value = Some(ev.contents.clone().into());
32
33    let mut event = Event::default();
34    event.package_id = Some(ev.package_id.to_canonical_string(true));
35    event.module = Some(ev.transaction_module.to_string());
36    event.sender = Some(ev.sender.to_string());
37    event.event_type = Some(ev.type_.to_canonical_string(true));
38    event.contents = Some(bcs);
39    event
40}
41
42fn to_authenticated_event(
43    stream_id: &str,
44    cp: u64,
45    transaction_idx: u32,
46    idx: u32,
47    ev: &sui_types::event::Event,
48) -> AuthenticatedEvent {
49    let mut authenticated_event = AuthenticatedEvent::default();
50    authenticated_event.checkpoint = Some(cp);
51    authenticated_event.transaction_idx = Some(transaction_idx);
52    authenticated_event.event_idx = Some(idx);
53    authenticated_event.event = Some(to_grpc_event(ev));
54    authenticated_event.stream_id = Some(stream_id.to_string());
55    authenticated_event
56}
57
58fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
59    bcs::from_bytes(page_token).map_err(|_| {
60        RpcError::new(
61            tonic::Code::InvalidArgument,
62            "invalid page_token".to_string(),
63        )
64    })
65}
66
67fn encode_page_token(page_token: PageToken) -> Bytes {
68    bcs::to_bytes(&page_token).unwrap().into()
69}
70
71#[tracing::instrument(skip(service))]
72pub fn list_authenticated_events(
73    service: &RpcService,
74    request: ListAuthenticatedEventsRequest,
75) -> Result<ListAuthenticatedEventsResponse, RpcError> {
76    if !service.config.authenticated_events_indexing() {
77        return Err(RpcError::new(
78            tonic::Code::Unimplemented,
79            "Authenticated events indexing is disabled".to_string(),
80        ));
81    }
82    let stream_id = request.stream_id.ok_or_else(|| {
83        RpcError::new(
84            tonic::Code::InvalidArgument,
85            "missing stream_id".to_string(),
86        )
87    })?;
88
89    if stream_id.trim().is_empty() {
90        return Err(RpcError::new(
91            tonic::Code::InvalidArgument,
92            "stream_id cannot be empty".to_string(),
93        ));
94    }
95
96    let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
97        RpcError::new(
98            tonic::Code::InvalidArgument,
99            format!("invalid stream_id: {e}"),
100        )
101    })?;
102
103    let page_size = request
104        .page_size
105        .map(|s| s.clamp(1, MAX_PAGE_SIZE))
106        .unwrap_or(DEFAULT_PAGE_SIZE);
107
108    let page_token = request
109        .page_token
110        .as_ref()
111        .map(|token| decode_page_token(token))
112        .transpose()?;
113
114    if let Some(token) = &page_token
115        && token.stream_id != stream_addr
116    {
117        return Err(RpcError::new(
118            tonic::Code::InvalidArgument,
119            "page_token stream_id mismatch".to_string(),
120        ));
121    }
122
123    let start = request.start_checkpoint.unwrap_or(0);
124
125    let reader = service.reader.inner();
126    let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
127
128    let highest_indexed = indexes
129        .get_highest_indexed_checkpoint_seq_number()
130        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
131        .unwrap_or(0);
132
133    let lowest_available = reader
134        .get_lowest_available_checkpoint_objects()
135        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
136
137    if start < lowest_available {
138        return Err(RpcError::new(
139            tonic::Code::InvalidArgument,
140            format!(
141                "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
142                start, lowest_available
143            ),
144        ));
145    }
146
147    if start > highest_indexed {
148        let mut response = ListAuthenticatedEventsResponse::default();
149        response.events = vec![];
150        response.highest_indexed_checkpoint = Some(highest_indexed);
151        response.next_page_token = None;
152        return Ok(response);
153    }
154
155    let (actual_start, start_transaction_idx, start_event_idx) = if let Some(token) = &page_token {
156        (
157            token.next_checkpoint,
158            Some(token.next_transaction_idx),
159            Some(token.next_event_idx),
160        )
161    } else {
162        (start, None, None)
163    };
164
165    let iter = indexes
166        .authenticated_event_iter(
167            stream_addr,
168            actual_start,
169            start_transaction_idx,
170            start_event_idx,
171            highest_indexed,
172            page_size + 1,
173        )
174        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
175
176    let mut events = Vec::new();
177    let mut size_bytes = 0;
178    let mut next_page_token = None;
179
180    for (i, event_result) in iter.enumerate() {
181        let (cp, transaction_idx, event_idx, ev) =
182            event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
183
184        if i >= page_size as usize {
185            next_page_token = Some(encode_page_token(PageToken {
186                stream_id: stream_addr,
187                next_checkpoint: cp,
188                next_transaction_idx: transaction_idx,
189                next_event_idx: event_idx,
190            }));
191            break;
192        }
193
194        let authenticated_event =
195            to_authenticated_event(&stream_id, cp, transaction_idx, event_idx, &ev);
196        let event_size = authenticated_event.encoded_len();
197
198        if i > 0 && size_bytes + event_size > MAX_PAGE_SIZE_BYTES {
199            next_page_token = Some(encode_page_token(PageToken {
200                stream_id: stream_addr,
201                next_checkpoint: cp,
202                next_transaction_idx: transaction_idx,
203                next_event_idx: event_idx,
204            }));
205            break;
206        }
207
208        size_bytes += event_size;
209        events.push(authenticated_event);
210    }
211
212    let mut response = ListAuthenticatedEventsResponse::default();
213    response.events = events;
214    response.highest_indexed_checkpoint = Some(highest_indexed);
215    response.next_page_token = next_page_token.map(|t| t.into());
216    Ok(response)
217}