Skip to main content

sui_rpc/light_client/events/
state.rs

1//! Pure verification helpers for the streaming client.
2//!
3//! Factored out of the spawned task so they can be unit-tested without
4//! standing up an RPC harness. The streaming task in `client` calls
5//! [`buffer_response_batch`] for each incoming page of events and
6//! [`fold_and_reconcile`] each reconciliation tick — passing in the
7//! settlement boundaries fetched from `ListTransactions(affected_object =
8//! event_stream_head)` so events are folded into the local MMR in the
9//! same per-settlement partitions the framework used on chain.
10
11use std::collections::VecDeque;
12
13use sui_sdk_types::Object;
14use sui_sdk_types::framework::EventBatch;
15use sui_sdk_types::framework::EventCommitment;
16use sui_sdk_types::framework::EventStreamHead;
17use sui_sdk_types::framework::apply_stream_updates;
18
19use super::envelope::AuthenticatedEvent;
20use crate::light_client::error::LightClientError;
21use crate::light_client::error::MmrMismatch;
22
23/// In-memory state the streaming task carries across iterations of the
24/// list-events / reconcile loop.
25///
26/// Events are buffered (not folded) as `ListEvents` pages arrive: the
27/// framework can run multiple `settle_events` transactions per
28/// checkpoint (one per consensus commit per stream), and folding events
29/// into the wrong settlement bucket produces an MMR shape that will
30/// never match the on-chain head. The buffer is flushed at
31/// reconciliation time, once `ListTransactions(affected_object =
32/// event_stream_head)` has surfaced the per-settlement boundaries.
33#[derive(Debug, Default)]
34pub(super) struct StreamState {
35    /// Events received from `ListEvents` but not yet folded into
36    /// [`Self::local_head`]. Once a reconciliation tick succeeds, every
37    /// entry with `checkpoint <= reconciled_checkpoint` is folded into
38    /// the local MMR, released to the consumer, and drained from this
39    /// queue.
40    ///
41    /// Items are stored in receipt order — which matches strictly
42    /// ascending `(checkpoint, transaction_offset, event_index)` per the
43    /// v2alpha contract.
44    pub buffer: VecDeque<AuthenticatedEvent>,
45
46    /// The MMR head we've replayed by folding the settlement-partitioned
47    /// event batches. Compared byte-for-byte against the on-chain head
48    /// during reconciliation.
49    pub local_head: EventStreamHead,
50
51    /// The most recent checkpoint at which `local_head` was confirmed
52    /// to match the on-chain head. Subsequent reconciliations only need
53    /// to fold events landed strictly after this point.
54    pub confirmed_through: u64,
55
56    /// Highest checkpoint number whose events `ListEvents` has fully
57    /// emitted to us, derived from the latest `Watermark.checkpoint_hi`
58    /// across both standalone watermarks and per-item watermarks.
59    ///
60    /// Reconciliation uses this as the upper bound when fetching
61    /// settlements: settlements at checkpoints we haven't fully scanned
62    /// for events would otherwise produce an empty fold (with the head
63    /// stamping a different `num_events`) and fail reconciliation.
64    pub events_scanned_through: u64,
65}
66
67impl StreamState {
68    pub(super) fn new(initial_head: EventStreamHead, confirmed_through: u64) -> Self {
69        Self {
70            buffer: VecDeque::new(),
71            local_head: initial_head,
72            confirmed_through,
73            events_scanned_through: confirmed_through,
74        }
75    }
76}
77
78/// Append `events` to `state.buffer` and advance
79/// `state.events_scanned_through` to `watermark_hi.max(events.last())` if
80/// either is greater than the current value.
81///
82/// `events` must be in strictly ascending
83/// `(checkpoint, transaction_offset, event_index)` order with respect to
84/// each other and to whatever's already at the back of `state.buffer` —
85/// the v2alpha contract guarantees this for `ListEvents` responses, and
86/// the fold-time partitioner relies on it. Empty input is a no-op.
87///
88/// `watermark_hi` is the most recent `Watermark.checkpoint_hi` observed
89/// during this page, including the watermark embedded on the last item.
90/// Pass `None` when the page produced neither a standalone watermark nor
91/// an item-embedded one (e.g., an immediate `End` frame at genesis).
92pub(super) fn buffer_response_batch(
93    state: &mut StreamState,
94    events: Vec<AuthenticatedEvent>,
95    watermark_hi: Option<u64>,
96) {
97    let last_event_cp = events.last().map(|e| e.checkpoint);
98    for event in events {
99        state.buffer.push_back(event);
100    }
101
102    if let Some(hi) = watermark_hi {
103        state.events_scanned_through = state.events_scanned_through.max(hi);
104    }
105    if let Some(cp) = last_event_cp {
106        // Defensive: even if no watermark frame surfaced for this page,
107        // every emitted event implies the scan reached its checkpoint.
108        state.events_scanned_through = state.events_scanned_through.max(cp);
109    }
110}
111
112/// Partition the events at the front of `state.buffer` by settlement,
113/// fold each partition into `state.local_head`, and reconcile the
114/// updated head against `chain_head` at the last settlement's
115/// checkpoint.
116///
117/// `settlements` is the ascending sequence of `(checkpoint,
118/// transaction_offset)` pairs from `ListTransactions(affected_object =
119/// event_stream_head)`, covering `(state.confirmed_through,
120/// reconcile_checkpoint]`. `reconcile_checkpoint` is the checkpoint at
121/// which `chain_head` was authenticated — by construction the
122/// checkpoint of the last entry in `settlements`.
123///
124/// Each event in the partitioned prefix is assigned to the next
125/// settlement whose `(cp, tx_offset)` lies at-or-after the event's
126/// position, and a settlement reject is a hard error (it indicates the
127/// stream's `ListEvents` and `ListTransactions` indexes have diverged).
128///
129/// On a matching reconciliation: drains the folded events from
130/// `state.buffer`, advances `state.confirmed_through` to
131/// `reconcile_checkpoint`, and returns the released events in receipt
132/// order. On any mismatch: returns the error without mutating the
133/// observable parts of `state` — the caller should abort the stream.
134pub(super) fn fold_and_reconcile(
135    state: &mut StreamState,
136    settlements: &[(u64, u64)],
137    chain_head: EventStreamHead,
138    reconcile_checkpoint: u64,
139) -> Result<Vec<AuthenticatedEvent>, LightClientError> {
140    // Settlements must be non-empty and the last one's checkpoint must
141    // equal the reconciliation point — that's the load-bearing invariant
142    // the caller is contracted to uphold.
143    let last_settlement =
144        settlements
145            .last()
146            .copied()
147            .ok_or(LightClientError::UnexpectedObjectShape {
148                reason: "fold_and_reconcile called with no settlements; \
149                     reconciliation needs at least one settlement to anchor the chain head",
150            })?;
151    if last_settlement.0 != reconcile_checkpoint {
152        return Err(LightClientError::UnexpectedObjectShape {
153            reason: "settlement boundaries disagree with reconciliation checkpoint",
154        });
155    }
156
157    // Count the buffered events whose checkpoint lies at or below the
158    // reconciliation point — these are the ones we'll partition and
159    // fold. Anything past `reconcile_checkpoint` stays buffered for the
160    // next round.
161    let fold_count = state
162        .buffer
163        .iter()
164        .take_while(|e| e.checkpoint <= reconcile_checkpoint)
165        .count();
166
167    let batches = bucket_events_by_settlement(
168        state.buffer.iter().take(fold_count),
169        settlements,
170        state.confirmed_through,
171    )?;
172
173    let new_head =
174        apply_stream_updates(state.local_head.clone(), &batches).map_err(LightClientError::from)?;
175
176    if new_head != chain_head {
177        return Err(LightClientError::MmrMismatch(Box::new(MmrMismatch {
178            checkpoint: reconcile_checkpoint,
179            expected: chain_head,
180            actual: new_head,
181        })));
182    }
183
184    // Reconciliation succeeded: commit the fold and drain the released
185    // events.
186    state.local_head = new_head;
187    state.confirmed_through = reconcile_checkpoint;
188    let released: Vec<AuthenticatedEvent> = state.buffer.drain(..fold_count).collect();
189    Ok(released)
190}
191
192/// Bucket `events` into per-settlement [`EventBatch`]es using the
193/// ascending `settlements` boundary list. Returns an empty `Vec` only if
194/// `events` is empty after the `floor_checkpoint` filter — every kept
195/// event must map to a settlement at its own checkpoint, or the function
196/// errors.
197///
198/// Mirrors the upstream e2e helper that drives the on-chain
199/// reconciliation: each event maps to the first settlement with `(cp ==
200/// event.cp, tx_offset >= event.tx_offset)`, and events sharing a
201/// settlement key form a single batch.
202fn bucket_events_by_settlement<'a, I>(
203    events: I,
204    settlements: &[(u64, u64)],
205    floor_checkpoint: u64,
206) -> Result<Vec<EventBatch>, LightClientError>
207where
208    I: IntoIterator<Item = &'a AuthenticatedEvent>,
209{
210    let mut batches: Vec<EventBatch> = Vec::new();
211    let mut current_key: Option<(u64, u64)> = None;
212    let mut settlement_idx: usize = 0;
213
214    for event in events {
215        // Skip events that fall at-or-below the floor — those have
216        // already been folded into `state.local_head` by an earlier
217        // reconciliation (or are part of the initial-state head).
218        if event.checkpoint <= floor_checkpoint {
219            continue;
220        }
221
222        // Advance to the first settlement at-or-past the event's
223        // ledger position.
224        while settlement_idx < settlements.len()
225            && (settlements[settlement_idx].0 < event.checkpoint
226                || (settlements[settlement_idx].0 == event.checkpoint
227                    && settlements[settlement_idx].1 < event.transaction_offset))
228        {
229            settlement_idx += 1;
230        }
231
232        if settlement_idx >= settlements.len() || settlements[settlement_idx].0 != event.checkpoint
233        {
234            // The framework guarantees a `settle_events` transaction
235            // for every checkpoint that emitted events on this stream;
236            // a missing settlement means the server's event index and
237            // transaction index have diverged.
238            return Err(LightClientError::UnexpectedObjectShape {
239                reason: "no settlement transaction covering buffered event",
240            });
241        }
242
243        let settlement_key = settlements[settlement_idx];
244        let commitment = EventCommitment {
245            checkpoint_seq: event.checkpoint,
246            transaction_idx: event.transaction_offset,
247            event_idx: event.event_index as u64,
248            digest: event.event.digest(),
249        };
250
251        match current_key {
252            Some(key) if key == settlement_key => {
253                batches
254                    .last_mut()
255                    .expect("current_key set ⇒ at least one batch exists")
256                    .commitments
257                    .push(commitment);
258            }
259            _ => {
260                batches.push(EventBatch {
261                    checkpoint_seq: event.checkpoint,
262                    commitments: vec![commitment],
263                });
264                current_key = Some(settlement_key);
265            }
266        }
267    }
268
269    Ok(batches)
270}
271
272/// Extract the BCS-encoded [`EventStreamHead`] out of an on-chain
273/// `Object` returned by the OCS inclusion proof flow.
274///
275/// The framework stores each stream head as the value of a dynamic
276/// field on the accumulator root object. The Move-side type is
277/// `sui::dynamic_field::Field<sui::accumulator::Key<EventStreamHead>,
278/// EventStreamHead>`, which BCS-serializes as:
279///
280/// ```text
281/// field = uid key value
282///
283/// uid   = address                 ; 32 bytes (Field.id)
284/// key   = address                 ; 32 bytes (AccumulatorKey { owner: address })
285/// value = bytes-of-EventStreamHead
286/// ```
287///
288/// Both the UID and the single-field-struct `AccumulatorKey` are
289/// fixed 32-byte addresses, so the head's BCS bytes start at offset
290/// 64. We BCS-decode just the tail rather than parsing through the
291/// dynamic-field wrapper to keep this independent of any generic
292/// `Field<K, V>` representation in the SDK.
293pub(super) fn extract_event_stream_head(
294    object: &Object,
295) -> Result<EventStreamHead, LightClientError> {
296    const HEAD_OFFSET: usize = 64;
297
298    let move_struct = object
299        .as_struct()
300        .ok_or(LightClientError::UnexpectedObjectShape {
301            reason: "expected a Move struct (dynamic field), got a package",
302        })?;
303    let contents = move_struct.contents();
304    let head_bytes =
305        contents
306            .get(HEAD_OFFSET..)
307            .ok_or(LightClientError::UnexpectedObjectShape {
308                reason: "dynamic-field contents too short for EventStreamHead value",
309            })?;
310    bcs::from_bytes(head_bytes).map_err(LightClientError::from)
311}
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316    use sui_sdk_types::Address;
317    use sui_sdk_types::Digest;
318    use sui_sdk_types::Event;
319    use sui_sdk_types::Identifier;
320    use sui_sdk_types::StructTag;
321    use sui_sdk_types::U256;
322
323    fn sample_event(checkpoint: u64, tx_idx: u64, event_idx: u32) -> AuthenticatedEvent {
324        AuthenticatedEvent {
325            checkpoint,
326            transaction_offset: tx_idx,
327            event_index: event_idx,
328            transaction_digest: Digest::new([0xaa; 32]),
329            event: Event {
330                package_id: Address::TWO,
331                module: Identifier::from_static("m"),
332                sender: Address::TWO,
333                type_: StructTag::new(
334                    Address::TWO,
335                    Identifier::from_static("m"),
336                    Identifier::from_static("E"),
337                    vec![],
338                ),
339                contents: vec![checkpoint as u8, tx_idx as u8, event_idx as u8],
340            },
341        }
342    }
343
344    /// Build the chain-head state expected after folding `events`
345    /// bucketed against `settlements`. Mirrors what the test cluster
346    /// would produce on chain so reconciliation calls can be exercised
347    /// against an authoritative reference.
348    fn expected_chain_head(
349        events: &[AuthenticatedEvent],
350        settlements: &[(u64, u64)],
351    ) -> EventStreamHead {
352        let batches = bucket_events_by_settlement(events.iter(), settlements, 0).unwrap();
353        apply_stream_updates(EventStreamHead::default(), &batches).unwrap()
354    }
355
356    fn u256_from_decimal(s: &str) -> U256 {
357        s.parse().expect("decimal U256 literal must parse")
358    }
359
360    /// `buffer_response_batch` only buffers — folding is deferred to
361    /// `fold_and_reconcile`. The local head must not move just from
362    /// receiving events.
363    #[test]
364    fn buffer_response_batch_defers_folding() {
365        let mut state = StreamState::new(EventStreamHead::default(), 0);
366        let events = vec![sample_event(7, 0, 0), sample_event(7, 0, 1)];
367
368        buffer_response_batch(&mut state, events, Some(7));
369
370        assert_eq!(state.local_head, EventStreamHead::default());
371        assert_eq!(state.buffer.len(), 2);
372        assert_eq!(state.events_scanned_through, 7);
373    }
374
375    /// Events arriving without a watermark still bump
376    /// `events_scanned_through` to at least the last event's
377    /// checkpoint — otherwise reconciliation would never see those
378    /// checkpoints as "scanned" and skip them when fetching
379    /// settlements.
380    #[test]
381    fn buffer_response_batch_advances_scan_floor_from_events() {
382        let mut state = StreamState::new(EventStreamHead::default(), 0);
383        buffer_response_batch(&mut state, vec![sample_event(11, 0, 0)], None);
384        assert_eq!(state.events_scanned_through, 11);
385    }
386
387    /// `events_scanned_through` is monotonic — a smaller watermark
388    /// (e.g., a retry that re-emits earlier items) does not regress it.
389    #[test]
390    fn buffer_response_batch_does_not_regress_scan_floor() {
391        let mut state = StreamState::new(EventStreamHead::default(), 0);
392        state.events_scanned_through = 20;
393        buffer_response_batch(&mut state, vec![sample_event(5, 0, 0)], Some(5));
394        assert_eq!(state.events_scanned_through, 20);
395    }
396
397    /// Single settlement per checkpoint — the legacy case the previous
398    /// implementation also supported. All events at one checkpoint
399    /// fold into a single batch and the chain head matches.
400    #[test]
401    fn fold_and_reconcile_single_settlement_per_checkpoint() {
402        let events = vec![
403            sample_event(7, 0, 0),
404            sample_event(7, 0, 1),
405            sample_event(7, 1, 0),
406        ];
407        let settlements = vec![(7u64, 2u64)];
408
409        let chain_head = expected_chain_head(&events, &settlements);
410
411        let mut state = StreamState::new(EventStreamHead::default(), 0);
412        buffer_response_batch(&mut state, events.clone(), Some(7));
413
414        let released = fold_and_reconcile(&mut state, &settlements, chain_head.clone(), 7).unwrap();
415
416        assert_eq!(released, events);
417        assert_eq!(state.local_head, chain_head);
418        assert_eq!(state.confirmed_through, 7);
419        assert!(state.buffer.is_empty());
420    }
421
422    /// Multiple settlements per checkpoint — the failure mode that
423    /// motivated this whole module. Three transactions in checkpoint
424    /// 10 settle in two `settle_events` calls, so events 0–1 fold as
425    /// one batch and event 2 folds as another. The chain head must
426    /// match only if both batches are folded separately.
427    #[test]
428    fn fold_and_reconcile_multiple_settlements_per_checkpoint() {
429        let events = vec![
430            sample_event(10, 0, 0),
431            sample_event(10, 1, 0),
432            sample_event(10, 3, 0),
433        ];
434        // First two events settle at txn 2; the third settles at txn 4.
435        let settlements = vec![(10u64, 2u64), (10u64, 4u64)];
436
437        let chain_head = expected_chain_head(&events, &settlements);
438
439        let mut state = StreamState::new(EventStreamHead::default(), 0);
440        buffer_response_batch(&mut state, events.clone(), Some(10));
441
442        // Sanity-check that the two-batch fold differs from the
443        // (incorrect) single-batch fold the old code would have
444        // produced — this is the regression we're guarding against.
445        let single_batch = apply_stream_updates(
446            EventStreamHead::default(),
447            &[EventBatch {
448                checkpoint_seq: 10,
449                commitments: events
450                    .iter()
451                    .map(|e| EventCommitment {
452                        checkpoint_seq: e.checkpoint,
453                        transaction_idx: e.transaction_offset,
454                        event_idx: e.event_index as u64,
455                        digest: e.event.digest(),
456                    })
457                    .collect(),
458            }],
459        )
460        .unwrap();
461        assert_ne!(single_batch, chain_head);
462
463        let released =
464            fold_and_reconcile(&mut state, &settlements, chain_head.clone(), 10).unwrap();
465
466        assert_eq!(released, events);
467        assert_eq!(state.local_head, chain_head);
468        assert_eq!(state.confirmed_through, 10);
469    }
470
471    /// Reconciliation only consumes the buffer prefix at-or-below the
472    /// reconciliation point; events past it stay buffered for the next
473    /// round.
474    #[test]
475    fn fold_and_reconcile_keeps_unfolded_tail_buffered() {
476        let events = vec![
477            sample_event(5, 0, 0),
478            sample_event(5, 0, 1),
479            sample_event(9, 0, 0), // past the reconciliation point
480        ];
481        // Only the cp=5 settlement is in range; the cp=9 settlement
482        // hasn't been queried yet.
483        let settlements = vec![(5u64, 0u64)];
484
485        // Construct a chain head that represents folding only the cp=5
486        // events — this is what `prove_object_at_checkpoint(.., 5)`
487        // would return on chain.
488        let cp5_events: Vec<_> = events.iter().take(2).cloned().collect();
489        let chain_head_at_5 = expected_chain_head(&cp5_events, &settlements);
490
491        let mut state = StreamState::new(EventStreamHead::default(), 0);
492        buffer_response_batch(&mut state, events.clone(), Some(9));
493
494        let released =
495            fold_and_reconcile(&mut state, &settlements, chain_head_at_5.clone(), 5).unwrap();
496
497        assert_eq!(released, cp5_events);
498        assert_eq!(state.local_head, chain_head_at_5);
499        assert_eq!(state.confirmed_through, 5);
500        assert_eq!(state.buffer.len(), 1);
501        assert_eq!(state.buffer.front().unwrap().checkpoint, 9);
502    }
503
504    /// A chain head that doesn't match the locally-folded head returns
505    /// `MmrMismatch` and does NOT advance `confirmed_through` or drain
506    /// the buffer.
507    #[test]
508    fn fold_and_reconcile_rejects_divergence_without_mutating_observable_state() {
509        let events = vec![sample_event(3, 0, 0)];
510        let settlements = vec![(3u64, 0u64)];
511        let bogus = EventStreamHead {
512            mmr: vec![u256_from_decimal("999")],
513            checkpoint_seq: 3,
514            num_events: 1,
515        };
516
517        let mut state = StreamState::new(EventStreamHead::default(), 0);
518        buffer_response_batch(&mut state, events.clone(), Some(3));
519        let pre_local_head = state.local_head.clone();
520        let pre_buffer_len = state.buffer.len();
521        let pre_confirmed = state.confirmed_through;
522
523        let err = fold_and_reconcile(&mut state, &settlements, bogus.clone(), 3).unwrap_err();
524        assert!(matches!(err, LightClientError::MmrMismatch(_)));
525
526        assert_eq!(state.local_head, pre_local_head);
527        assert_eq!(state.buffer.len(), pre_buffer_len);
528        assert_eq!(state.confirmed_through, pre_confirmed);
529    }
530
531    /// Calling `fold_and_reconcile` with no settlements is a hard
532    /// error — the caller is contracted to only invoke it after
533    /// `ListTransactions` has surfaced at least one settlement in the
534    /// scan range.
535    #[test]
536    fn fold_and_reconcile_rejects_empty_settlements() {
537        let mut state = StreamState::new(EventStreamHead::default(), 0);
538        let err = fold_and_reconcile(&mut state, &[], EventStreamHead::default(), 0).unwrap_err();
539        assert!(matches!(
540            err,
541            LightClientError::UnexpectedObjectShape { .. }
542        ));
543    }
544
545    /// The reconciliation checkpoint must match the last settlement's
546    /// checkpoint — otherwise the local fold would terminate at a
547    /// different head than the on-chain proof attests to.
548    #[test]
549    fn fold_and_reconcile_rejects_misaligned_reconcile_checkpoint() {
550        let settlements = vec![(5u64, 0u64)];
551        let mut state = StreamState::new(EventStreamHead::default(), 0);
552        let err = fold_and_reconcile(&mut state, &settlements, EventStreamHead::default(), 7)
553            .unwrap_err();
554        assert!(matches!(
555            err,
556            LightClientError::UnexpectedObjectShape { .. }
557        ));
558    }
559
560    /// A buffered event whose checkpoint has no settlement in range is
561    /// a server-side index inconsistency — surface it as
562    /// `UnexpectedObjectShape` rather than fold silently into the
563    /// wrong bucket.
564    #[test]
565    fn fold_and_reconcile_rejects_event_without_matching_settlement() {
566        // Event at cp=5 but settlement only exists at cp=7.
567        let events = vec![sample_event(5, 0, 0)];
568        let settlements = vec![(7u64, 0u64)];
569        let mut state = StreamState::new(EventStreamHead::default(), 0);
570        buffer_response_batch(&mut state, events, Some(7));
571
572        let err = fold_and_reconcile(&mut state, &settlements, EventStreamHead::default(), 7)
573            .unwrap_err();
574        assert!(matches!(
575            err,
576            LightClientError::UnexpectedObjectShape { .. }
577        ));
578    }
579
580    /// `extract_event_stream_head` skips the dynamic-field UID and
581    /// `AccumulatorKey` prefix (64 bytes total) and BCS-decodes the
582    /// tail. Synthesize a `MoveStruct` whose `contents` start with two
583    /// arbitrary 32-byte addresses followed by the BCS bytes of a
584    /// known `EventStreamHead`, then confirm we recover that exact
585    /// head. The 64-byte offset is the load-bearing assumption here;
586    /// shift it by one byte and the BCS decode fails.
587    #[test]
588    fn extract_event_stream_head_skips_uid_and_key_prefix() {
589        use sui_sdk_types::Object;
590        use sui_sdk_types::ObjectData;
591        use sui_sdk_types::Owner;
592        use sui_sdk_types::Version;
593        use sui_sdk_types::framework::derive_event_stream_head_object_id;
594
595        let head = EventStreamHead {
596            mmr: vec![U256::ZERO, U256::ONE],
597            checkpoint_seq: 17,
598            num_events: 5,
599        };
600        let stream_id = Address::TWO;
601
602        // Synthesize the dynamic-field contents: UID = derived object
603        // id, AccumulatorKey = stream_id, then the EventStreamHead
604        // bytes.
605        let mut contents = Vec::new();
606        let uid = derive_event_stream_head_object_id(stream_id);
607        contents.extend_from_slice(uid.as_bytes());
608        contents.extend_from_slice(stream_id.as_bytes());
609        contents.extend(bcs::to_bytes(&head).unwrap());
610
611        let move_struct = sui_sdk_types::MoveStruct::new(
612            StructTag::new(
613                Address::TWO,
614                Identifier::from_static("dynamic_field"),
615                Identifier::from_static("Field"),
616                vec![],
617            ),
618            true,
619            Version::from(1u64),
620            contents,
621        )
622        .expect("contents are at least 32 bytes");
623        let object = Object::new(
624            ObjectData::Struct(move_struct),
625            Owner::Address(Address::ZERO),
626            Digest::new([0; 32]),
627            0,
628        );
629
630        let recovered = extract_event_stream_head(&object).unwrap();
631        assert_eq!(recovered, head);
632    }
633
634    /// Extracting from a package (not a Move struct) is a clean
635    /// `UnexpectedObjectShape` error rather than a panic.
636    #[test]
637    fn extract_event_stream_head_rejects_non_struct() {
638        use sui_sdk_types::MovePackage;
639        use sui_sdk_types::Object;
640        use sui_sdk_types::ObjectData;
641        use sui_sdk_types::Owner;
642        use sui_sdk_types::Version;
643
644        let pkg = MovePackage {
645            id: Address::TWO,
646            version: Version::from(1u64),
647            modules: Default::default(),
648            type_origin_table: Vec::new(),
649            linkage_table: Default::default(),
650        };
651        let object = Object::new(
652            ObjectData::Package(pkg),
653            Owner::Immutable,
654            Digest::new([0; 32]),
655            0,
656        );
657
658        let err = extract_event_stream_head(&object).unwrap_err();
659        assert!(matches!(
660            err,
661            LightClientError::UnexpectedObjectShape { .. }
662        ));
663    }
664}