sui_rpc/light_client/events/state.rs
1//! Pure verification helpers for the streaming client.
2//!
3//! Factored out of the spawned task so they can be unit-tested without
4//! standing up an RPC harness. The streaming task in `client` calls
5//! [`buffer_response_batch`] for each incoming page of events and
6//! [`fold_and_reconcile`] each reconciliation tick — passing in the
7//! settlement boundaries fetched from `ListTransactions(affected_object =
8//! event_stream_head)` so events are folded into the local MMR in the
9//! same per-settlement partitions the framework used on chain.
10
11use std::collections::VecDeque;
12
13use sui_sdk_types::Object;
14use sui_sdk_types::framework::EventBatch;
15use sui_sdk_types::framework::EventCommitment;
16use sui_sdk_types::framework::EventStreamHead;
17use sui_sdk_types::framework::apply_stream_updates;
18
19use super::envelope::AuthenticatedEvent;
20use crate::light_client::error::LightClientError;
21use crate::light_client::error::MmrMismatch;
22
/// In-memory state the streaming task carries across iterations of the
/// list-events / reconcile loop.
///
/// Events are buffered (not folded) as `ListEvents` pages arrive: the
/// framework can run multiple `settle_events` transactions per
/// checkpoint (one per consensus commit per stream), and folding events
/// into the wrong settlement bucket produces an MMR shape that will
/// never match the on-chain head. The buffer is flushed at
/// reconciliation time, once `ListTransactions(affected_object =
/// event_stream_head)` has surfaced the per-settlement boundaries.
///
/// The derived `Default` yields an empty buffer, a default head, and
/// both checkpoint markers at zero.
#[derive(Debug, Default)]
pub(super) struct StreamState {
    /// Events received from `ListEvents` but not yet folded into
    /// [`Self::local_head`]. Once a reconciliation tick succeeds, every
    /// entry with `checkpoint <= reconciled_checkpoint` is folded into
    /// the local MMR, released to the consumer, and drained from this
    /// queue.
    ///
    /// Entries whose checkpoint is at or below
    /// [`Self::confirmed_through`] are skipped by the fold (they are
    /// treated as already represented in the head) but are still
    /// drained and released together with the rest of the prefix.
    ///
    /// Items are stored in receipt order — which matches strictly
    /// ascending `(checkpoint, transaction_offset, event_index)` per the
    /// v2alpha contract.
    pub buffer: VecDeque<AuthenticatedEvent>,

    /// The MMR head we've replayed by folding the settlement-partitioned
    /// event batches. Compared for equality against the on-chain head
    /// during reconciliation, and only replaced once that comparison
    /// succeeds.
    pub local_head: EventStreamHead,

    /// The most recent checkpoint at which `local_head` was confirmed
    /// to match the on-chain head. Subsequent reconciliations only need
    /// to fold events landed strictly after this point.
    ///
    /// Seeded by [`StreamState::new`] and advanced to the reconciliation
    /// checkpoint each time `fold_and_reconcile` succeeds.
    pub confirmed_through: u64,

