1use std::{collections::BTreeMap, sync::Arc};
4
5use consensus_config::Epoch;
6use consensus_types::block::{
7 BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX, Round, TransactionIndex,
8};
9use mysten_common::debug_fatal;
10use mysten_metrics::monitored_mpsc::{Receiver, Sender, channel};
11use parking_lot::Mutex;
12use tap::TapFallible;
13use thiserror::Error;
14use tokio::sync::oneshot;
15use tracing::{error, warn};
16
17use crate::{block::Transaction, context::Context};
18
19const MAX_PENDING_TRANSACTIONS: usize = 2_000;
21
22pub(crate) struct TransactionsGuard {
27 transactions: Vec<Transaction>,
30
31 included_in_block_ack: oneshot::Sender<(
34 BlockRef,
36 Vec<TransactionIndex>,
38 oneshot::Receiver<BlockStatus>,
40 )>,
41}
42
43pub(crate) struct TransactionConsumer {
47 tx_receiver: Receiver<TransactionsGuard>,
48 max_transactions_in_block_bytes: u64,
49 max_num_transactions_in_block: u64,
50 pending_transactions: Option<TransactionsGuard>,
51 block_status_subscribers: Arc<Mutex<BTreeMap<BlockRef, Vec<oneshot::Sender<BlockStatus>>>>>,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq)]
55#[allow(unused)]
56pub enum BlockStatus {
57 Sequenced(BlockRef),
60 GarbageCollected(BlockRef),
63}
64
65#[derive(Debug, Clone, Eq, PartialEq)]
66pub enum LimitReached {
67 MaxNumOfTransactions,
69 MaxBytes,
71 AllTransactionsIncluded,
73}
74
75impl TransactionConsumer {
76 pub(crate) fn new(tx_receiver: Receiver<TransactionsGuard>, context: Arc<Context>) -> Self {
77 assert!(
82 context
83 .protocol_config
84 .max_num_transactions_in_block()
85 .saturating_sub(1)
86 < TransactionIndex::MAX.saturating_sub(NUM_RESERVED_TRANSACTION_INDICES) as u64,
87 "Unsupported max_num_transactions_in_block: {}",
88 context.protocol_config.max_num_transactions_in_block()
89 );
90
91 Self {
92 tx_receiver,
93 max_transactions_in_block_bytes: context
94 .protocol_config
95 .max_transactions_in_block_bytes(),
96 max_num_transactions_in_block: context.protocol_config.max_num_transactions_in_block(),
97 pending_transactions: None,
98 block_status_subscribers: Arc::new(Mutex::new(BTreeMap::new())),
99 }
100 }
101
102 pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>, LimitReached) {
107 let mut transactions = Vec::new();
108 let mut acks = Vec::new();
109 let mut total_bytes = 0;
110 let mut limit_reached = LimitReached::AllTransactionsIncluded;
111
112 let mut handle_txs = |t: TransactionsGuard| -> Option<TransactionsGuard> {
116 let transactions_num = t.transactions.len() as u64;
119 if transactions_num == 0 {
120 acks.push((t.included_in_block_ack, vec![PING_TRANSACTION_INDEX]));
121 return None;
122 }
123
124 let transactions_bytes =
126 t.transactions.iter().map(|t| t.data().len()).sum::<usize>() as u64;
127 if total_bytes + transactions_bytes > self.max_transactions_in_block_bytes {
128 limit_reached = LimitReached::MaxBytes;
129 return Some(t);
130 }
131 if transactions.len() as u64 + transactions_num > self.max_num_transactions_in_block {
132 limit_reached = LimitReached::MaxNumOfTransactions;
133 return Some(t);
134 }
135
136 total_bytes += transactions_bytes;
137
138 let start_idx = transactions.len() as TransactionIndex;
140 let indices: Vec<TransactionIndex> =
141 (start_idx..start_idx + t.transactions.len() as TransactionIndex).collect();
142
143 acks.push((t.included_in_block_ack, indices));
146 transactions.extend(t.transactions);
147 None
148 };
149
150 if let Some(t) = self.pending_transactions.take()
151 && let Some(pending_transactions) = handle_txs(t)
152 {
153 debug_fatal!(
154 "Previously pending transaction(s) should fit into an empty block! Dropping: {:?}",
155 pending_transactions.transactions
156 );
157 }
158
159 while self.pending_transactions.is_none() {
162 if let Ok(t) = self.tx_receiver.try_recv() {
163 self.pending_transactions = handle_txs(t);
164 } else {
165 break;
166 }
167 }
168
169 let block_status_subscribers = self.block_status_subscribers.clone();
170 (
171 transactions,
172 Box::new(move |block_ref: BlockRef| {
173 let mut block_status_subscribers = block_status_subscribers.lock();
174
175 for (ack, tx_indices) in acks {
176 let (status_tx, status_rx) = oneshot::channel();
177
178 block_status_subscribers
179 .entry(block_ref)
180 .or_default()
181 .push(status_tx);
182
183 let _ = ack.send((block_ref, tx_indices, status_rx));
184 }
185 }),
186 limit_reached,
187 )
188 }
189
190 pub(crate) fn notify_own_blocks_status(
194 &self,
195 committed_blocks: Vec<BlockRef>,
196 gc_round: Round,
197 ) {
198 let mut block_status_subscribers = self.block_status_subscribers.lock();
200 for block_ref in committed_blocks {
201 if let Some(subscribers) = block_status_subscribers.remove(&block_ref) {
202 subscribers.into_iter().for_each(|s| {
203 let _ = s.send(BlockStatus::Sequenced(block_ref));
204 });
205 }
206 }
207
208 while let Some((block_ref, subscribers)) = block_status_subscribers.pop_first() {
210 if block_ref.round <= gc_round {
211 subscribers.into_iter().for_each(|s| {
212 let _ = s.send(BlockStatus::GarbageCollected(block_ref));
213 });
214 } else {
215 block_status_subscribers.insert(block_ref, subscribers);
216 break;
217 }
218 }
219 }
220
221 #[cfg(test)]
222 pub(crate) fn subscribe_for_block_status_testing(
223 &self,
224 block_ref: BlockRef,
225 ) -> oneshot::Receiver<BlockStatus> {
226 let (tx, rx) = oneshot::channel();
227 let mut block_status_subscribers = self.block_status_subscribers.lock();
228 block_status_subscribers
229 .entry(block_ref)
230 .or_default()
231 .push(tx);
232 rx
233 }
234
235 #[cfg(test)]
236 fn is_empty(&mut self) -> bool {
237 if self.pending_transactions.is_some() {
238 return false;
239 }
240 if let Ok(t) = self.tx_receiver.try_recv() {
241 self.pending_transactions = Some(t);
242 return false;
243 }
244 true
245 }
246}
247
248#[derive(Clone)]
249pub struct TransactionClient {
250 context: Arc<Context>,
251 sender: Sender<TransactionsGuard>,
252 max_transaction_size: u64,
253 max_transactions_in_block_bytes: u64,
254 max_transactions_in_block_count: u64,
255}
256
257#[derive(Debug, Error)]
258pub enum ClientError {
259 #[error("Failed to submit transaction, consensus is shutting down: {0}")]
260 ConsensusShuttingDown(String),
261
262 #[error("Transaction size ({0}B) is over limit ({1}B)")]
263 OversizedTransaction(u64, u64),
264
265 #[error("Transaction bundle size ({0}B) is over limit ({1}B)")]
266 OversizedTransactionBundleBytes(u64, u64),
267
268 #[error("Transaction bundle count ({0}) is over limit ({1})")]
269 OversizedTransactionBundleCount(u64, u64),
270}
271
272impl TransactionClient {
273 pub(crate) fn new(context: Arc<Context>) -> (Self, Receiver<TransactionsGuard>) {
274 Self::new_with_max_pending_transactions(context, MAX_PENDING_TRANSACTIONS)
275 }
276
277 fn new_with_max_pending_transactions(
278 context: Arc<Context>,
279 max_pending_transactions: usize,
280 ) -> (Self, Receiver<TransactionsGuard>) {
281 let (sender, receiver) = channel("consensus_input", max_pending_transactions);
282 (
283 Self {
284 sender,
285 max_transaction_size: context.protocol_config.max_transaction_size_bytes(),
286
287 max_transactions_in_block_bytes: context
288 .protocol_config
289 .max_transactions_in_block_bytes(),
290 max_transactions_in_block_count: context
291 .protocol_config
292 .max_num_transactions_in_block(),
293 context: context.clone(),
294 },
295 receiver,
296 )
297 }
298
299 pub fn epoch(&self) -> Epoch {
301 self.context.committee.epoch()
302 }
303
304 pub async fn submit(
311 &self,
312 transactions: Vec<Vec<u8>>,
313 ) -> Result<
314 (
315 BlockRef,
316 Vec<TransactionIndex>,
317 oneshot::Receiver<BlockStatus>,
318 ),
319 ClientError,
320 > {
321 let included_in_block = self.submit_no_wait(transactions).await?;
322 included_in_block
323 .await
324 .tap_err(|e| warn!("Transaction acknowledge failed with {:?}", e))
325 .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))
326 }
327
328 pub(crate) async fn submit_no_wait(
338 &self,
339 transactions: Vec<Vec<u8>>,
340 ) -> Result<
341 oneshot::Receiver<(
342 BlockRef,
343 Vec<TransactionIndex>,
344 oneshot::Receiver<BlockStatus>,
345 )>,
346 ClientError,
347 > {
348 let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();
349
350 let mut bundle_size = 0;
351
352 if transactions.len() as u64 > self.max_transactions_in_block_count {
353 return Err(ClientError::OversizedTransactionBundleCount(
354 transactions.len() as u64,
355 self.max_transactions_in_block_count,
356 ));
357 }
358
359 for transaction in &transactions {
360 if transaction.len() as u64 > self.max_transaction_size {
361 return Err(ClientError::OversizedTransaction(
362 transaction.len() as u64,
363 self.max_transaction_size,
364 ));
365 }
366 bundle_size += transaction.len() as u64;
367
368 if bundle_size > self.max_transactions_in_block_bytes {
369 return Err(ClientError::OversizedTransactionBundleBytes(
370 bundle_size,
371 self.max_transactions_in_block_bytes,
372 ));
373 }
374 }
375
376 let t = TransactionsGuard {
377 transactions: transactions.into_iter().map(Transaction::new).collect(),
378 included_in_block_ack: included_in_block_ack_send,
379 };
380 self.sender
381 .send(t)
382 .await
383 .tap_err(|e| error!("Submit transactions failed with {:?}", e))
384 .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))?;
385 Ok(included_in_block_ack_receive)
386 }
387}
388
389pub trait TransactionVerifier: Send + Sync + 'static {
392 fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>;
395
396 fn verify_and_vote_batch(
405 &self,
406 block_ref: &BlockRef,
407 batch: &[&[u8]],
408 ) -> Result<Vec<TransactionIndex>, ValidationError>;
409}
410
411#[derive(Debug, Error)]
412pub enum ValidationError {
413 #[error("Invalid transaction: {0}")]
414 InvalidTransaction(String),
415}
416
417#[cfg(any(test, msim))]
419pub struct NoopTransactionVerifier;
420
421#[cfg(any(test, msim))]
422impl TransactionVerifier for NoopTransactionVerifier {
423 fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> {
424 Ok(())
425 }
426
427 fn verify_and_vote_batch(
428 &self,
429 _block_ref: &BlockRef,
430 _batch: &[&[u8]],
431 ) -> Result<Vec<TransactionIndex>, ValidationError> {
432 Ok(vec![])
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use std::{sync::Arc, time::Duration};
439
440 use consensus_config::AuthorityIndex;
441 use consensus_types::block::{
442 BlockDigest, BlockRef, NUM_RESERVED_TRANSACTION_INDICES, PING_TRANSACTION_INDEX,
443 TransactionIndex,
444 };
445 use futures::{StreamExt, stream::FuturesUnordered};
446 use tokio::time::timeout;
447
448 use crate::transaction::NoopTransactionVerifier;
449 use crate::{
450 block_verifier::SignedBlockVerifier,
451 context::Context,
452 transaction::{BlockStatus, LimitReached, TransactionClient, TransactionConsumer},
453 };
454
455 #[tokio::test(flavor = "current_thread", start_paused = true)]
456 async fn basic_submit_and_consume() {
457 let (mut context, _) = Context::new_for_test(4);
458 context
459 .protocol_config
460 .set_max_transaction_size_bytes_for_testing(2_000); context
462 .protocol_config
463 .set_max_transactions_in_block_bytes_for_testing(2_000);
464 let context = Arc::new(context);
465 let (client, tx_receiver) = TransactionClient::new(context.clone());
466 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
467
468 let mut included_in_block_waiters = FuturesUnordered::new();
470 for i in 0..3 {
471 let transaction =
472 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
473 let w = client
474 .submit_no_wait(vec![transaction])
475 .await
476 .expect("Shouldn't submit successfully transaction");
477 included_in_block_waiters.push(w);
478 }
479
480 let (transactions, ack_transactions, _limit_reached) = consumer.next();
482 assert_eq!(transactions.len(), 3);
483
484 for (i, t) in transactions.iter().enumerate() {
485 let t: String = bcs::from_bytes(t.data()).unwrap();
486 assert_eq!(format!("transaction {i}").to_string(), t);
487 }
488
489 assert!(
490 timeout(Duration::from_secs(1), included_in_block_waiters.next())
491 .await
492 .is_err(),
493 "We should expect to timeout as none of the transactions have been acknowledged yet"
494 );
495
496 ack_transactions(BlockRef::MIN);
498
499 while let Some(result) = included_in_block_waiters.next().await {
501 assert!(result.is_ok());
502 }
503
504 assert!(consumer.is_empty());
506 }
507
508 #[tokio::test(flavor = "current_thread", start_paused = true)]
509 async fn block_status_update() {
510 let (mut context, _) = Context::new_for_test(4);
511 context
512 .protocol_config
513 .set_max_transaction_size_bytes_for_testing(2_000); context
515 .protocol_config
516 .set_max_transactions_in_block_bytes_for_testing(2_000);
517 context.protocol_config.set_gc_depth_for_testing(10);
518 let context = Arc::new(context);
519 let (client, tx_receiver) = TransactionClient::new(context.clone());
520 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
521
522 let mut included_in_block_waiters = FuturesUnordered::new();
524 for i in 1..=10 {
525 let transaction =
526 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
527 let w = client
528 .submit_no_wait(vec![transaction])
529 .await
530 .expect("Shouldn't submit successfully transaction");
531 included_in_block_waiters.push(w);
532
533 if i % 2 == 0 {
535 let (transactions, ack_transactions, _limit_reached) = consumer.next();
536 assert_eq!(transactions.len(), 2);
537 ack_transactions(BlockRef::new(
538 i,
539 AuthorityIndex::new_for_test(0),
540 BlockDigest::MIN,
541 ));
542 }
543 }
544
545 let mut transaction_count = 0;
546 let mut block_status_waiters = Vec::new();
548 while let Some(result) = included_in_block_waiters.next().await {
549 let (block_ref, tx_indices, block_status_waiter) =
550 result.expect("Block inclusion waiter shouldn't fail");
551 assert_eq!(tx_indices.len(), 1);
553 assert_eq!(tx_indices[0], transaction_count % 2);
557 transaction_count += 1;
558
559 block_status_waiters.push((block_ref, block_status_waiter));
560 }
561
562 let gc_round = 5;
564 consumer.notify_own_blocks_status(
565 vec![
566 BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
567 BlockRef::new(8, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
568 BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
569 ],
570 gc_round,
571 );
572
573 for (block_ref, waiter) in block_status_waiters {
575 let block_status = waiter.await.expect("Block status waiter shouldn't fail");
576
577 if block_ref.round <= gc_round {
578 assert!(matches!(block_status, BlockStatus::GarbageCollected(_)))
579 } else {
580 assert!(matches!(block_status, BlockStatus::Sequenced(_)));
581 }
582 }
583
584 assert!(consumer.block_status_subscribers.lock().is_empty());
586 }
587
588 #[tokio::test]
589 async fn submit_over_max_fetch_size_and_consume() {
590 let (mut context, _) = Context::new_for_test(4);
591 context
592 .protocol_config
593 .set_max_transaction_size_bytes_for_testing(100);
594 context
595 .protocol_config
596 .set_max_transactions_in_block_bytes_for_testing(100);
597 let context = Arc::new(context);
598 let (client, tx_receiver) = TransactionClient::new(context.clone());
599 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
600
601 for i in 0..10 {
603 let transaction =
604 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
605 let _w = client
606 .submit_no_wait(vec![transaction])
607 .await
608 .expect("Shouldn't submit successfully transaction");
609 }
610
611 let mut all_transactions = Vec::new();
613 let (transactions, _ack_transactions, _limit_reached) = consumer.next();
614 assert_eq!(transactions.len(), 7);
615
616 let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
618 assert!(
619 total_size <= context.protocol_config.max_transactions_in_block_bytes(),
620 "Should have fetched transactions up to {}",
621 context.protocol_config.max_transactions_in_block_bytes()
622 );
623 all_transactions.extend(transactions);
624
625 let (transactions, _ack_transactions, _limit_reached) = consumer.next();
627 assert_eq!(transactions.len(), 3);
628
629 let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
631 assert!(
632 total_size <= context.protocol_config.max_transactions_in_block_bytes(),
633 "Should have fetched transactions up to {}",
634 context.protocol_config.max_transactions_in_block_bytes()
635 );
636 all_transactions.extend(transactions);
637
638 assert!(consumer.is_empty());
640
641 for (i, t) in all_transactions.iter().enumerate() {
642 let t: String = bcs::from_bytes(t.data()).unwrap();
643 assert_eq!(format!("transaction {i}").to_string(), t);
644 }
645 }
646
647 #[tokio::test]
648 async fn submit_large_batch_and_ack() {
649 let (mut context, _) = Context::new_for_test(4);
650 context
651 .protocol_config
652 .set_max_transaction_size_bytes_for_testing(15);
653 context
654 .protocol_config
655 .set_max_transactions_in_block_bytes_for_testing(200);
656 let context = Arc::new(context);
657 let (client, tx_receiver) = TransactionClient::new(context.clone());
658 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
659 let mut all_receivers = Vec::new();
660 for i in 0..10 {
662 let transaction =
663 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
664 let w = client
665 .submit_no_wait(vec![transaction])
666 .await
667 .expect("Should submit successfully transaction");
668 all_receivers.push(w);
669 }
670
671 {
673 let transactions: Vec<_> = (10..15)
674 .map(|i| {
675 bcs::to_bytes(&format!("transaction {i}"))
676 .expect("Serialization should not fail.")
677 })
678 .collect();
679 let w = client
680 .submit_no_wait(transactions)
681 .await
682 .expect("Should submit successfully transaction");
683 all_receivers.push(w);
684 }
685
686 {
688 let i = 15;
689 let transaction =
690 bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
691 let w = client
692 .submit_no_wait(vec![transaction])
693 .await
694 .expect("Shouldn't submit successfully transaction");
695 all_receivers.push(w);
696 }
697
698 {
700 let transactions: Vec<_> = (16..32)
701 .map(|i| {
702 bcs::to_bytes(&format!("transaction {i}"))
703 .expect("Serialization should not fail.")
704 })
705 .collect();
706 let result = client.submit_no_wait(transactions).await.unwrap_err();
707 assert_eq!(
708 result.to_string(),
709 "Transaction bundle size (210B) is over limit (200B)"
710 );
711 }
712
713 let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
716 let mut batch_index = 0;
717 while !consumer.is_empty() {
718 let (transactions, ack_transactions, _limit_reached) = consumer.next();
719
720 assert!(
721 transactions.len() as u64
722 <= context.protocol_config.max_num_transactions_in_block(),
723 "Should have fetched transactions up to {}",
724 context.protocol_config.max_num_transactions_in_block()
725 );
726
727 let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
728 assert!(
729 total_size <= context.protocol_config.max_transactions_in_block_bytes(),
730 "Should have fetched transactions up to {}",
731 context.protocol_config.max_transactions_in_block_bytes()
732 );
733
734 if batch_index == 0 {
736 assert_eq!(transactions.len(), 10);
737 for (i, transaction) in transactions.iter().enumerate() {
738 let t: String = bcs::from_bytes(transaction.data()).unwrap();
739 assert_eq!(format!("transaction {}", i).to_string(), t);
740 }
741 } else if batch_index == 1 {
743 assert_eq!(transactions.len(), 6);
744 for (i, transaction) in transactions.iter().enumerate() {
745 let t: String = bcs::from_bytes(transaction.data()).unwrap();
746 assert_eq!(format!("transaction {}", i + 10).to_string(), t);
747 }
748 } else {
749 panic!("Unexpected batch index");
750 }
751
752 batch_index += 1;
753
754 all_acks.push(ack_transactions);
755 }
756
757 for ack in all_acks {
759 ack(BlockRef::MIN);
760 }
761
762 for w in all_receivers {
764 let r = w.await;
765 assert!(r.is_ok());
766 }
767 }
768
769 #[tokio::test]
770 async fn test_submit_over_max_block_size_and_validate_block_size() {
771 {
773 let (mut context, _) = Context::new_for_test(4);
774 context
775 .protocol_config
776 .set_max_transaction_size_bytes_for_testing(100);
777 context
778 .protocol_config
779 .set_max_num_transactions_in_block_for_testing(10);
780 context
781 .protocol_config
782 .set_max_transactions_in_block_bytes_for_testing(300);
783 let context = Arc::new(context);
784 let (client, tx_receiver) = TransactionClient::new(context.clone());
785 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
786 let mut all_receivers = Vec::new();
787
788 let max_num_transactions_in_block =
790 context.protocol_config.max_num_transactions_in_block();
791 for i in 0..2 * max_num_transactions_in_block {
792 let transaction = bcs::to_bytes(&format!("transaction {i}"))
793 .expect("Serialization should not fail.");
794 let w = client
795 .submit_no_wait(vec![transaction])
796 .await
797 .expect("Should submit successfully transaction");
798 all_receivers.push(w);
799 }
800
801 let (transactions, _ack_transactions, limit) = consumer.next();
803 assert_eq!(limit, LimitReached::MaxNumOfTransactions);
804 assert_eq!(transactions.len() as u64, max_num_transactions_in_block);
805
806 let block_verifier =
808 SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
809
810 let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
811 assert!(
812 block_verifier.check_transactions(&batch).is_ok(),
813 "Number of transactions limit verification failed"
814 );
815 }
816
817 {
819 let (mut context, _) = Context::new_for_test(4);
820 context
821 .protocol_config
822 .set_max_transaction_size_bytes_for_testing(100);
823 context
824 .protocol_config
825 .set_max_num_transactions_in_block_for_testing(1_000);
826 context
827 .protocol_config
828 .set_max_transactions_in_block_bytes_for_testing(300);
829 let context = Arc::new(context);
830 let (client, tx_receiver) = TransactionClient::new(context.clone());
831 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
832 let mut all_receivers = Vec::new();
833
834 let max_transactions_in_block_bytes =
835 context.protocol_config.max_transactions_in_block_bytes();
836 let mut total_size = 0;
837 loop {
838 let transaction = bcs::to_bytes(&"transaction".to_string())
839 .expect("Serialization should not fail.");
840 total_size += transaction.len() as u64;
841 let w = client
842 .submit_no_wait(vec![transaction])
843 .await
844 .expect("Should submit successfully transaction");
845 all_receivers.push(w);
846
847 if total_size >= 2 * max_transactions_in_block_bytes {
849 break;
850 }
851 }
852
853 let (transactions, _ack_transactions, limit) = consumer.next();
855 let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
856 let size = batch.iter().map(|t| t.len() as u64).sum::<u64>();
857
858 assert_eq!(limit, LimitReached::MaxBytes);
859 assert!(
860 batch.len() < context.protocol_config.max_num_transactions_in_block() as usize,
861 "Should have submitted less than the max number of transactions in a block"
862 );
863 assert!(size <= max_transactions_in_block_bytes);
864
865 let block_verifier =
867 SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));
868
869 assert!(
870 block_verifier.check_transactions(&batch).is_ok(),
871 "Total size of transactions limit verification failed"
872 );
873 }
874 }
875
876 #[tokio::test]
878 async fn submit_with_no_transactions() {
879 let (mut context, _) = Context::new_for_test(4);
880 context
881 .protocol_config
882 .set_max_transaction_size_bytes_for_testing(15);
883 context
884 .protocol_config
885 .set_max_transactions_in_block_bytes_for_testing(200);
886 let context = Arc::new(context);
887 let (client, tx_receiver) = TransactionClient::new(context.clone());
888 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
889
890 let w_no_transactions = client
891 .submit_no_wait(vec![])
892 .await
893 .expect("Should submit successfully empty array of transactions");
894
895 let transaction =
896 bcs::to_bytes(&"transaction".to_string()).expect("Serialization should not fail.");
897 let w_with_transactions = client
898 .submit_no_wait(vec![transaction])
899 .await
900 .expect("Should submit successfully transaction");
901
902 let (transactions, ack_transactions, _limit_reached) = consumer.next();
903 assert_eq!(transactions.len(), 1);
904
905 ack_transactions(BlockRef::MIN);
907
908 {
909 let r = w_no_transactions.await;
910 let (block_ref, indices, _status) = r.unwrap();
911 assert_eq!(block_ref, BlockRef::MIN);
912 assert_eq!(indices, vec![PING_TRANSACTION_INDEX]);
913 }
914
915 {
916 let r = w_with_transactions.await;
917 let (block_ref, indices, _status) = r.unwrap();
918 assert_eq!(block_ref, BlockRef::MIN);
919 assert_eq!(indices, vec![0]);
920 }
921 }
922
923 #[tokio::test]
924 async fn ping_transaction_index_never_reached() {
925 static MAX_NUM_TRANSACTIONS_IN_BLOCK: u64 =
927 (TransactionIndex::MAX - NUM_RESERVED_TRANSACTION_INDICES) as u64;
928
929 static MAX_PENDING_TRANSACTIONS: usize = 2 * MAX_NUM_TRANSACTIONS_IN_BLOCK as usize;
931
932 let (mut context, _) = Context::new_for_test(4);
933 context
934 .protocol_config
935 .set_max_transaction_size_bytes_for_testing(200_000);
936 context
937 .protocol_config
938 .set_max_transactions_in_block_bytes_for_testing(1_000_000);
939 context
940 .protocol_config
941 .set_max_num_transactions_in_block_for_testing(MAX_NUM_TRANSACTIONS_IN_BLOCK);
942 let context = Arc::new(context);
943 let (client, tx_receiver) = TransactionClient::new_with_max_pending_transactions(
944 context.clone(),
945 MAX_PENDING_TRANSACTIONS,
946 );
947 let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
948
949 for i in 0..MAX_NUM_TRANSACTIONS_IN_BLOCK + 10 {
951 println!("Submitting transaction {i}");
952 let transaction =
953 bcs::to_bytes(&format!("t {i}")).expect("Serialization should not fail.");
954 let _w = client
955 .submit_no_wait(vec![transaction])
956 .await
957 .expect("Shouldn't submit successfully transaction");
958 }
959
960 let (transactions, _ack_transactions, _limit_reached) = consumer.next();
962 assert_eq!(transactions.len() as u64, MAX_NUM_TRANSACTIONS_IN_BLOCK);
963
964 let t: String = bcs::from_bytes(transactions.last().unwrap().data()).unwrap();
965 assert_eq!(
966 t,
967 format!(
968 "t {}",
969 PING_TRANSACTION_INDEX - NUM_RESERVED_TRANSACTION_INDICES - 1
970 )
971 );
972 }
973}