1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crypto::PublicKey;
use fastcrypto::Hash;
use rand::{prelude::SliceRandom as _, rngs::SmallRng};
use std::collections::HashMap;

/// A single peer of the pool: its name, the values it reported being able
/// to serve and the values that have been assigned to it.
#[derive(Clone)]
pub struct Peer<Value: Hash + Clone> {
    /// The public key under which the peer is known.
    pub name: PublicKey,

    /// The values that we got from the peer and that it is able to serve,
    /// keyed by their digest.
    pub values_able_to_serve: HashMap<<Value as Hash>::TypedDigest, Value>,

    /// The values assigned to this peer after a re-balancing event, keyed
    /// by their digest.
    assigned_values: HashMap<<Value as Hash>::TypedDigest, Value>,
}

impl<Value: Hash + Clone> Peer<Value> {
    /// Builds a peer called `name` that is able to serve the provided values.
    ///
    /// The values are indexed by their digest, so when several of them share
    /// a digest only the last one is kept. A new peer has no assigned values.
    pub fn new(name: PublicKey, values_able_to_serve: Vec<Value>) -> Self {
        let mut servable = HashMap::with_capacity(values_able_to_serve.len());
        for value in values_able_to_serve {
            servable.insert(value.digest(), value);
        }

        Self {
            name,
            values_able_to_serve: servable,
            assigned_values: HashMap::new(),
        }
    }

    /// Records `certificate` as assigned to this peer, replacing any value
    /// previously assigned under the same digest.
    pub fn assign_values(&mut self, certificate: Value) {
        let digest = certificate.digest();
        self.assigned_values.insert(digest, certificate);
    }

    /// Returns a copy of every value assigned to this peer, in no
    /// particular order.
    pub fn assigned_values(&self) -> Vec<Value> {
        let mut assigned = Vec::with_capacity(self.assigned_values.len());
        assigned.extend(self.assigned_values.values().cloned());
        assigned
    }
}

/// A helper structure that stores the values each peer reported and
/// redistributes them so that every value ends up assigned to exactly
/// one of the peers able to serve it (the peer is picked at random).
/// The implementation is NOT considered thread safe. In particular
/// the re-balancing process is not guaranteed to be atomic, which
/// could lead to issues if the structure is used in a concurrent
/// environment.
pub struct Peers<Value: Hash + Clone> {
    /// A map with all the peers added to this pool, keyed by their name.
    peers: HashMap<PublicKey, Peer<Value>>,

    /// When true, the values have been assigned to peers and no more
    /// mutating operations should be applied.
    rebalanced: bool,

    /// Keeps all the unique values (keyed by digest) so we don't
    /// have to recompute them, by iterating over the peers, every
    /// time they are needed.
    unique_values: HashMap<<Value as Hash>::TypedDigest, Value>,

    /// The rng used to pick, at random, the peer a value is assigned to.
    rng: SmallRng,
}

impl<Value: Hash + Clone> Peers<Value> {
    /// Creates an empty pool that has not been rebalanced yet. The provided
    /// `rng` drives the random choice of the peer each value is assigned to.
    pub fn new(rng: SmallRng) -> Self {
        Self {
            peers: HashMap::new(),
            unique_values: HashMap::new(),
            rebalanced: false,
            rng,
        }
    }

    /// Returns all the peers of the pool keyed by their name.
    // `PublicKey` as a map key trips clippy's `mutable_key_type` lint,
    // which is silenced here.
    #[allow(clippy::mutable_key_type)]
    pub fn peers(&self) -> &HashMap<PublicKey, Peer<Value>> {
        &self.peers
    }

    /// Returns the number of distinct values (by digest) reported by the
    /// peers added so far.
    pub fn unique_value_count(&self) -> usize {
        self.unique_values.len()
    }

    /// Returns a copy of every distinct value reported by the peers added so
    /// far, in no particular order.
    pub fn unique_values(&self) -> Vec<Value> {
        self.unique_values.values().cloned().collect()
    }

    /// Returns `true` when a peer has been added under the provided `name`.
    pub fn contains_peer(&self, name: &PublicKey) -> bool {
        self.peers.contains_key(name)
    }

    /// Adds a peer together with the values it is able to serve, and records
    /// those values in the set of unique values.
    ///
    /// Adding a peer under a name that is already present replaces the
    /// previous entry. The values recorded for the replaced entry are NOT
    /// removed from the set of unique values.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics when the pool has
    /// already been rebalanced.
    pub fn add_peer(&mut self, name: PublicKey, available_values: Vec<Value>) {
        self.ensure_not_rebalanced();

        // update the unique values
        self.unique_values.extend(
            available_values
                .iter()
                .map(|value| (value.digest(), value.clone())),
        );

        self.peers
            .insert(name.clone(), Peer::new(name, available_values));
    }