    /// Highest checkpoint number whose events `ListEvents` has fully
    /// emitted to us, derived from the latest `Watermark.checkpoint_hi`
    /// across both standalone watermarks and per-item watermarks.
    /// `buffer_response_batch` additionally raises it to the checkpoint
    /// of the last event in a page, so it advances even when a page
    /// carried no watermark. The value never decreases.
    ///
    /// Reconciliation uses this as the upper bound when fetching
    /// settlements: settlements at checkpoints we haven't fully scanned
    /// for events would otherwise produce an empty fold (with the head
    /// stamping a different `num_events`) and fail reconciliation.
    pub events_scanned_through: u64,
}
66
67impl StreamState {
68 pub(super) fn new(initial_head: EventStreamHead, confirmed_through: u64) -> Self {
69 Self {
70 buffer: VecDeque::new(),
71 local_head: initial_head,
72 confirmed_through,
73 events_scanned_through: confirmed_through,
74 }
75 }
76}
77
78/// Append `events` to `state.buffer` and advance
79/// `state.events_scanned_through` to `watermark_hi.max(events.last())` if
80/// either is greater than the current value.
81///
82/// `events` must be in strictly ascending
83/// `(checkpoint, transaction_offset, event_index)` order with respect to
84/// each other and to whatever's already at the back of `state.buffer` —
85/// the v2alpha contract guarantees this for `ListEvents` responses, and
86/// the fold-time partitioner relies on it. Empty input is a no-op.
87///
88/// `watermark_hi` is the most recent `Watermark.checkpoint_hi` observed
89/// during this page, including the watermark embedded on the last item.
90/// Pass `None` when the page produced neither a standalone watermark nor
91/// an item-embedded one (e.g., an immediate `End` frame at genesis).
92pub(super) fn buffer_response_batch(
93 state: &mut StreamState,
94 events: Vec<AuthenticatedEvent>,
95 watermark_hi: Option<u64>,
96) {
97 let last_event_cp = events.last().map(|e| e.checkpoint);
98 for event in events {
99 state.buffer.push_back(event);
100 }
101
102 if let Some(hi) = watermark_hi {
103 state.events_scanned_through = state.events_scanned_through.max(hi);
104 }
105 if let Some(cp) = last_event_cp {
106 // Defensive: even if no watermark frame surfaced for this page,
107 // every emitted event implies the scan reached its checkpoint.
108 state.events_scanned_through = state.events_scanned_through.max(cp);
109 }
110}
111
112/// Partition the events at the front of `state.buffer` by settlement,
113/// fold each partition into `state.local_head`, and reconcile the
114/// updated head against `chain_head` at the last settlement's
115/// checkpoint.
116///
117/// `settlements` is the ascending sequence of `(checkpoint,
118/// transaction_offset)` pairs from `ListTransactions(affected_object =
119/// event_stream_head)`, covering `(state.confirmed_through,
120/// reconcile_checkpoint]`. `reconcile_checkpoint` is the checkpoint at
121/// which `chain_head` was authenticated — by construction the
122/// checkpoint of the last entry in `settlements`.
123///
124/// Each event in the partitioned prefix is assigned to the next
125/// settlement whose `(cp, tx_offset)` lies at-or-after the event's
126/// position, and a settlement reject is a hard error (it indicates the
127/// stream's `ListEvents` and `ListTransactions` indexes have diverged).
128///
129/// On a matching reconciliation: drains the folded events from
130/// `state.buffer`, advances `state.confirmed_through` to
131/// `reconcile_checkpoint`, and returns the released events in receipt
132/// order. On any mismatch: returns the error without mutating the
133/// observable parts of `state` — the caller should abort the stream.
134pub(super) fn fold_and_reconcile(
135 state: &mut StreamState,
136 settlements: &[(u64, u64)],
137 chain_head: EventStreamHead,
138 reconcile_checkpoint: u64,
139) -> Result<Vec<AuthenticatedEvent>, LightClientError> {
140 // Settlements must be non-empty and the last one's checkpoint must
141 // equal the reconciliation point — that's the load-bearing invariant
142 // the caller is contracted to uphold.
143 let last_settlement =
144 settlements
145 .last()
146 .copied()
147 .ok_or(LightClientError::UnexpectedObjectShape {
148 reason: "fold_and_reconcile called with no settlements; \
149 reconciliation needs at least one settlement to anchor the chain head",
150 })?;
151 if last_settlement.0 != reconcile_checkpoint {
152 return Err(LightClientError::UnexpectedObjectShape {
153 reason: "settlement boundaries disagree with reconciliation checkpoint",
154 });
155 }
156
157 // Count the buffered events whose checkpoint lies at or below the
158 // reconciliation point — these are the ones we'll partition and
159 // fold. Anything past `reconcile_checkpoint` stays buffered for the
160 // next round.
161 let fold_count = state
162 .buffer
163 .iter()
164 .take_while(|e| e.checkpoint <= reconcile_checkpoint)
165 .count();
166
167 let batches = bucket_events_by_settlement(
168 state.buffer.iter().take(fold_count),
169 settlements,
170 state.confirmed_through,
171 )?;
172
173 let new_head =
174 apply_stream_updates(state.local_head.clone(), &batches).map_err(LightClientError::from)?;
175
176 if new_head != chain_head {
177 return Err(LightClientError::MmrMismatch(Box::new(MmrMismatch {
178 checkpoint: reconcile_checkpoint,
179 expected: chain_head,
180 actual: new_head,
181 })));
182 }
183
184 // Reconciliation succeeded: commit the fold and drain the released
185 // events.
186 state.local_head = new_head;
187 state.confirmed_through = reconcile_checkpoint;
188 let released: Vec<AuthenticatedEvent> = state.buffer.drain(..fold_count).collect();
189 Ok(released)
190}
191
192/// Bucket `events` into per-settlement [`EventBatch`]es using the
193/// ascending `settlements` boundary list. Returns an empty `Vec` only if
194/// `events` is empty after the `floor_checkpoint` filter — every kept
195/// event must map to a settlement at its own checkpoint, or the function
196/// errors.
197///
198/// Mirrors the upstream e2e helper that drives the on-chain
199/// reconciliation: each event maps to the first settlement with `(cp ==
200/// event.cp, tx_offset >= event.tx_offset)`, and events sharing a
201/// settlement key form a single batch.
202fn bucket_events_by_settlement<'a, I>(
203 events: I,
204 settlements: &[(u64, u64)],
205 floor_checkpoint: u64,
206) -> Result<Vec<EventBatch>, LightClientError>
207where
208 I: IntoIterator<Item = &'a AuthenticatedEvent>,
209{
210 let mut batches: Vec<EventBatch> = Vec::new();
211 let mut current_key: Option<(u64, u64)> = None;
212 let mut settlement_idx: usize = 0;
213
214 for event in events {
215 // Skip events that fall at-or-below the floor — those have
216 // already been folded into `state.local_head` by an earlier
217 // reconciliation (or are part of the initial-state head).
218 if event.checkpoint <= floor_checkpoint {
219 continue;
220 }
221
222 // Advance to the first settlement at-or-past the event's
223 // ledger position.
224 while settlement_idx < settlements.len()
225 && (settlements[settlement_idx].0 < event.checkpoint
226 || (settlements[settlement_idx].0 == event.checkpoint
227 && settlements[settlement_idx].1 < event.transaction_offset))
228 {
229 settlement_idx += 1;
230 }
231
232 if settlement_idx >= settlements.len() || settlements[settlement_idx].0 != event.checkpoint
233 {
234 // The framework guarantees a `settle_events` transaction
235 // for every checkpoint that emitted events on this stream;
236 // a missing settlement means the server's event index and
237 // transaction index have diverged.
238 return Err(LightClientError::UnexpectedObjectShape {
239 reason: "no settlement transaction covering buffered event",
240 });
241 }
242
243 let settlement_key = settlements[settlement_idx];
244 let commitment = EventCommitment {
245 checkpoint_seq: event.checkpoint,
246 transaction_idx: event.transaction_offset,
247 event_idx: event.event_index as u64,
248 digest: event.event.digest(),
249 };
250
251 match current_key {
252 Some(key) if key == settlement_key => {
253 batches
254 .last_mut()
255 .expect("current_key set ⇒ at least one batch exists")
256 .commitments
257 .push(commitment);
258 }
259 _ => {
260 batches.push(EventBatch {
261 checkpoint_seq: event.checkpoint,
262 commitments: vec![commitment],
263 });
264 current_key = Some(settlement_key);
265 }
266 }
267 }
268
269 Ok(batches)
270}
271
272/// Extract the BCS-encoded [`EventStreamHead`] out of an on-chain
273/// `Object` returned by the OCS inclusion proof flow.
274///
275/// The framework stores each stream head as the value of a dynamic
276/// field on the accumulator root object. The Move-side type is
277/// `sui::dynamic_field::Field<sui::accumulator::Key<EventStreamHead>,
278/// EventStreamHead>`, which BCS-serializes as:
279///
280/// ```text
281/// field = uid key value
282///
283/// uid = address ; 32 bytes (Field.id)
284/// key = address ; 32 bytes (AccumulatorKey { owner: address })
285/// value = bytes-of-EventStreamHead
286/// ```
287///
288/// Both the UID and the single-field-struct `AccumulatorKey` are
289/// fixed 32-byte addresses, so the head's BCS bytes start at offset
290/// 64. We BCS-decode just the tail rather than parsing through the
291/// dynamic-field wrapper to keep this independent of any generic
292/// `Field<K, V>` representation in the SDK.
293pub(super) fn extract_event_stream_head(
294 object: &Object,
295) -> Result<EventStreamHead, LightClientError> {
296 const HEAD_OFFSET: usize = 64;
297
298 let move_struct = object
299 .as_struct()
300 .ok_or(LightClientError::UnexpectedObjectShape {
301 reason: "expected a Move struct (dynamic field), got a package",
302 })?;
303 let contents = move_struct.contents();
304 let head_bytes =
305 contents
306 .get(HEAD_OFFSET..)
307 .ok_or(LightClientError::UnexpectedObjectShape {
308 reason: "dynamic-field contents too short for EventStreamHead value",
309 })?;
310 bcs::from_bytes(head_bytes).map_err(LightClientError::from)
311}
312
#[cfg(test)]
mod tests {
    use super::*;
    use sui_sdk_types::Address;
    use sui_sdk_types::Digest;
    use sui_sdk_types::Event;
    use sui_sdk_types::Identifier;
    use sui_sdk_types::MovePackage;
    use sui_sdk_types::MoveStruct;
    use sui_sdk_types::ObjectData;
    use sui_sdk_types::Owner;
    use sui_sdk_types::StructTag;
    use sui_sdk_types::U256;
    use sui_sdk_types::Version;
    use sui_sdk_types::framework::derive_event_stream_head_object_id;

