sui_authority_aggregation/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use futures::Future;
5use futures::{StreamExt, future::BoxFuture, stream::FuturesUnordered};
6use mysten_metrics::monitored_future;
7
8use std::collections::{BTreeMap, BTreeSet};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use sui_types::base_types::ConciseableName;
12use sui_types::committee::{CommitteeTrait, StakeUnit};
13
14use tokio::time::timeout;
15
/// Shorthand for a [`BoxFuture`] that lives for `'a` and resolves to `Result<T, E>`.
///
/// This is the future type the per-authority map function (`FMap`) must return.
pub type AsyncResult<'a, T, E> = BoxFuture<'a, Result<T, E>>;
17
/// Caller preferences for how authority responses are gathered and ordered by
/// `quorum_map_then_reduce_with_timeout_and_prefs`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SigRequestPrefs<K> {
    /// Authorities handed to `CommitteeTrait::shuffle_by_stake` as the ordering preference.
    pub ordering_pref: BTreeSet<K>,
    /// How long to collect responses before they are reduced in the shuffled committee order
    /// instead of arrival order.
    pub prefetch_timeout: Duration,
}
22
/// Verdict returned by the reducer (`FReduce`) after folding one authority response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReduceOutput<R, S> {
    /// Keep folding; the payload is the updated accumulated state.
    Continue(S),
    /// Stop folding; the payload is handed back to the caller as `Err(state)`.
    Failed(S),
    /// Stop folding; the payload is handed back to the caller inside `Ok(..)`.
    Success(R),
}
28
29/// This function takes an initial state, than executes an asynchronous function (FMap) for each
30/// authority, and folds the results as they become available into the state using an async function (FReduce).
31///
32/// prefetch_timeout: the minimum amount of time to spend trying to gather results from all authorities
33/// before falling back to arrival order.
34///
35/// total_timeout: the maximum amount of total time to wait for results from all authorities, including
36/// time spent prefetching.
37pub async fn quorum_map_then_reduce_with_timeout_and_prefs<
38    'a,
39    C,
40    K,
41    Client: 'a,
42    S,
43    V,
44    R,
45    E,
46    FMap,
47    FReduce,
48>(
49    committee: Arc<C>,
50    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
51    authority_preferences: Option<SigRequestPrefs<K>>,
52    initial_state: S,
53    map_each_authority: FMap,
54    reduce_result: FReduce,
55    total_timeout: Duration,
56) -> Result<
57    (
58        R,
59        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
60    ),
61    S,
62>
63where
64    K: Ord + ConciseableName<'a> + Clone + 'a,
65    C: CommitteeTrait<K>,
66    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
67    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>>,
68{
69    let (preference, prefetch_timeout) = if let Some(SigRequestPrefs {
70        ordering_pref,
71        prefetch_timeout,
72    }) = authority_preferences
73    {
74        (Some(ordering_pref), Some(prefetch_timeout))
75    } else {
76        (None, None)
77    };
78    let authorities_shuffled = committee.shuffle_by_stake(preference.as_ref(), None);
79    let mut accumulated_state = initial_state;
80    let mut total_timeout = total_timeout;
81
82    // First, execute in parallel for each authority FMap.
83    let mut responses: futures::stream::FuturesUnordered<_> = authorities_shuffled
84        .clone()
85        .into_iter()
86        .map(|name| {
87            let client = authority_clients[&name].clone();
88            let execute = map_each_authority.clone();
89            monitored_future!(async move { (name.clone(), execute(name, client).await,) })
90        })
91        .collect();
92    if let Some(prefetch_timeout) = prefetch_timeout {
93        let elapsed = Instant::now();
94        let prefetch_sleep = tokio::time::sleep(prefetch_timeout);
95        let mut authority_to_result: BTreeMap<K, Result<V, E>> = BTreeMap::new();
96        tokio::pin!(prefetch_sleep);
97        // get all the sigs we can within prefetch_timeout
98        loop {
99            tokio::select! {
100                resp = responses.next() => {
101                    match resp {
102                        Some((authority_name, result)) => {
103                            authority_to_result.insert(authority_name, result);
104                        }
105                        None => {
106                            // we have processed responses from the full committee so can stop early
107                            break;
108                        }
109                    }
110                }
111                _ = &mut prefetch_sleep => {
112                    break;
113                }
114            }
115        }
116        // process what we have up to this point
117        for authority_name in authorities_shuffled {
118            let authority_weight = committee.weight(&authority_name);
119            if let Some(result) = authority_to_result.remove(&authority_name) {
120                accumulated_state = match reduce_result(
121                    accumulated_state,
122                    authority_name,
123                    authority_weight,
124                    result,
125                )
126                .await
127                {
128                    // In the first two cases we are told to continue the iteration.
129                    ReduceOutput::Continue(state) => state,
130                    ReduceOutput::Failed(state) => {
131                        return Err(state);
132                    }
133                    ReduceOutput::Success(result) => {
134                        // The reducer tells us that we have the result needed. Just return it.
135                        return Ok((result, responses));
136                    }
137                };
138            }
139        }
140        // if we got here, fallback through the if statement to continue in arrival order on
141        // the remaining validators
142        total_timeout = total_timeout.saturating_sub(elapsed.elapsed());
143    }
144
145    // As results become available fold them into the state using FReduce.
146    while let Ok(Some((authority_name, result))) = timeout(total_timeout, responses.next()).await {
147        let authority_weight = committee.weight(&authority_name);
148        accumulated_state =
149            match reduce_result(accumulated_state, authority_name, authority_weight, result).await {
150                // In the first two cases we are told to continue the iteration.
151                ReduceOutput::Continue(state) => state,
152                ReduceOutput::Failed(state) => {
153                    return Err(state);
154                }
155                ReduceOutput::Success(result) => {
156                    // The reducer tells us that we have the result needed. Just return it.
157                    return Ok((result, responses));
158                }
159            }
160    }
161    // If we have exhausted all authorities and still have not returned a result, return
162    // error with the accumulated state.
163    Err(accumulated_state)
164}
165
166/// This function takes an initial state, than executes an asynchronous function (FMap) for each
167/// authority, and folds the results as they become available into the state using an async function (FReduce).
168///
169/// FMap can do io, and returns a result V. An error there may not be fatal, and could be consumed by the
170/// MReduce function to overall recover from it. This is necessary to ensure byzantine authorities cannot
171/// interrupt the logic of this function.
172///
173/// FReduce returns a result to a ReduceOutput. If the result is Err the function
174/// shortcuts and the Err is returned. An Ok ReduceOutput result can be used to shortcut and return
175/// the resulting state (ReduceOutput::End), continue the folding as new states arrive (ReduceOutput::Continue).
176///
177/// This function provides a flexible way to communicate with a quorum of authorities, processing and
178/// processing their results into a safe overall result, and also safely allowing operations to continue
179/// past the quorum to ensure all authorities are up to date (up to a timeout).
180pub async fn quorum_map_then_reduce_with_timeout<
181    'a,
182    C,
183    K,
184    Client: 'a,
185    S: 'a,
186    V: 'a,
187    R: 'a,
188    E,
189    FMap,
190    FReduce,
191>(
192    committee: Arc<C>,
193    authority_clients: Arc<BTreeMap<K, Arc<Client>>>,
194    // The initial state that will be used to fold in values from authorities.
195    initial_state: S,
196    // The async function used to apply to each authority. It takes an authority name,
197    // and authority client parameter and returns a Result<V>.
198    map_each_authority: FMap,
199    // The async function that takes an accumulated state, and a new result for V from an
200    // authority and returns a result to a ReduceOutput state.
201    reduce_result: FReduce,
202    // The initial timeout applied to all
203    initial_timeout: Duration,
204) -> Result<
205    (
206        R,
207        FuturesUnordered<impl Future<Output = (K, Result<V, E>)> + 'a>,
208    ),
209    S,
210>
211where
212    K: Ord + ConciseableName<'a> + Clone + 'a,
213    C: CommitteeTrait<K>,
214    FMap: FnOnce(K, Arc<Client>) -> AsyncResult<'a, V, E> + Clone + 'a,
215    FReduce: Fn(S, K, StakeUnit, Result<V, E>) -> BoxFuture<'a, ReduceOutput<R, S>> + 'a,
216{
217    quorum_map_then_reduce_with_timeout_and_prefs(
218        committee,
219        authority_clients,
220        None,
221        initial_state,
222        map_each_authority,
223        reduce_result,
224        initial_timeout,
225    )
226    .await
227}