sui_rpc_api/grpc/alpha/
list_authenticated_events.rs1#![allow(clippy::field_reassign_with_default)]
5
6use crate::RpcError;
7use crate::RpcService;
8use crate::grpc::alpha::event_service_proto::{
9 AuthenticatedEvent, ListAuthenticatedEventsRequest, ListAuthenticatedEventsResponse,
10};
11use bytes::Bytes;
12use prost::Message;
13use std::str::FromStr;
14use sui_rpc::proto::sui::rpc::v2::{Bcs, Event};
15use sui_types::base_types::SuiAddress;
16
17const MAX_PAGE_SIZE: u32 = 1000;
18const DEFAULT_PAGE_SIZE: u32 = 1000;
19const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; #[derive(serde::Serialize, serde::Deserialize)]
22struct PageToken {
23 stream_id: SuiAddress,
24 next_checkpoint: u64,
25 next_transaction_idx: u32,
26 next_event_idx: u32,
27}
28
29fn to_grpc_event(ev: &sui_types::event::Event) -> Event {
30 let mut bcs = Bcs::default();
31 bcs.value = Some(ev.contents.clone().into());
32
33 let mut event = Event::default();
34 event.package_id = Some(ev.package_id.to_canonical_string(true));
35 event.module = Some(ev.transaction_module.to_string());
36 event.sender = Some(ev.sender.to_string());
37 event.event_type = Some(ev.type_.to_canonical_string(true));
38 event.contents = Some(bcs);
39 event
40}
41
42fn to_authenticated_event(
43 stream_id: &str,
44 cp: u64,
45 transaction_idx: u32,
46 idx: u32,
47 ev: &sui_types::event::Event,
48) -> AuthenticatedEvent {
49 let mut authenticated_event = AuthenticatedEvent::default();
50 authenticated_event.checkpoint = Some(cp);
51 authenticated_event.transaction_idx = Some(transaction_idx);
52 authenticated_event.event_idx = Some(idx);
53 authenticated_event.event = Some(to_grpc_event(ev));
54 authenticated_event.stream_id = Some(stream_id.to_string());
55 authenticated_event
56}
57
58fn decode_page_token(page_token: &[u8]) -> Result<PageToken, RpcError> {
59 bcs::from_bytes(page_token).map_err(|_| {
60 RpcError::new(
61 tonic::Code::InvalidArgument,
62 "invalid page_token".to_string(),
63 )
64 })
65}
66
67fn encode_page_token(page_token: PageToken) -> Bytes {
68 bcs::to_bytes(&page_token).unwrap().into()
69}
70
71#[tracing::instrument(skip(service))]
72pub fn list_authenticated_events(
73 service: &RpcService,
74 request: ListAuthenticatedEventsRequest,
75) -> Result<ListAuthenticatedEventsResponse, RpcError> {
76 if !service.config.authenticated_events_indexing() {
77 return Err(RpcError::new(
78 tonic::Code::Unimplemented,
79 "Authenticated events indexing is disabled".to_string(),
80 ));
81 }
82 let stream_id = request.stream_id.ok_or_else(|| {
83 RpcError::new(
84 tonic::Code::InvalidArgument,
85 "missing stream_id".to_string(),
86 )
87 })?;
88
89 if stream_id.trim().is_empty() {
90 return Err(RpcError::new(
91 tonic::Code::InvalidArgument,
92 "stream_id cannot be empty".to_string(),
93 ));
94 }
95
96 let stream_addr = SuiAddress::from_str(&stream_id).map_err(|e| {
97 RpcError::new(
98 tonic::Code::InvalidArgument,
99 format!("invalid stream_id: {e}"),
100 )
101 })?;
102
103 let page_size = request
104 .page_size
105 .map(|s| s.clamp(1, MAX_PAGE_SIZE))
106 .unwrap_or(DEFAULT_PAGE_SIZE);
107
108 let page_token = request
109 .page_token
110 .as_ref()
111 .map(|token| decode_page_token(token))
112 .transpose()?;
113
114 if let Some(token) = &page_token
115 && token.stream_id != stream_addr
116 {
117 return Err(RpcError::new(
118 tonic::Code::InvalidArgument,
119 "page_token stream_id mismatch".to_string(),
120 ));
121 }
122
123 let start = request.start_checkpoint.unwrap_or(0);
124
125 let reader = service.reader.inner();
126 let indexes = reader.indexes().ok_or_else(RpcError::not_found)?;
127
128 let highest_indexed = indexes
129 .get_highest_indexed_checkpoint_seq_number()
130 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?
131 .unwrap_or(0);
132
133 let lowest_available = reader
134 .get_lowest_available_checkpoint_objects()
135 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
136
137 if start < lowest_available {
138 return Err(RpcError::new(
139 tonic::Code::InvalidArgument,
140 format!(
141 "Requested start checkpoint {} has been pruned. Lowest available checkpoint is {}",
142 start, lowest_available
143 ),
144 ));
145 }
146
147 if start > highest_indexed {
148 let mut response = ListAuthenticatedEventsResponse::default();
149 response.events = vec![];
150 response.highest_indexed_checkpoint = Some(highest_indexed);
151 response.next_page_token = None;
152 return Ok(response);
153 }
154
155 let (actual_start, start_transaction_idx, start_event_idx) = if let Some(token) = &page_token {
156 (
157 token.next_checkpoint,
158 Some(token.next_transaction_idx),
159 Some(token.next_event_idx),
160 )
161 } else {
162 (start, None, None)
163 };
164
165 let iter = indexes
166 .authenticated_event_iter(
167 stream_addr,
168 actual_start,
169 start_transaction_idx,
170 start_event_idx,
171 highest_indexed,
172 page_size + 1,
173 )
174 .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
175
176 let mut events = Vec::new();
177 let mut size_bytes = 0;
178 let mut next_page_token = None;
179
180 for (i, event_result) in iter.enumerate() {
181 let (cp, transaction_idx, event_idx, ev) =
182 event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?;
183
184 if i >= page_size as usize {
185 next_page_token = Some(encode_page_token(PageToken {
186 stream_id: stream_addr,
187 next_checkpoint: cp,
188 next_transaction_idx: transaction_idx,
189 next_event_idx: event_idx,
190 }));
191 break;
192 }
193
194 let authenticated_event =
195 to_authenticated_event(&stream_id, cp, transaction_idx, event_idx, &ev);
196 let event_size = authenticated_event.encoded_len();
197
198 if i > 0 && size_bytes + event_size > MAX_PAGE_SIZE_BYTES {
199 next_page_token = Some(encode_page_token(PageToken {
200 stream_id: stream_addr,
201 next_checkpoint: cp,
202 next_transaction_idx: transaction_idx,
203 next_event_idx: event_idx,
204 }));
205 break;
206 }
207
208 size_bytes += event_size;
209 events.push(authenticated_event);
210 }
211
212 let mut response = ListAuthenticatedEventsResponse::default();
213 response.events = events;
214 response.highest_indexed_checkpoint = Some(highest_indexed);
215 response.next_page_token = next_page_token.map(|t| t.into());
216 Ok(response)
217}