    /// Deterministic event fixture whose payload encodes its own ledger
    /// position, so events at different positions have distinct
    /// contents.
    fn sample_event(checkpoint: u64, tx_idx: u64, event_idx: u32) -> AuthenticatedEvent {
        let type_ = StructTag::new(
            Address::TWO,
            Identifier::from_static("m"),
            Identifier::from_static("E"),
            vec![],
        );
        let event = Event {
            package_id: Address::TWO,
            module: Identifier::from_static("m"),
            sender: Address::TWO,
            type_,
            contents: vec![checkpoint as u8, tx_idx as u8, event_idx as u8],
        };
        AuthenticatedEvent {
            checkpoint,
            transaction_offset: tx_idx,
            event_index: event_idx,
            transaction_digest: Digest::new([0xaa; 32]),
            event,
        }
    }

    /// Stream state starting from an empty head with nothing confirmed.
    fn genesis_state() -> StreamState {
        StreamState::new(EventStreamHead::default(), 0)
    }

    /// The commitment the partitioner derives for `event`.
    fn commitment_for(event: &AuthenticatedEvent) -> EventCommitment {
        EventCommitment {
            checkpoint_seq: event.checkpoint,
            transaction_idx: event.transaction_offset,
            event_idx: event.event_index as u64,
            digest: event.event.digest(),
        }
    }

