1use super::object_change::{AccumulatorWriteV1, ObjectIn, ObjectOut};
5use super::{EffectsObjectChange, IDOperation, ObjectChange};
6use crate::accumulator_event::AccumulatorEvent;
7use crate::accumulator_root::AccumulatorObjId;
8use crate::base_types::{
9 EpochId, ObjectDigest, ObjectID, ObjectRef, SequenceNumber, SuiAddress, TransactionDigest,
10 VersionDigest,
11};
12use crate::digests::{EffectsAuxDataDigest, TransactionEventsDigest};
13use crate::effects::{InputConsensusObject, TransactionEffectsAPI};
14use crate::execution::SharedInput;
15use crate::execution_status::{ExecutionFailureStatus, ExecutionStatus, MoveLocation};
16use crate::gas::GasCostSummary;
17#[cfg(debug_assertions)]
18use crate::is_system_package;
19use crate::object::{OBJECT_START_VERSION, Owner};
20use crate::transaction::SharedObjectMutability;
21use serde::{Deserialize, Serialize};
22#[cfg(debug_assertions)]
23use std::collections::HashSet;
24use std::collections::{BTreeMap, BTreeSet};
25
/// Version 2 of the transaction effects record.
///
/// Every object touched by the transaction is described by one entry in
/// `changed_objects` (an input state, an output state and an ID operation); the
/// accessor methods of `TransactionEffectsAPI` derive their results from those
/// entries rather than from dedicated per-category lists.
///
/// NOTE(review): the type derives `Serialize`/`Deserialize`; with a positional
/// encoding the field order below is part of the serialized layout — confirm
/// before reordering or inserting fields.
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct TransactionEffectsV2 {
    /// Outcome of executing the transaction.
    pub(crate) status: ExecutionStatus,
    /// Epoch value reported by `executed_epoch()`.
    pub(crate) executed_epoch: EpochId,
    /// Gas cost summary reported by `gas_cost_summary()`.
    pub(crate) gas_used: GasCostSummary,
    /// Digest of the transaction these effects belong to.
    pub(crate) transaction_digest: TransactionDigest,
    /// Position of the gas object inside `changed_objects`, or `None` when the
    /// constructor was given no gas object. `gas_object()` returns a placeholder
    /// reference in the `None` case.
    pub(crate) gas_object_index: Option<u32>,
    /// Optional digest of the events emitted by the transaction.
    pub(crate) events_digest: Option<TransactionEventsDigest>,
    /// Transaction digests reported by `dependencies()`.
    pub(crate) dependencies: Vec<TransactionDigest>,

    /// Version reported for every `ObjectOut::ObjectWrite` output and for the
    /// deleted/wrapped tombstone references produced by the accessors.
    pub(crate) lamport_version: SequenceNumber,
    /// One entry per changed object. `new()` fills this from a `BTreeMap`, so
    /// entries built there are ordered by `ObjectID`; the `*_for_testing`
    /// helpers append without maintaining that order.
    pub(crate) changed_objects: Vec<(ObjectID, EffectsObjectChange)>,
    /// Consensus inputs that do not appear in `changed_objects`, plus the
    /// per-epoch config objects passed to `new()`.
    pub(crate) unchanged_consensus_objects: Vec<(ObjectID, UnchangedConsensusKind)>,
    /// Optional auxiliary-data digest. Both `new()` and `Default` set it to `None`.
    pub(crate) aux_data_digest: Option<EffectsAuxDataDigest>,
}
66
67impl TransactionEffectsAPI for TransactionEffectsV2 {
68 fn status(&self) -> &ExecutionStatus {
69 &self.status
70 }
71
72 fn into_status(self) -> ExecutionStatus {
73 self.status
74 }
75
76 fn executed_epoch(&self) -> EpochId {
77 self.executed_epoch
78 }
79
80 fn modified_at_versions(&self) -> Vec<(ObjectID, SequenceNumber)> {
81 self.changed_objects
82 .iter()
83 .filter_map(|(id, change)| {
84 if let ObjectIn::Exist(((version, _digest), _owner)) = &change.input_state {
85 Some((*id, *version))
86 } else {
87 None
88 }
89 })
90 .collect()
91 }
92
93 fn move_abort(&self) -> Option<(MoveLocation, u64)> {
94 let ExecutionStatus::Failure {
95 error: ExecutionFailureStatus::MoveAbort(move_location, code),
96 ..
97 } = self.status()
98 else {
99 return None;
100 };
101 Some((move_location.clone(), *code))
102 }
103
104 fn lamport_version(&self) -> SequenceNumber {
105 self.lamport_version
106 }
107
108 fn old_object_metadata(&self) -> Vec<(ObjectRef, Owner)> {
109 self.changed_objects
110 .iter()
111 .filter_map(|(id, change)| {
112 if let ObjectIn::Exist(((version, digest), owner)) = &change.input_state {
113 Some(((*id, *version, *digest), owner.clone()))
114 } else {
115 None
116 }
117 })
118 .collect()
119 }
120
121 fn input_consensus_objects(&self) -> Vec<InputConsensusObject> {
122 self.changed_objects
123 .iter()
124 .filter_map(|(id, change)| match &change.input_state {
125 ObjectIn::Exist(((version, digest), owner)) if owner.is_consensus() => {
126 Some(InputConsensusObject::Mutate((*id, *version, *digest)))
127 }
128 _ => None,
129 })
130 .chain(
131 self.unchanged_consensus_objects
132 .iter()
133 .filter_map(|(id, change_kind)| match change_kind {
134 UnchangedConsensusKind::ReadOnlyRoot((version, digest)) => {
135 Some(InputConsensusObject::ReadOnly((*id, *version, *digest)))
136 }
137 UnchangedConsensusKind::MutateConsensusStreamEnded(seqno) => Some(
138 InputConsensusObject::MutateConsensusStreamEnded(*id, *seqno),
139 ),
140 UnchangedConsensusKind::ReadConsensusStreamEnded(seqno) => {
141 Some(InputConsensusObject::ReadConsensusStreamEnded(*id, *seqno))
142 }
143 UnchangedConsensusKind::Cancelled(seqno) => {
144 Some(InputConsensusObject::Cancelled(*id, *seqno))
145 }
146 UnchangedConsensusKind::PerEpochConfig => None,
150 }),
151 )
152 .collect()
153 }
154
155 fn created(&self) -> Vec<(ObjectRef, Owner)> {
156 self.changed_objects
157 .iter()
158 .filter_map(|(id, change)| {
159 match (
160 &change.input_state,
161 &change.output_state,
162 &change.id_operation,
163 ) {
164 (
165 ObjectIn::NotExist,
166 ObjectOut::ObjectWrite((digest, owner)),
167 IDOperation::Created,
168 ) => Some(((*id, self.lamport_version, *digest), owner.clone())),
169 (
170 ObjectIn::NotExist,
171 ObjectOut::PackageWrite((version, digest)),
172 IDOperation::Created,
173 ) => Some(((*id, *version, *digest), Owner::Immutable)),
174 _ => None,
175 }
176 })
177 .collect()
178 }
179
180 fn mutated(&self) -> Vec<(ObjectRef, Owner)> {
181 self.changed_objects
182 .iter()
183 .filter_map(
184 |(id, change)| match (&change.input_state, &change.output_state) {
185 (ObjectIn::Exist(_), ObjectOut::ObjectWrite((digest, owner))) => {
186 Some(((*id, self.lamport_version, *digest), owner.clone()))
187 }
188 (ObjectIn::Exist(_), ObjectOut::PackageWrite((version, digest))) => {
189 Some(((*id, *version, *digest), Owner::Immutable))
190 }
191 _ => None,
192 },
193 )
194 .collect()
195 }
196
197 fn unwrapped(&self) -> Vec<(ObjectRef, Owner)> {
198 self.changed_objects
199 .iter()
200 .filter_map(|(id, change)| {
201 match (
202 &change.input_state,
203 &change.output_state,
204 &change.id_operation,
205 ) {
206 (
207 ObjectIn::NotExist,
208 ObjectOut::ObjectWrite((digest, owner)),
209 IDOperation::None,
210 ) => Some(((*id, self.lamport_version, *digest), owner.clone())),
211 _ => None,
212 }
213 })
214 .collect()
215 }
216
217 fn deleted(&self) -> Vec<ObjectRef> {
218 self.changed_objects
219 .iter()
220 .filter_map(|(id, change)| {
221 match (
222 &change.input_state,
223 &change.output_state,
224 &change.id_operation,
225 ) {
226 (ObjectIn::Exist(_), ObjectOut::NotExist, IDOperation::Deleted) => Some((
227 *id,
228 self.lamport_version,
229 ObjectDigest::OBJECT_DIGEST_DELETED,
230 )),
231 _ => None,
232 }
233 })
234 .collect()
235 }
236
237 fn unwrapped_then_deleted(&self) -> Vec<ObjectRef> {
238 self.changed_objects
239 .iter()
240 .filter_map(|(id, change)| {
241 match (
242 &change.input_state,
243 &change.output_state,
244 &change.id_operation,
245 ) {
246 (ObjectIn::NotExist, ObjectOut::NotExist, IDOperation::Deleted) => Some((
247 *id,
248 self.lamport_version,
249 ObjectDigest::OBJECT_DIGEST_DELETED,
250 )),
251 _ => None,
252 }
253 })
254 .collect()
255 }
256
257 fn wrapped(&self) -> Vec<ObjectRef> {
258 self.changed_objects
259 .iter()
260 .filter_map(|(id, change)| {
261 match (
262 &change.input_state,
263 &change.output_state,
264 &change.id_operation,
265 ) {
266 (ObjectIn::Exist(_), ObjectOut::NotExist, IDOperation::None) => Some((
267 *id,
268 self.lamport_version,
269 ObjectDigest::OBJECT_DIGEST_WRAPPED,
270 )),
271 _ => None,
272 }
273 })
274 .collect()
275 }
276
277 fn written(&self) -> Vec<ObjectRef> {
278 self.changed_objects
279 .iter()
280 .filter_map(
281 |(id, change)| match (&change.output_state, &change.id_operation) {
282 (ObjectOut::NotExist, IDOperation::Deleted) => Some((
283 *id,
284 self.lamport_version,
285 ObjectDigest::OBJECT_DIGEST_DELETED,
286 )),
287 (ObjectOut::NotExist, IDOperation::None) => Some((
288 *id,
289 self.lamport_version,
290 ObjectDigest::OBJECT_DIGEST_WRAPPED,
291 )),
292 (ObjectOut::ObjectWrite((d, _)), _) => Some((*id, self.lamport_version, *d)),
293 (ObjectOut::PackageWrite(vd), _) => Some((*id, vd.0, vd.1)),
294 (ObjectOut::AccumulatorWriteV1(_), _) => None,
295 _ => None,
296 },
297 )
298 .collect()
299 }
300
301 fn transferred_from_consensus(&self) -> Vec<ObjectRef> {
302 self.changed_objects
303 .iter()
304 .filter_map(|(id, change)| {
305 match (
306 &change.input_state,
307 &change.output_state,
308 &change.id_operation,
309 ) {
310 (
311 ObjectIn::Exist((_, Owner::ConsensusAddressOwner { .. })),
312 ObjectOut::ObjectWrite((
313 object_digest,
314 Owner::AddressOwner(_) | Owner::ObjectOwner(_) | Owner::Immutable,
315 )),
316 IDOperation::None,
317 ) => Some((*id, self.lamport_version, *object_digest)),
318 _ => None,
319 }
320 })
321 .collect()
322 }
323
324 fn transferred_to_consensus(&self) -> Vec<ObjectRef> {
325 self.changed_objects
326 .iter()
327 .filter_map(|(id, change)| {
328 match (
329 &change.input_state,
330 &change.output_state,
331 &change.id_operation,
332 ) {
333 (
334 ObjectIn::Exist((_, Owner::AddressOwner(_) | Owner::ObjectOwner(_))),
335 ObjectOut::ObjectWrite((
336 object_digest,
337 Owner::ConsensusAddressOwner { .. },
338 )),
339 IDOperation::None,
340 ) => Some((*id, self.lamport_version, *object_digest)),
341 _ => None,
342 }
343 })
344 .collect()
345 }
346
347 fn consensus_owner_changed(&self) -> Vec<ObjectRef> {
348 self.changed_objects
349 .iter()
350 .filter_map(|(id, change)| {
351 match (
352 &change.input_state,
353 &change.output_state,
354 &change.id_operation,
355 ) {
356 (
357 ObjectIn::Exist((
358 _,
359 Owner::ConsensusAddressOwner {
360 owner: old_owner, ..
361 },
362 )),
363 ObjectOut::ObjectWrite((
364 object_digest,
365 Owner::ConsensusAddressOwner {
366 owner: new_owner, ..
367 },
368 )),
369 IDOperation::None,
370 ) if old_owner != new_owner => {
371 Some((*id, self.lamport_version, *object_digest))
372 }
373 _ => None,
374 }
375 })
376 .collect()
377 }
378
379 fn object_changes(&self) -> Vec<ObjectChange> {
380 self.changed_objects
381 .iter()
382 .filter_map(|(id, change)| {
383 let input_version_digest = match &change.input_state {
384 ObjectIn::NotExist => None,
385 ObjectIn::Exist((vd, _)) => Some(*vd),
386 };
387
388 let output_version_digest = match &change.output_state {
389 ObjectOut::NotExist => None,
390 ObjectOut::ObjectWrite((d, _)) => Some((self.lamport_version, *d)),
391 ObjectOut::PackageWrite(vd) => Some(*vd),
392 ObjectOut::AccumulatorWriteV1(_) => {
393 return None;
394 }
395 };
396
397 Some(ObjectChange {
398 id: *id,
399
400 input_version: input_version_digest.map(|k| k.0),
401 input_digest: input_version_digest.map(|k| k.1),
402
403 output_version: output_version_digest.map(|k| k.0),
404 output_digest: output_version_digest.map(|k| k.1),
405
406 id_operation: change.id_operation,
407 })
408 })
409 .collect()
410 }
411
412 fn published_packages(&self) -> Vec<ObjectID> {
413 self.changed_objects
414 .iter()
415 .filter_map(|(id, change)| {
416 if matches!(&change.output_state, ObjectOut::PackageWrite(_)) {
417 Some(*id)
418 } else {
419 None
420 }
421 })
422 .collect()
423 }
424
425 fn accumulator_events(&self) -> Vec<AccumulatorEvent> {
426 self.changed_objects
427 .iter()
428 .filter_map(|(id, change)| match &change.output_state {
429 ObjectOut::AccumulatorWriteV1(write) => Some(AccumulatorEvent::new(
430 AccumulatorObjId::new_unchecked(*id),
431 write.clone(),
432 )),
433 _ => None,
434 })
435 .collect()
436 }
437
438 fn gas_object(&self) -> (ObjectRef, Owner) {
439 if let Some(gas_object_index) = self.gas_object_index {
440 let entry = &self.changed_objects[gas_object_index as usize];
441 match &entry.1.output_state {
442 ObjectOut::ObjectWrite((digest, owner)) => {
443 ((entry.0, self.lamport_version, *digest), owner.clone())
444 }
445 _ => panic!("Gas object must be an ObjectWrite in changed_objects"),
446 }
447 } else {
448 (
449 (ObjectID::ZERO, SequenceNumber::default(), ObjectDigest::MIN),
450 Owner::AddressOwner(SuiAddress::default()),
451 )
452 }
453 }
454
455 fn events_digest(&self) -> Option<&TransactionEventsDigest> {
456 self.events_digest.as_ref()
457 }
458
459 fn dependencies(&self) -> &[TransactionDigest] {
460 &self.dependencies
461 }
462
463 fn transaction_digest(&self) -> &TransactionDigest {
464 &self.transaction_digest
465 }
466
467 fn gas_cost_summary(&self) -> &GasCostSummary {
468 &self.gas_used
469 }
470
471 fn unchanged_consensus_objects(&self) -> Vec<(ObjectID, UnchangedConsensusKind)> {
472 self.unchanged_consensus_objects.clone()
473 }
474
475 fn accumulator_updates(&self) -> Vec<(ObjectID, AccumulatorWriteV1)> {
476 self.changed_objects
477 .iter()
478 .filter_map(|(id, change)| match &change.output_state {
479 ObjectOut::AccumulatorWriteV1(update) => Some((*id, update.clone())),
480 _ => None,
481 })
482 .collect()
483 }
484
485 fn status_mut_for_testing(&mut self) -> &mut ExecutionStatus {
486 &mut self.status
487 }
488
489 fn gas_cost_summary_mut_for_testing(&mut self) -> &mut GasCostSummary {
490 &mut self.gas_used
491 }
492
493 fn transaction_digest_mut_for_testing(&mut self) -> &mut TransactionDigest {
494 &mut self.transaction_digest
495 }
496
497 fn dependencies_mut_for_testing(&mut self) -> &mut Vec<TransactionDigest> {
498 &mut self.dependencies
499 }
500
501 fn unsafe_add_input_consensus_object_for_testing(&mut self, kind: InputConsensusObject) {
502 match kind {
503 InputConsensusObject::Mutate(obj_ref) => self.changed_objects.push((
504 obj_ref.0,
505 EffectsObjectChange {
506 input_state: ObjectIn::Exist((
507 (obj_ref.1, obj_ref.2),
508 Owner::Shared {
509 initial_shared_version: OBJECT_START_VERSION,
510 },
511 )),
512 output_state: ObjectOut::ObjectWrite((
513 obj_ref.2,
514 Owner::Shared {
515 initial_shared_version: obj_ref.1,
516 },
517 )),
518 id_operation: IDOperation::None,
519 },
520 )),
521 InputConsensusObject::ReadOnly(obj_ref) => self.unchanged_consensus_objects.push((
522 obj_ref.0,
523 UnchangedConsensusKind::ReadOnlyRoot((obj_ref.1, obj_ref.2)),
524 )),
525 InputConsensusObject::ReadConsensusStreamEnded(obj_id, seqno) => {
526 self.unchanged_consensus_objects.push((
527 obj_id,
528 UnchangedConsensusKind::ReadConsensusStreamEnded(seqno),
529 ))
530 }
531 InputConsensusObject::MutateConsensusStreamEnded(obj_id, seqno) => {
532 self.unchanged_consensus_objects.push((
533 obj_id,
534 UnchangedConsensusKind::MutateConsensusStreamEnded(seqno),
535 ))
536 }
537 InputConsensusObject::Cancelled(obj_id, seqno) => self
538 .unchanged_consensus_objects
539 .push((obj_id, UnchangedConsensusKind::Cancelled(seqno))),
540 }
541 }
542
543 fn unsafe_add_deleted_live_object_for_testing(&mut self, obj_ref: ObjectRef) {
544 self.changed_objects.push((
545 obj_ref.0,
546 EffectsObjectChange {
547 input_state: ObjectIn::Exist((
548 (obj_ref.1, obj_ref.2),
549 Owner::AddressOwner(SuiAddress::default()),
550 )),
551 output_state: ObjectOut::ObjectWrite((
552 obj_ref.2,
553 Owner::AddressOwner(SuiAddress::default()),
554 )),
555 id_operation: IDOperation::None,
556 },
557 ))
558 }
559
560 fn unsafe_add_object_tombstone_for_testing(&mut self, obj_ref: ObjectRef) {
561 self.changed_objects.push((
562 obj_ref.0,
563 EffectsObjectChange {
564 input_state: ObjectIn::Exist((
565 (obj_ref.1, obj_ref.2),
566 Owner::AddressOwner(SuiAddress::default()),
567 )),
568 output_state: ObjectOut::NotExist,
569 id_operation: IDOperation::Deleted,
570 },
571 ))
572 }
573}
574
575impl TransactionEffectsV2 {
576 pub fn new(
577 status: ExecutionStatus,
578 executed_epoch: EpochId,
579 gas_used: GasCostSummary,
580 shared_objects: Vec<SharedInput>,
581 loaded_per_epoch_config_objects: BTreeSet<ObjectID>,
582 transaction_digest: TransactionDigest,
583 lamport_version: SequenceNumber,
584 changed_objects: BTreeMap<ObjectID, EffectsObjectChange>,
585 gas_object: Option<ObjectID>,
586 events_digest: Option<TransactionEventsDigest>,
587 dependencies: Vec<TransactionDigest>,
588 ) -> Self {
589 let unchanged_consensus_objects = shared_objects
590 .into_iter()
591 .filter_map(|shared_input| match shared_input {
592 SharedInput::Existing((id, version, digest)) => {
593 if changed_objects.contains_key(&id) {
594 None
595 } else {
596 Some((id, UnchangedConsensusKind::ReadOnlyRoot((version, digest))))
597 }
598 }
599 SharedInput::ConsensusStreamEnded((id, version, mutability, _)) => {
600 debug_assert!(!changed_objects.contains_key(&id));
601 match mutability {
602 SharedObjectMutability::Mutable => Some((
603 id,
604 UnchangedConsensusKind::MutateConsensusStreamEnded(version),
605 )),
606 SharedObjectMutability::Immutable => Some((
607 id,
608 UnchangedConsensusKind::ReadConsensusStreamEnded(version),
609 )),
610 SharedObjectMutability::NonExclusiveWrite => Some((
613 id,
614 UnchangedConsensusKind::MutateConsensusStreamEnded(version),
615 )),
616 }
617 }
618 SharedInput::Cancelled((id, version)) => {
619 debug_assert!(!changed_objects.contains_key(&id));
620 Some((id, UnchangedConsensusKind::Cancelled(version)))
621 }
622 })
623 .chain(
624 loaded_per_epoch_config_objects
625 .into_iter()
626 .map(|id| (id, UnchangedConsensusKind::PerEpochConfig)),
627 )
628 .collect();
629 let changed_objects: Vec<_> = changed_objects.into_iter().collect();
630
631 let gas_object_index = gas_object.map(|gas_id| {
632 changed_objects
633 .iter()
634 .position(|(id, _)| id == &gas_id)
635 .unwrap() as u32
636 });
637
638 let result = Self {
639 status,
640 executed_epoch,
641 gas_used,
642 transaction_digest,
643 lamport_version,
644 changed_objects,
645 unchanged_consensus_objects,
646 gas_object_index,
647 events_digest,
648 dependencies,
649 aux_data_digest: None,
650 };
651 #[cfg(debug_assertions)]
652 result.check_invariant();
653
654 result
655 }
656
    /// Debug-only consistency check over the whole effects record.
    ///
    /// Panics when an object ID appears twice (within `changed_objects`, or
    /// between `changed_objects` and `unchanged_consensus_objects`), when a change
    /// entry has a combination of input state, output state and ID operation that
    /// is not listed below, when one of the per-combination assertions fails, or
    /// when the gas object is not address-owned.
    #[cfg(debug_assertions)]
    fn check_invariant(&self) {
        let mut unique_ids = HashSet::new();
        for (id, change) in &self.changed_objects {
            // Each object may appear at most once in `changed_objects`.
            assert!(unique_ids.insert(*id));
            match (
                &change.input_state,
                &change.output_state,
                &change.id_operation,
            ) {
                (ObjectIn::NotExist, ObjectOut::NotExist, IDOperation::Created) => {
                    // A new ID with nothing written for it; accepted without
                    // further checks.
                }
                (ObjectIn::NotExist, ObjectOut::NotExist, IDOperation::Deleted) => {
                    // The combination reported by `unwrapped_then_deleted()`.
                }
                (ObjectIn::NotExist, ObjectOut::ObjectWrite((_, owner)), IDOperation::None) => {
                    // The combination reported by `unwrapped()`; the written
                    // object must not be shared.
                    assert!(!owner.is_shared());
                }
                (ObjectIn::NotExist, ObjectOut::ObjectWrite(..), IDOperation::Created) => {
                    // A created object, as reported by `created()`.
                }
                (ObjectIn::NotExist, ObjectOut::PackageWrite(_), IDOperation::Created) => {
                    // A created package, as reported by `created()`.
                }
                (
                    ObjectIn::Exist(((old_version, _), old_owner)),
                    ObjectOut::NotExist,
                    IDOperation::None,
                ) => {
                    // The combination reported by `wrapped()`. The input version
                    // must be strictly below the lamport version.
                    assert!(old_version.value() < self.lamport_version.value());
                    assert!(
                        !old_owner.is_shared() && !old_owner.is_immutable(),
                        "Cannot wrap shared or immutable object"
                    );
                }
                (
                    ObjectIn::Exist(((old_version, _), old_owner)),
                    ObjectOut::NotExist,
                    IDOperation::Deleted,
                ) => {
                    // The combination reported by `deleted()`.
                    assert!(old_version.value() < self.lamport_version.value());
                    assert!(!old_owner.is_immutable(), "Cannot delete immutable object");
                }
                (
                    ObjectIn::Exist(((old_version, old_digest), old_owner)),
                    ObjectOut::ObjectWrite((new_digest, new_owner)),
                    IDOperation::None,
                ) => {
                    // An existing object that was rewritten: the digest must
                    // change, and shared-ness must be the same before and after.
                    assert!(old_version.value() < self.lamport_version.value());
                    assert_ne!(old_digest, new_digest);
                    assert!(!old_owner.is_immutable(), "Cannot mutate immutable object");
                    if old_owner.is_shared() {
                        assert!(new_owner.is_shared(), "Cannot un-share an object");
                    } else {
                        assert!(!new_owner.is_shared(), "Cannot share an existing object");
                    }
                }
                (
                    ObjectIn::Exist(((old_version, old_digest), old_owner)),
                    ObjectOut::PackageWrite((new_version, new_digest)),
                    IDOperation::None,
                ) => {
                    // An existing package that was rewritten: only allowed for
                    // immutable system packages, and the version must advance by
                    // exactly one.
                    assert!(
                        old_owner.is_immutable() && is_system_package(*id),
                        "Must be a system package"
                    );
                    assert_eq!(old_version.value() + 1, new_version.value());
                    assert_ne!(old_digest, new_digest);
                }
                (ObjectIn::NotExist, ObjectOut::AccumulatorWriteV1(_), IDOperation::None) => {
                    // An accumulator write; accepted without further checks.
                }
                _ => {
                    panic!("Impossible object change: {:?}, {:?}", id, change);
                }
            }
        }
        // `gas_object()` itself panics if the indexed entry is not an object
        // write. With no gas object it returns an address-owned placeholder, so
        // this assertion passes in that case.
        let (_, owner) = self.gas_object();
        assert!(matches!(owner, Owner::AddressOwner(_)));

        // `unique_ids` still holds the changed-object IDs, so this also rejects
        // an ID that is listed as both changed and unchanged.
        for (id, _) in &self.unchanged_consensus_objects {
            assert!(
                unique_ids.insert(*id),
                "Duplicate object id: {:?}\n{:#?}",
                id,
                self
            );
        }
    }
756}
757
758impl Default for TransactionEffectsV2 {
759 fn default() -> Self {
760 Self {
761 status: ExecutionStatus::Success,
762 executed_epoch: 0,
763 gas_used: GasCostSummary::default(),
764 transaction_digest: TransactionDigest::default(),
765 lamport_version: SequenceNumber::default(),
766 changed_objects: vec![],
767 unchanged_consensus_objects: vec![],
768 gas_object_index: None,
769 events_digest: None,
770 dependencies: vec![],
771 aux_data_digest: None,
772 }
773 }
774}
775
/// How a consensus object that is absent from `changed_objects` relates to the
/// transaction.
///
/// NOTE(review): the enum derives `Serialize`/`Deserialize`; with a positional
/// encoding the variant order is part of the serialized layout — confirm before
/// reordering or inserting variants.
#[derive(Eq, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum UnchangedConsensusKind {
    /// An existing consensus input that was not changed, with the version and
    /// digest it was supplied at. Reported as `InputConsensusObject::ReadOnly`.
    ReadOnlyRoot(VersionDigest),
    /// A stream-ended input whose mutability was `Mutable` or
    /// `NonExclusiveWrite`, with the version carried by that input.
    MutateConsensusStreamEnded(SequenceNumber),
    /// A stream-ended input whose mutability was `Immutable`, with the version
    /// carried by that input.
    ReadConsensusStreamEnded(SequenceNumber),
    /// A cancelled consensus input, with the version carried by that input.
    Cancelled(SequenceNumber),
    /// A per-epoch config object loaded by the transaction. These entries are
    /// not reported by `input_consensus_objects()`.
    PerEpochConfig,
}