    /// Re-distributes the values to the peers. We expect to have duplicates
    /// across the peers; in the end every unique value is assigned to exactly
    /// one of the peers able to serve it. The peer is picked at random, no
    /// explicit balancing of the resulting list lengths is performed.
    /// Once the peers are rebalanced, then no other operation that mutates
    /// the struct is allowed.
    ///
    /// # Panics
    ///
    /// Panics when a unique value can not be served by any peer of the pool.
    /// In builds with debug assertions enabled, also panics when the pool has
    /// already been rebalanced.
    pub fn rebalance_values(&mut self) {
        self.ensure_not_rebalanced();

        for value in self.unique_values() {
            self.reassign_value(value);
        }

        self.rebalanced = true;
    }

    /// Assigns `value` to one of the peers able to serve it and removes it
    /// from the values every peer is able to serve.
    fn reassign_value(&mut self, value: Value) {
        let id = value.digest();
        let name = self.peer_to_assign_value(id);

        // The peer is updated in place: there is no need to copy it (and all
        // of its values) out of the map just to put it back.
        self.peers
            .get_mut(&name)
            .expect("the selected peer has just been read from the pool")
            .assign_values(value);

        self.delete_values_from_peers(id);
    }

    /// Finds the peer to assign the value identified by the provided
    /// `value_id` and returns its name.
    /// This method will perform two operations:
    /// 1) Will keep only the peers that are able to serve the value
    /// identified by `value_id`
    /// 2) Will pick one of them at random to assign the value to
    ///
    /// # Panics
    ///
    /// Panics when no peer is able to serve the value.
    fn peer_to_assign_value(&mut self, value_id: <Value as Hash>::TypedDigest) -> PublicKey {
        // step 1 - find the peers who have this id. Only their names are
        // gathered, the peers themselves are never copied.
        let candidates: Vec<&PublicKey> = self
            .peers
            .iter()
            .filter(|(_, peer)| peer.values_able_to_serve.contains_key(&value_id))
            .map(|(name, _)| name)
            .collect();

        // step 2 - pick at random a peer to assign the value.
        // For now we consider this good enough and we avoid doing any
        // explicit client-side load balancing as this should be tackled
        // on the server-side via demand control.
        match candidates.choose(&mut self.rng) {
            Some(name) => (*name).clone(),
            None => {
                panic!("At least one peer should be available when trying to assign a value!")
            }
        }
    }

    /// Deletes the value identified by the provided `id` from the values
    /// that each peer of the pool is able to serve.
    fn delete_values_from_peers(&mut self, id: <Value as Hash>::TypedDigest) {
        for peer in self.peers.values_mut() {
            peer.values_able_to_serve.remove(&id);
        }
    }

    /// Guards the mutating operations: they are not allowed once the pool
    /// has been rebalanced. The check is a `debug_assert!`, so it is only
    /// enforced in builds with debug assertions enabled.
    fn ensure_not_rebalanced(&self) {
        debug_assert!(
            !self.rebalanced,
            "rebalance has been called, this operation is not allowed"
        );
    }
}

#[cfg(test)]
mod tests {
    use crate::block_synchronizer::peers::Peers;
    use blake2::{digest::Update, VarBlake2b};
    use crypto::KeyPair;
    use fastcrypto::{traits::KeyPair as _, Digest, Hash, DIGEST_LEN};
    use rand::{
        rngs::{SmallRng, StdRng},
        SeedableRng,
    };
    use std::{
        borrow::Borrow,
        collections::{HashMap, HashSet},
        fmt,
    };

    /// Every peer responds with the very same certificates: after the
    /// rebalance each certificate must be assigned to exactly one peer and
    /// none of them may be missing.
    #[test]
    fn test_assign_certificates_to_peers_when_all_respond() {
        // Each scenario is a (num_of_certificates, num_of_peers) pair.
        let scenarios: Vec<(u8, u8)> = vec![(5, 4), (8, 2), (3, 2), (20, 5), (10, 1)];

        for (num_of_certificates, num_of_peers) in scenarios {
            println!(
                "Testing case where num_of_certificates={} , num_of_peers={}",
                num_of_certificates, num_of_peers
            );

            // GIVEN
            let mock_certificates: Vec<MockCertificate> =
                (0..num_of_certificates).map(MockCertificate).collect();

            let mut rng = StdRng::from_seed([0; 32]);
            let mut peers = Peers::<MockCertificate>::new(SmallRng::from_entropy());

            for _ in 0..num_of_peers {
                let name = KeyPair::generate(&mut rng).public().clone();
                peers.add_peer(name, mock_certificates.clone());
            }

            // WHEN
            peers.rebalance_values();

            // THEN
            assert_eq!(peers.peers.len() as u8, num_of_peers);

            // No certificate may show up in the assignments of two peers.
            let mut seen_certificates = HashSet::new();
            let assigned = peers
                .peers()
                .values()
                .flat_map(|peer| peer.assigned_values());

            for c in assigned {
                let newly_seen = seen_certificates.insert(c.digest());
                assert!(newly_seen, "Certificate already assigned to another peer");
            }

            // ensure that all the initial certificates have been assigned
            assert_eq!(
                seen_certificates.len(),
                mock_certificates.len(),
                "Returned certificates != Expected certificates"
            );

            for expected in &mock_certificates {
                let digest = expected.digest();
                assert!(
                    seen_certificates.contains(&digest),
                    "Expected certificate not found in set of returned ones"
                );
            }
        }
    }