    /// Reference head: what folding `events`, bucketed against
    /// `settlements`, into an empty head produces. Stands in for the
    /// head a test cluster would hold on chain.
    fn expected_chain_head(
        events: &[AuthenticatedEvent],
        settlements: &[(u64, u64)],
    ) -> EventStreamHead {
        let batches = bucket_events_by_settlement(events.iter(), settlements, 0).unwrap();
        apply_stream_updates(EventStreamHead::default(), &batches).unwrap()
    }

    fn u256_from_decimal(s: &str) -> U256 {
        s.parse::<U256>().expect("decimal U256 literal must parse")
    }

    /// Wrap `data` in an `Object` with a zero digest and zero rebate.
    fn object_with(data: ObjectData, owner: Owner) -> Object {
        Object::new(data, owner, Digest::new([0; 32]), 0)
    }

    /// Assert that `err` is the `UnexpectedObjectShape` variant.
    fn assert_unexpected_shape(err: LightClientError) {
        assert!(matches!(
            err,
            LightClientError::UnexpectedObjectShape { .. }
        ));
    }

    /// Receiving events only buffers them; the local head must stay put
    /// until `fold_and_reconcile` runs.
    #[test]
    fn buffer_response_batch_defers_folding() {
        let mut state = genesis_state();
        let page = vec![sample_event(7, 0, 0), sample_event(7, 0, 1)];

        buffer_response_batch(&mut state, page, Some(7));

        assert_eq!(state.local_head, EventStreamHead::default());
        assert_eq!(state.buffer.len(), 2);
        assert_eq!(state.events_scanned_through, 7);
    }

    /// Without any watermark, the last event's checkpoint still lifts
    /// `events_scanned_through`, so those checkpoints count as scanned
    /// when settlements are fetched.
    #[test]
    fn buffer_response_batch_advances_scan_floor_from_events() {
        let mut state = genesis_state();

        buffer_response_batch(&mut state, vec![sample_event(11, 0, 0)], None);

        assert_eq!(state.events_scanned_through, 11);
    }

