sui_rpc/client/v2/
lists.rs

1use futures::stream;
2use futures::stream::Stream;
3
4use crate::client::v2::Client;
5use crate::client::v2::Result;
6use crate::proto::sui::rpc::v2::Balance;
7use crate::proto::sui::rpc::v2::DynamicField;
8use crate::proto::sui::rpc::v2::ListBalancesRequest;
9use crate::proto::sui::rpc::v2::ListDynamicFieldsRequest;
10use crate::proto::sui::rpc::v2::ListOwnedObjectsRequest;
11use crate::proto::sui::rpc::v2::ListPackageVersionsRequest;
12use crate::proto::sui::rpc::v2::Object;
13use crate::proto::sui::rpc::v2::PackageVersion;
14
15impl Client {
16    /// Creates a stream of objects based on the provided request.
17    ///
18    /// The stream handles pagination automatically by using the page_token from responses
19    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
20    ///
21    /// # Arguments
22    /// * `request` - The initial `ListOwnedObjectsRequest` with search criteria
23    ///
24    /// # Returns
25    /// A stream that yields `Result<Object>` instances. If any RPC call fails, the
26    /// tonic::Status from that request is returned.
27    pub fn list_owned_objects(
28        &self,
29        request: impl tonic::IntoRequest<ListOwnedObjectsRequest>,
30    ) -> impl Stream<Item = Result<Object>> + 'static {
31        let client = self.clone();
32        let request = request.into_request();
33
34        stream::unfold(
35            (
36                Vec::new().into_iter(), // current batch of objects
37                true,                   // has_next_page
38                request,                // request (page_token will be updated as we paginate)
39                client,                 // client for making requests
40            ),
41            move |(mut iter, has_next_page, mut request, mut client)| async move {
42                if let Some(item) = iter.next() {
43                    return Some((Ok(item), (iter, has_next_page, request, client)));
44                }
45
46                if has_next_page {
47                    let new_request = tonic::Request::from_parts(
48                        request.metadata().clone(),
49                        request.extensions().clone(),
50                        request.get_ref().clone(),
51                    );
52
53                    match client.state_client().list_owned_objects(new_request).await {
54                        Ok(response) => {
55                            let response = response.into_inner();
56                            let mut iter = response.objects.into_iter();
57
58                            let has_next_page = response.next_page_token.is_some();
59                            request.get_mut().page_token = response.next_page_token;
60
61                            iter.next()
62                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
63                        }
64                        Err(e) => {
65                            // Return error and terminate stream
66                            request.get_mut().page_token = None;
67                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
68                        }
69                    }
70                } else {
71                    None
72                }
73            },
74        )
75    }
76
77    /// Creates a stream of `DynamicField`s based on the provided request.
78    ///
79    /// The stream handles pagination automatically by using the page_token from responses
80    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
81    ///
82    /// # Arguments
83    /// * `request` - The initial `ListDynamicFieldsRequest` with search criteria
84    ///
85    /// # Returns
86    /// A stream that yields `Result<DynamicField>` instances. If any RPC call fails, the
87    /// tonic::Status from that request is returned.
88    pub fn list_dynamic_fields(
89        &self,
90        request: impl tonic::IntoRequest<ListDynamicFieldsRequest>,
91    ) -> impl Stream<Item = Result<DynamicField>> + 'static {
92        let client = self.clone();
93        let request = request.into_request();
94
95        stream::unfold(
96            (
97                Vec::new().into_iter(), // current batch of objects
98                true,                   // has_next_page
99                request,                // request (page_token will be updated as we paginate)
100                client,                 // client for making requests
101            ),
102            move |(mut iter, has_next_page, mut request, mut client)| async move {
103                if let Some(item) = iter.next() {
104                    return Some((Ok(item), (iter, has_next_page, request, client)));
105                }
106
107                if has_next_page {
108                    let new_request = tonic::Request::from_parts(
109                        request.metadata().clone(),
110                        request.extensions().clone(),
111                        request.get_ref().clone(),
112                    );
113
114                    match client.state_client().list_dynamic_fields(new_request).await {
115                        Ok(response) => {
116                            let response = response.into_inner();
117                            let mut iter = response.dynamic_fields.into_iter();
118
119                            let has_next_page = response.next_page_token.is_some();
120                            request.get_mut().page_token = response.next_page_token;
121
122                            iter.next()
123                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
124                        }
125                        Err(e) => {
126                            // Return error and terminate stream
127                            request.get_mut().page_token = None;
128                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
129                        }
130                    }
131                } else {
132                    None
133                }
134            },
135        )
136    }
137
138    /// Creates a stream of `Balance`s based on the provided request.
139    ///
140    /// The stream handles pagination automatically by using the page_token from responses
141    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
142    ///
143    /// # Arguments
144    /// * `request` - The initial `ListBalancesRequest` with search criteria
145    ///
146    /// # Returns
147    /// A stream that yields `Result<Balance>` instances. If any RPC call fails, the
148    /// tonic::Status from that request is returned.
149    pub fn list_balances(
150        &self,
151        request: impl tonic::IntoRequest<ListBalancesRequest>,
152    ) -> impl Stream<Item = Result<Balance>> + 'static {
153        let client = self.clone();
154        let request = request.into_request();
155
156        stream::unfold(
157            (
158                Vec::new().into_iter(), // current batch of objects
159                true,                   // has_next_page
160                request,                // request (page_token will be updated as we paginate)
161                client,                 // client for making requests
162            ),
163            move |(mut iter, has_next_page, mut request, mut client)| async move {
164                if let Some(item) = iter.next() {
165                    return Some((Ok(item), (iter, has_next_page, request, client)));
166                }
167
168                if has_next_page {
169                    let new_request = tonic::Request::from_parts(
170                        request.metadata().clone(),
171                        request.extensions().clone(),
172                        request.get_ref().clone(),
173                    );
174
175                    match client.state_client().list_balances(new_request).await {
176                        Ok(response) => {
177                            let response = response.into_inner();
178                            let mut iter = response.balances.into_iter();
179
180                            let has_next_page = response.next_page_token.is_some();
181                            request.get_mut().page_token = response.next_page_token;
182
183                            iter.next()
184                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
185                        }
186                        Err(e) => {
187                            // Return error and terminate stream
188                            request.get_mut().page_token = None;
189                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
190                        }
191                    }
192                } else {
193                    None
194                }
195            },
196        )
197    }
198
199    /// Creates a stream of `PackageVersion`s based on the provided request.
200    ///
201    /// The stream handles pagination automatically by using the page_token from responses
202    /// to fetch subsequent pages. The original request's page_token is used as the starting point.
203    ///
204    /// # Arguments
205    /// * `request` - The initial `ListPackageVersionsRequest` with search criteria
206    ///
207    /// # Returns
208    /// A stream that yields `Result<PackageVersion>` instances. If any RPC call fails, the
209    /// tonic::Status from that request is returned.
210    pub fn list_package_versions(
211        &self,
212        request: impl tonic::IntoRequest<ListPackageVersionsRequest>,
213    ) -> impl Stream<Item = Result<PackageVersion>> + 'static {
214        let client = self.clone();
215        let request = request.into_request();
216
217        stream::unfold(
218            (
219                Vec::new().into_iter(), // current batch of objects
220                true,                   // has_next_page
221                request,                // request (page_token will be updated as we paginate)
222                client,                 // client for making requests
223            ),
224            move |(mut iter, has_next_page, mut request, mut client)| async move {
225                if let Some(item) = iter.next() {
226                    return Some((Ok(item), (iter, has_next_page, request, client)));
227                }
228
229                if has_next_page {
230                    let new_request = tonic::Request::from_parts(
231                        request.metadata().clone(),
232                        request.extensions().clone(),
233                        request.get_ref().clone(),
234                    );
235
236                    match client
237                        .package_client()
238                        .list_package_versions(new_request)
239                        .await
240                    {
241                        Ok(response) => {
242                            let response = response.into_inner();
243                            let mut iter = response.versions.into_iter();
244
245                            let has_next_page = response.next_page_token.is_some();
246                            request.get_mut().page_token = response.next_page_token;
247
248                            iter.next()
249                                .map(|item| (Ok(item), (iter, has_next_page, request, client)))
250                        }
251                        Err(e) => {
252                            // Return error and terminate stream
253                            request.get_mut().page_token = None;
254                            Some((Err(e), (Vec::new().into_iter(), false, request, client)))
255                        }
256                    }
257                } else {
258                    None
259                }
260            },
261        )
262    }
263}