sui_rpc_api/grpc/alpha/
list_authenticated_events.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9    AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
15use sui_types::base_types::SuiAddress;
16
17const MAX_PAGE_SIZE: u32 = 1000;
18const DEFAULT_PAGE_SIZE: u32 = 1000;
19const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; // 512KiB
20
21#[derive(serde::Serialize, serde::Deserialize)]
22struct PageToken {
23    stream_id: SuiAddress,
24    next_checkpoint: u64,
25    next_accumulator_version: u64,
26    next_transaction_idx: u32,
27    next_event_idx: u32,
28}
29
30fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
31    let mut bcs = Bcs::default();
32    bcs.value = Some(ev.contents.clone().into());
33
34    let mut event = Event::default();
35    event.package_id = Some(ev.package_id.to_canonical_string(true));
36    event.module = Some(ev.transaction_module.to_string());
37    event.sender = Some(ev.sender.to_string());
38    event.event_type = Some(ev.type_.to_canonical_string(true));
39    event.contents = Some(bcs);
40    event
41}
42
43fn to_authenticated_event(
44    stream_id: &str,
45    cp: u64,
46    accumulator_version: u64,
47    transaction_idx: u32,
48    idx: u32,
49    ev: &sui_types::event::Event,
50) -> AuthenticatedEvent {
51    let mut authenticated_event = AuthenticatedEvent::default();
52    authenticated_event.checkpoint = Some(cp);
53    authenticated_event.accumulator_version = Some(accumulator_version);
54    authenticated_event.transaction_idx = Some(transaction_idx);
55    authenticated_event.event_idx = Some(idx);
56    authenticated_event.event = Some(to_grpc_event(ev));
57    authenticated_event.stream_id = Some(stream_id.to_string());
58    authenticated_event
59}
60
61fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
62    bcs::from_bytes(page_token).map_err(|_| {
63        RpcError::new(
64            tonic::Code::InvalidArgument,
65            "invalid page_token".to_string(),
66        )
67    })
68}
69
70fn encode_page_token(page_token: PageToken) -> Bytes {
71    bcs::to_bytes(&page_token).unwrap().into()
72}
73
74#[tracing::instrument(skip(service))]
75pub fn list_authenticated_events(
76    service: &RpcService,
77    request: ListAuthenticatedEventsRequest,
78) -> Result<ListAuthenticatedEventsResponse, RpcError> {
79    if !service.config.authenticated_events_indexing() {
80        return Err(RpcError::new(
81            tonic::Code::Unimplemented,
82            "Authenticated events indexing is disabled".to_string(),
83        ));
84    }
85    let stream_id = request.stream_id.ok_or_else(|| {
86        RpcError::new(
87            tonic::Code::InvalidArgument,
88            "missing stream_id".to_string(),
89        )
90    })?;
91
92    if stream_id.trim().is_empty() {
93        return Err(RpcError::new(
94            tonic::Code::InvalidArgument,
95            "stream_id cannot be empty".to_string(),
96        ));
97    }
98
99    let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
100        RpcError::new(
101            tonic::Code::InvalidArgument,
102            format!("invalid stream_id: {e}"),
103        )
104    })?;
105
106    let page_size = request
107        .page_size
108        .map(|s| s.clamp(1, MAX_PAGE_SIZE))
109        .unwrap_or(DEFAULT_PAGE_SIZE);
110
111    let page_token = request
112        .page_token
113        .as_ref()
114        .map(|token| decode_page_token(token))
115        .transpose()?;
116
117    if let Some(token) = &page_token
118        && token.stream_id != stream_addr
119    {
120        return Err(RpcError::new(
121            tonic::Code::InvalidArgument,
122            "page_token stream_id mismatch".to_string(),
123        ));
124    }
125
126    let start = request.start_checkpoint.unwrap_or(0);
127
128    let reader = service.reader.inner();
129    let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
130
131    let highest_indexed = indexes
132        .get_highest_indexed_checkpoint_seq_number()
133        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
134        .unwrap_or(0);
135
136    let lowest_available = reader
137        .get_lowest_available_checkpoint_objects()
138        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
139
140    if start < lowest_available {
141        return Err(RpcError::new(
142            tonic::Code::InvalidArgument,
143            format!(
144                "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
145                start, lowest_available
146            ),
147        ));
148    }
149
150    if start > highest_indexed {
151        let mut response = ListAuthenticatedEventsResponse::default();
152        response.events = vec![];
153        response.highest_indexed_checkpoint = Some(highest_indexed);
154        response.next_page_token = None;
155        return Ok(response);
156    }
157
158    let (actual_start, start_accumulator_version, start_transaction_idx, start_event_idx) =
159        if let Some(token) = &page_token {
160            (
161                token.next_checkpoint,
162                Some(token.next_accumulator_version),
163                Some(token.next_transaction_idx),
164                Some(token.next_event_idx),
165            )
166        } else {
167            (start, None, None, None)
168        };
169
170    let iter = indexes
171        .authenticated_event_iter(
172            stream_addr,
173            actual_start,
174            start_accumulator_version,
175            start_transaction_idx,
176            start_event_idx,
177            highest_indexed,
178            page_size + 1,
179        )
180        .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
181
182    let mut events = Vec::new();
183    let mut size_bytes = 0;
184    let mut next_page_token = None;
185
186    for (i, event_result) in iter.enumerate() {
187        let (cp, accumulator_version, transaction_idx, event_idx, ev) =
188            event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
189
190        if i >= page_size as usize {
191            next_page_token = Some(encode_page_token(PageToken {
192                stream_id: stream_addr,
193                next_checkpoint: cp,
194                next_accumulator_version: accumulator_version,
195                next_transaction_idx: transaction_idx,
196                next_event_idx: event_idx,
197            }));
198            break;
199        }
200
201        let authenticated_event = to_authenticated_event(
202            &stream_id,
203            cp,
204            accumulator_version,
205            transaction_idx,
206            event_idx,
207            &ev,
208        );
209        let event_size = authenticated_event.encoded_len();
210
211        if i > 0 && size_bytes + event_size > MAX_PAGE_SIZE_BYTES {
212            next_page_token = Some(encode_page_token(PageToken {
213                stream_id: stream_addr,
214                next_checkpoint: cp,
215                next_accumulator_version: accumulator_version,
216                next_transaction_idx: transaction_idx,
217                next_event_idx: event_idx,
218            }));
219            break;
220        }
221
222        size_bytes += event_size;
223        events.push(authenticated_event);
224    }
225
226    let mut response = ListAuthenticatedEventsResponse::default();
227    response.events = events;
228    response.highest_indexed_checkpoint = Some(highest_indexed);
229    response.next_page_token = next_page_token.map(|t| t.into());
230    Ok(response)
231}