    /// `events_scanned_through` is monotonic: a lower watermark (for
    /// example a retry re-emitting earlier items) leaves it unchanged.
    #[test]
    fn buffer_response_batch_does_not_regress_scan_floor() {
        let mut state = genesis_state();
        state.events_scanned_through = 20;

        buffer_response_batch(&mut state, vec![sample_event(5, 0, 0)], Some(5));

        assert_eq!(state.events_scanned_through, 20);
    }

    /// One settlement per checkpoint: every event in the checkpoint
    /// folds as a single batch and the chain head matches.
    #[test]
    fn fold_and_reconcile_single_settlement_per_checkpoint() {
        let settlements = vec![(7u64, 2u64)];
        let events = vec![
            sample_event(7, 0, 0),
            sample_event(7, 0, 1),
            sample_event(7, 1, 0),
        ];
        let chain_head = expected_chain_head(&events, &settlements);

        let mut state = genesis_state();
        buffer_response_batch(&mut state, events.clone(), Some(7));

        let released = fold_and_reconcile(&mut state, &settlements, chain_head.clone(), 7).unwrap();

        assert_eq!(released, events);
        assert_eq!(state.local_head, chain_head);
        assert_eq!(state.confirmed_through, 7);
        assert!(state.buffer.is_empty());
    }

    /// Several settlements inside one checkpoint. The first two events
    /// settle at transaction 2 and the third at transaction 4, so they
    /// must fold as two separate batches for the head to match.
    #[test]
    fn fold_and_reconcile_multiple_settlements_per_checkpoint() {
        let settlements = vec![(10u64, 2u64), (10u64, 4u64)];
        let events = vec![
            sample_event(10, 0, 0),
            sample_event(10, 1, 0),
            sample_event(10, 3, 0),
        ];
        let chain_head = expected_chain_head(&events, &settlements);

        let mut state = genesis_state();
        buffer_response_batch(&mut state, events.clone(), Some(10));

        // Regression guard: folding everything as one batch must give a
        // different head than the per-settlement fold.
        let lumped = EventBatch {
            checkpoint_seq: 10,
            commitments: events.iter().map(commitment_for).collect(),
        };
        let single_batch = apply_stream_updates(EventStreamHead::default(), &[lumped]).unwrap();
        assert_ne!(single_batch, chain_head);

        let released =
            fold_and_reconcile(&mut state, &settlements, chain_head.clone(), 10).unwrap();

        assert_eq!(released, events);
        assert_eq!(state.local_head, chain_head);
        assert_eq!(state.confirmed_through, 10);
    }

    /// Only the buffer prefix at-or-below the reconciliation point is
    /// consumed; later events remain buffered for the next round.
    #[test]
    fn fold_and_reconcile_keeps_unfolded_tail_buffered() {
        // Only the cp=5 settlement is in range; cp=9 has not been
        // queried yet.
        let settlements = vec![(5u64, 0u64)];
        let events = vec![
            sample_event(5, 0, 0),
            sample_event(5, 0, 1),
            sample_event(9, 0, 0), // past the reconciliation point
        ];

        // Head after folding just the cp=5 events, i.e. the head a
        // proof at checkpoint 5 would attest to.
        let cp5_events = events[..2].to_vec();
        let chain_head_at_5 = expected_chain_head(&cp5_events, &settlements);

        let mut state = genesis_state();
        buffer_response_batch(&mut state, events.clone(), Some(9));

        let released =
            fold_and_reconcile(&mut state, &settlements, chain_head_at_5.clone(), 5).unwrap();

        assert_eq!(released, cp5_events);
        assert_eq!(state.local_head, chain_head_at_5);
        assert_eq!(state.confirmed_through, 5);
        assert_eq!(state.buffer.len(), 1);
        assert_eq!(state.buffer.front().unwrap().checkpoint, 9);
    }

