TrySpawnStreamExt

Trait TrySpawnStreamExt 

Source
pub trait TrySpawnStreamExt: Stream {
    // Required methods
    fn try_for_each_spawned<Fut, F, E>(
        self,
        limit: impl Into<Option<usize>>,
        f: F,
    ) -> impl Future<Output = Result<(), E>>
       where Fut: Future<Output = Result<(), E>> + Send + 'static,
             F: FnMut(Self::Item) -> Fut,
             E: Send + 'static;
    fn try_for_each_send_spawned<Fut, F, T, E, R>(
        self,
        config: ConcurrencyConfig,
        f: F,
        tx: Sender<T>,
        report: R,
    ) -> impl Future<Output = Result<(), Break<E>>>
       where Fut: Future<Output = Result<T, Break<E>>> + Send + 'static,
             F: FnMut(Self::Item) -> Fut,
             T: Send + 'static,
             E: Send + 'static,
             R: Fn(ConcurrencyStats);
    fn try_for_each_broadcast_spawned<Fut, F, T, E, R>(
        self,
        config: ConcurrencyConfig,
        f: F,
        txs: Vec<Sender<T>>,
        report: R,
    ) -> impl Future<Output = Result<(), Break<E>>>
       where Fut: Future<Output = Result<T, Break<E>>> + Send + 'static,
             F: FnMut(Self::Item) -> Fut,
             T: Clone + Send + Sync + 'static,
             E: Send + 'static,
             R: Fn(ConcurrencyStats);
}
Expand description

Extension trait introducing try_for_each_spawned to all streams.

Required Methods§

Source

fn try_for_each_spawned<Fut, F, E>( self, limit: impl Into<Option<usize>>, f: F, ) -> impl Future<Output = Result<(), E>>
where Fut: Future<Output = Result<(), E>> + Send + 'static, F: FnMut(Self::Item) -> Fut, E: Send + 'static,

Attempts to run this stream to completion, executing the provided asynchronous closure on each element from the stream as elements become available.

This is similar to [futures::stream::StreamExt::for_each_concurrent], but it may take advantage of any parallelism available in the underlying runtime, because each unit of work is spawned as its own tokio task.

The first argument is an optional limit on the number of tasks to spawn concurrently. Values of 0 and None are interpreted as no limit, and any other value will result in no more than that many tasks being spawned at one time.

§Safety

This function will panic if any of its futures panics, will return early with success if the runtime it is running on is cancelled, and will return early with an error propagated from any worker that produces an error.

Source

fn try_for_each_send_spawned<Fut, F, T, E, R>( self, config: ConcurrencyConfig, f: F, tx: Sender<T>, report: R, ) -> impl Future<Output = Result<(), Break<E>>>
where Fut: Future<Output = Result<T, Break<E>>> + Send + 'static, F: FnMut(Self::Item) -> Fut, T: Send + 'static, E: Send + 'static, R: Fn(ConcurrencyStats),

Process each stream item through a spawned task, sending results to a single channel.

Each item is passed to f which returns a future producing Result<T, Break<E>>. The resulting T is sent to tx. Concurrency is controlled by config: for fixed configs, the limit never changes; for adaptive configs, the limit adjusts based on the fill fraction of the output channel.

Unlike try_for_each_broadcast_spawned, T does not need to be Clone since there is only a single receiver.

The report callback is invoked each iteration with concurrency stats for metrics.

Source

fn try_for_each_broadcast_spawned<Fut, F, T, E, R>( self, config: ConcurrencyConfig, f: F, txs: Vec<Sender<T>>, report: R, ) -> impl Future<Output = Result<(), Break<E>>>
where Fut: Future<Output = Result<T, Break<E>>> + Send + 'static, F: FnMut(Self::Item) -> Fut, T: Clone + Send + Sync + 'static, E: Send + 'static, R: Fn(ConcurrencyStats),

Process each stream item through a spawned task, broadcasting results to multiple channels.

Same as try_for_each_send_spawned but sends a clone of each result to every channel in txs. Fill fraction is measured as the maximum across all channels. Requires T: Clone since values are cloned to each receiver.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<S: Stream + Sized + 'static> TrySpawnStreamExt for S