sui_rpc_api/grpc/alpha/
list_authenticated_events.rs1#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9 AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
15use sui_types::base_types::SuiAddress;
16
17const MAX_PAGE_SIZE: u32 = 1000;
18const DEFAULT_PAGE_SIZE: u32 = 1000;
19const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; #[derive(serde::Serialize, serde::Deserialize)]
22struct PageToken {
23 stream_id: SuiAddress,
24 start_checkpoint: u64,
25 last_event_checkpoint: u64,
26 last_event_transaction_idx: u32,
27 last_event_index: u32,
28}
29
30fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
31 let mut bcs = Bcs::default();
32 bcs.value = Some(ev.contents.clone().into());
33
34 let mut event = Event::default();
35 event.package_id = Some(ev.package_id.to_canonical_string(true));
36 event.module = Some(ev.transaction_module.to_string());
37 event.sender = Some(ev.sender.to_string());
38 event.event_type = Some(ev.type_.to_canonical_string(true));
39 event.contents = Some(bcs);
40 event
41}
42
43fn to_authenticated_event(
44 stream_id: &str,
45 cp: u64,
46 transaction_idx: u32,
47 idx: u32,
48 ev: &sui_types::event::Event,
49) -> AuthenticatedEvent {
50 let mut authenticated_event = AuthenticatedEvent::default();
51 authenticated_event.checkpoint = Some(cp);
52 authenticated_event.transaction_idx = Some(transaction_idx);
53 authenticated_event.event_idx = Some(idx);
54 authenticated_event.event = Some(to_grpc_event(ev));
55 authenticated_event.stream_id = Some(stream_id.to_string());
56 authenticated_event
57}
58
59fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
60 bcs::from_bytes(page_token).map_err(|_| {
61 RpcError::new(
62 tonic::Code::InvalidArgument,
63 "invalid page_token".to_string(),
64 )
65 })
66}
67
68fn encode_page_token(page_token: PageToken) -> Bytes {
69 bcs::to_bytes(&page_token).unwrap().into()
70}
71
72#[tracing::instrument(skip(service))]
73pub fn list_authenticated_events(
74 service: &RpcService,
75 request: ListAuthenticatedEventsRequest,
76) -> Result<ListAuthenticatedEventsResponse, RpcError> {
77 if !service.config.authenticated_events_indexing() {
78 return Err(RpcError::new(
79 tonic::Code::Unimplemented,
80 "Authenticated events indexing is disabled".to_string(),
81 ));
82 }
83 let stream_id = request.stream_id.ok_or_else(|| {
84 RpcError::new(
85 tonic::Code::InvalidArgument,
86 "missing stream_id".to_string(),
87 )
88 })?;
89
90 if stream_id.trim().is_empty() {
91 return Err(RpcError::new(
92 tonic::Code::InvalidArgument,
93 "stream_id cannot be empty".to_string(),
94 ));
95 }
96
97 let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
98 RpcError::new(
99 tonic::Code::InvalidArgument,
100 format!("invalid stream_id: {e}"),
101 )
102 })?;
103
104 let page_size = request
105 .page_size
106 .map(|s| s.clamp(1, MAX_PAGE_SIZE))
107 .unwrap_or(DEFAULT_PAGE_SIZE);
108
109 let page_token = request
110 .page_token
111 .as_ref()
112 .map(|token| decode_page_token(token))
113 .transpose()?;
114
115 if let Some(token) = &page_token
116 && token.stream_id != stream_addr
117 {
118 return Err(RpcError::new(
119 tonic::Code::InvalidArgument,
120 "page_token stream_id mismatch".to_string(),
121 ));
122 }
123
124 let start = request.start_checkpoint.unwrap_or(0);
125
126 let reader = service.reader.inner();
127 let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
128
129 let highest_indexed = indexes
130 .get_highest_indexed_checkpoint_seq_number()
131 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
132 .unwrap_or(0);
133
134 let lowest_available = reader
135 .get_lowest_available_checkpoint_objects()
136 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
137
138 if start < lowest_available {
139 return Err(RpcError::new(
140 tonic::Code::InvalidArgument,
141 format!(
142 "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
143 start, lowest_available
144 ),
145 ));
146 }
147
148 if start > highest_indexed {
149 let mut response = ListAuthenticatedEventsResponse::default();
150 response.events = vec![];
151 response.highest_indexed_checkpoint = Some(highest_indexed);
152 response.next_page_token = None;
153 return Ok(response);
154 }
155
156 let start_transaction_idx = page_token.as_ref().map(|t| t.last_event_transaction_idx);
157 let start_event_idx = page_token.as_ref().map(|t| t.last_event_index);
158
159 let iter = indexes
160 .authenticated_event_iter(
161 stream_addr,
162 start,
163 start_transaction_idx,
164 start_event_idx,
165 highest_indexed,
166 page_size,
167 )
168 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
169
170 let mut events = Vec::new();
171 let mut size_bytes = 0;
172 let mut events_processed: u32 = 0;
173 let mut last_event_info: Option<(u64, u32, u32)> = None;
174
175 for event_result in iter {
176 let (cp, transaction_idx, event_idx, ev) =
177 event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
178
179 if size_bytes >= MAX_PAGE_SIZE_BYTES {
181 break;
182 }
183
184 let authenticated_event =
185 to_authenticated_event(&stream_id, cp, transaction_idx, event_idx, &ev);
186 size_bytes += authenticated_event.encoded_len();
187 events.push(authenticated_event);
188 last_event_info = Some((cp, transaction_idx, event_idx));
189 events_processed += 1;
190 }
191
192 let next_page_token = if events_processed == page_size {
193 last_event_info.map(|(last_cp, last_tx_idx, last_ev_idx)| {
194 encode_page_token(PageToken {
195 stream_id: stream_addr,
196 start_checkpoint: start,
197 last_event_checkpoint: last_cp,
198 last_event_transaction_idx: last_tx_idx,
199 last_event_index: last_ev_idx,
200 })
201 })
202 } else {
203 None
204 };
205
206 let mut response = ListAuthenticatedEventsResponse::default();
207 response.events = events;
208 response.highest_indexed_checkpoint = Some(highest_indexed);
209 response.next_page_token = next_page_token.map(|token| token.to_vec());
210 Ok(response)
211}