    /// Every peer responds with certificates that no other peer has: after
    /// the rebalance each peer must have been assigned exactly the
    /// certificates it offered.
    #[allow(clippy::mutable_key_type)]
    #[test]
    fn test_assign_certificates_to_peers_when_all_respond_uniquely() {
        struct TestCase {
            num_of_certificates_each_peer: u8,
            num_of_peers: u8,
        }

        let test_cases: Vec<TestCase> = vec![
            TestCase {
                num_of_certificates_each_peer: 5,
                num_of_peers: 4,
            },
            TestCase {
                num_of_certificates_each_peer: 8,
                num_of_peers: 2,
            },
            TestCase {
                num_of_certificates_each_peer: 3,
                num_of_peers: 2,
            },
            TestCase {
                num_of_certificates_each_peer: 20,
                num_of_peers: 5,
            },
            TestCase {
                num_of_certificates_each_peer: 10,
                num_of_peers: 1,
            },
            TestCase {
                num_of_certificates_each_peer: 0,
                num_of_peers: 4,
            },
        ];

        for test in test_cases {
            println!(
                "Testing case where num_of_certificates_each_peer={} , num_of_peers={}",
                test.num_of_certificates_each_peer, test.num_of_peers
            );
            let mut mock_certificates_by_peer = HashMap::new();

            let mut rng = StdRng::from_seed([0; 32]);

            let mut peers = Peers::<MockCertificate>::new(SmallRng::from_entropy());

            for peer_index in 0..test.num_of_peers {
                let key_pair = KeyPair::generate(&mut rng);
                let peer_name = key_pair.public().clone();

                // Offset the ids by the peer index so that no two peers
                // offer the same certificate.
                let first_id = peer_index * test.num_of_certificates_each_peer;
                let mock_certificates: Vec<MockCertificate> = (0..test
                    .num_of_certificates_each_peer)
                    .map(|i| MockCertificate(first_id + i))
                    .collect();

                peers.add_peer(peer_name.clone(), mock_certificates.clone());

                mock_certificates_by_peer.insert(peer_name, mock_certificates);
            }

            // WHEN
            peers.rebalance_values();

            // THEN
            assert_eq!(peers.peers().len() as u8, test.num_of_peers);

            // The certificates should be balanced to the peers.
            let mut seen_certificates = HashSet::new();

            for peer in peers.peers().values() {
                // we want to ensure that a peer has been assigned as many
                // certificates as it offered (possibly none)
                let peer_certs = mock_certificates_by_peer.get(&peer.name).unwrap();
                let assigned = peer.assigned_values();
                assert_eq!(
                    assigned.len(),
                    peer_certs.len(),
                    "Expected peer to have been assigned the required certificates"
                );

                for c in assigned {
                    let assigned_digest = c.digest();

                    // The closure parameter must not be called `c`: it would
                    // shadow the assigned certificate and compare every
                    // expected digest with itself, which is always true.
                    let found = peer_certs
                        .iter()
                        .any(|expected| expected.digest() == assigned_digest);

                    assert!(found, "Assigned certificate not in set of expected");
                    assert!(
                        seen_certificates.insert(assigned_digest),
                        "Certificate already assigned to another peer"
                    );
                }
            }
        }
    }

    // The mock certificate structure we'll use for our tests.
    // It's easier to debug since the value is a u8, which can
    // be easily understood, printed etc. The digest of a mock
    // certificate is computed from that single byte.
    #[derive(Clone)]
    struct MockCertificate(u8);

    /// The digest type of `MockCertificate`: a thin wrapper around the
    /// `DIGEST_LEN` bytes of the hash.
    #[derive(Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
    pub struct MockDigest([u8; DIGEST_LEN]);

    impl From<MockDigest> for Digest {
        fn from(hd: MockDigest) -> Self {
            Digest::new(hd.0)
        }
    }

    impl fmt::Debug for MockDigest {
        /// Writes the digest bytes as the text produced by `base64::encode`.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let encoded = base64::encode(self.0);
            f.write_str(&encoded)
        }
    }

    impl fmt::Display for MockDigest {
        /// Writes only the first 16 characters of the `base64::encode`
        /// output for the digest bytes. Panics if that output is shorter
        /// than 16 characters.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let encoded = base64::encode(self.0);
            let prefix = encoded.get(0..16).unwrap();
            f.write_str(prefix)
        }
    }

    impl Hash for MockCertificate {
        type TypedDigest = MockDigest;

        /// Hashes the single byte wrapped by the certificate with
        /// `fastcrypto::blake2b_256` and wraps the outcome in a `MockDigest`.
        fn digest(&self) -> MockDigest {
            let byte: &u8 = self.0.borrow();

            // The hasher is fed exactly one byte: the certificate value.
            let hashed = fastcrypto::blake2b_256(|hasher: &mut VarBlake2b| {
                hasher.update([*byte].as_ref());
            });

            MockDigest(hashed)
        }
    }
}