    /// A chain head that differs from the locally folded one yields
    /// `MmrMismatch` and leaves the head, buffer, and confirmation
    /// point as they were.
    #[test]
    fn fold_and_reconcile_rejects_divergence_without_mutating_observable_state() {
        let settlements = vec![(3u64, 0u64)];
        let bogus = EventStreamHead {
            mmr: vec![u256_from_decimal("999")],
            checkpoint_seq: 3,
            num_events: 1,
        };

        let mut state = genesis_state();
        buffer_response_batch(&mut state, vec![sample_event(3, 0, 0)], Some(3));
        let head_before = state.local_head.clone();
        let buffered_before = state.buffer.len();
        let confirmed_before = state.confirmed_through;

        let err = fold_and_reconcile(&mut state, &settlements, bogus, 3).unwrap_err();
        assert!(matches!(err, LightClientError::MmrMismatch(_)));

        assert_eq!(state.local_head, head_before);
        assert_eq!(state.buffer.len(), buffered_before);
        assert_eq!(state.confirmed_through, confirmed_before);
    }

    /// An empty settlement list is rejected outright: the caller may
    /// only reconcile once `ListTransactions` has surfaced a settlement
    /// in the scan range.
    #[test]
    fn fold_and_reconcile_rejects_empty_settlements() {
        let mut state = genesis_state();

        let err = fold_and_reconcile(&mut state, &[], EventStreamHead::default(), 0).unwrap_err();

        assert_unexpected_shape(err);
    }

    /// The reconciliation checkpoint must equal the last settlement's
    /// checkpoint; otherwise the local fold would end at a different
    /// head than the one the proof attests to.
    #[test]
    fn fold_and_reconcile_rejects_misaligned_reconcile_checkpoint() {
        let settlements = vec![(5u64, 0u64)];
        let mut state = genesis_state();

        let err = fold_and_reconcile(&mut state, &settlements, EventStreamHead::default(), 7)
            .unwrap_err();

        assert_unexpected_shape(err);
    }

    /// A buffered event with no settlement in its checkpoint signals a
    /// server-side index inconsistency and must surface as
    /// `UnexpectedObjectShape` instead of being folded into the wrong
    /// bucket.
    #[test]
    fn fold_and_reconcile_rejects_event_without_matching_settlement() {
        // Event at cp=5, but the only settlement is at cp=7.
        let settlements = vec![(7u64, 0u64)];
        let mut state = genesis_state();
        buffer_response_batch(&mut state, vec![sample_event(5, 0, 0)], Some(7));

        let err = fold_and_reconcile(&mut state, &settlements, EventStreamHead::default(), 7)
            .unwrap_err();

        assert_unexpected_shape(err);
    }

    /// `extract_event_stream_head` skips the 64-byte prefix (dynamic
    /// field UID plus `AccumulatorKey`) and BCS-decodes the remainder.
    /// The contents here are two 32-byte addresses followed by the BCS
    /// bytes of a known head, which must be recovered exactly.
    #[test]
    fn extract_event_stream_head_skips_uid_and_key_prefix() {
        let head = EventStreamHead {
            mmr: vec![U256::ZERO, U256::ONE],
            checkpoint_seq: 17,
            num_events: 5,
        };
        let stream_id = Address::TWO;

        // Layout: UID (derived object id), AccumulatorKey (stream id),
        // then the serialized head.
        let uid = derive_event_stream_head_object_id(stream_id);
        let mut contents = Vec::new();
        contents.extend_from_slice(uid.as_bytes());
        contents.extend_from_slice(stream_id.as_bytes());
        contents.extend(bcs::to_bytes(&head).unwrap());

        let field_type = StructTag::new(
            Address::TWO,
            Identifier::from_static("dynamic_field"),
            Identifier::from_static("Field"),
            vec![],
        );
        let move_struct = MoveStruct::new(field_type, true, Version::from(1u64), contents)
            .expect("contents are at least 32 bytes");
        let object = object_with(
            ObjectData::Struct(move_struct),
            Owner::Address(Address::ZERO),
        );

        let recovered = extract_event_stream_head(&object).unwrap();
        assert_eq!(recovered, head);
    }

    /// A package object (not a Move struct) produces a clean
    /// `UnexpectedObjectShape` error instead of a panic.
    #[test]
    fn extract_event_stream_head_rejects_non_struct() {
        let pkg = MovePackage {
            id: Address::TWO,
            version: Version::from(1u64),
            modules: Default::default(),
            type_origin_table: Vec::new(),
            linkage_table: Default::default(),
        };
        let object = object_with(ObjectData::Package(pkg), Owner::Immutable);

        let err = extract_event_stream_head(&object).unwrap_err();

        assert_unexpected_shape(err);
    }
}