sui_rpc/client/v2/lists.rs
1use futures::stream;
2use futures::stream::Stream;
3
4use crate::client::v2::Client;
5use crate::client::v2::Result;
6use crate::proto::sui::rpc::v2::Balance;
7use crate::proto::sui::rpc::v2::DynamicField;
8use crate::proto::sui::rpc::v2::ListBalancesRequest;
9use crate::proto::sui::rpc::v2::ListDynamicFieldsRequest;
10use crate::proto::sui::rpc::v2::ListOwnedObjectsRequest;
11use crate::proto::sui::rpc::v2::ListPackageVersionsRequest;
12use crate::proto::sui::rpc::v2::Object;
13use crate::proto::sui::rpc::v2::PackageVersion;
14
15impl Client {
16 /// Creates a stream of objects based on the provided request.
17 ///
18 /// The stream handles pagination automatically by using the page_token from responses
19 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
20 ///
21 /// # Arguments
22 /// * `request` - The initial `ListOwnedObjectsRequest` with search criteria
23 ///
24 /// # Returns
25 /// A stream that yields `Result<Object>` instances. If any RPC call fails, the
26 /// tonic::Status from that request is returned.
27 pub fn list_owned_objects(
28 &self,
29 request: impl tonic::IntoRequest<ListOwnedObjectsRequest>,
30 ) -> impl Stream<Item = Result<Object>> + 'static {
31 let client = self.clone();
32 let request = request.into_request();
33
34 stream::unfold(
35 (
36 Vec::new().into_iter(), // current batch of objects
37 true, // has_next_page
38 request, // request (page_token will be updated as we paginate)
39 client, // client for making requests
40 ),
41 move |(mut iter, has_next_page, mut request, mut client)| async move {
42 if let Some(item) = iter.next() {
43 return Some((Ok(item), (iter, has_next_page, request, client)));
44 }
45
46 if has_next_page {
47 let new_request = tonic::Request::from_parts(
48 request.metadata().clone(),
49 request.extensions().clone(),
50 request.get_ref().clone(),
51 );
52
53 match client.state_client().list_owned_objects(new_request).await {
54 Ok(response) => {
55 let response = response.into_inner();
56 let mut iter = response.objects.into_iter();
57
58 let has_next_page = response.next_page_token.is_some();
59 request.get_mut().page_token = response.next_page_token;
60
61 iter.next()
62 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
63 }
64 Err(e) => {
65 // Return error and terminate stream
66 request.get_mut().page_token = None;
67 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
68 }
69 }
70 } else {
71 None
72 }
73 },
74 )
75 }
76
77 /// Creates a stream of `DynamicField`s based on the provided request.
78 ///
79 /// The stream handles pagination automatically by using the page_token from responses
80 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
81 ///
82 /// # Arguments
83 /// * `request` - The initial `ListDynamicFieldsRequest` with search criteria
84 ///
85 /// # Returns
86 /// A stream that yields `Result<DynamicField>` instances. If any RPC call fails, the
87 /// tonic::Status from that request is returned.
88 pub fn list_dynamic_fields(
89 &self,
90 request: impl tonic::IntoRequest<ListDynamicFieldsRequest>,
91 ) -> impl Stream<Item = Result<DynamicField>> + 'static {
92 let client = self.clone();
93 let request = request.into_request();
94
95 stream::unfold(
96 (
97 Vec::new().into_iter(), // current batch of objects
98 true, // has_next_page
99 request, // request (page_token will be updated as we paginate)
100 client, // client for making requests
101 ),
102 move |(mut iter, has_next_page, mut request, mut client)| async move {
103 if let Some(item) = iter.next() {
104 return Some((Ok(item), (iter, has_next_page, request, client)));
105 }
106
107 if has_next_page {
108 let new_request = tonic::Request::from_parts(
109 request.metadata().clone(),
110 request.extensions().clone(),
111 request.get_ref().clone(),
112 );
113
114 match client.state_client().list_dynamic_fields(new_request).await {
115 Ok(response) => {
116 let response = response.into_inner();
117 let mut iter = response.dynamic_fields.into_iter();
118
119 let has_next_page = response.next_page_token.is_some();
120 request.get_mut().page_token = response.next_page_token;
121
122 iter.next()
123 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
124 }
125 Err(e) => {
126 // Return error and terminate stream
127 request.get_mut().page_token = None;
128 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
129 }
130 }
131 } else {
132 None
133 }
134 },
135 )
136 }
137
138 /// Creates a stream of `Balance`s based on the provided request.
139 ///
140 /// The stream handles pagination automatically by using the page_token from responses
141 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
142 ///
143 /// # Arguments
144 /// * `request` - The initial `ListBalancesRequest` with search criteria
145 ///
146 /// # Returns
147 /// A stream that yields `Result<Balance>` instances. If any RPC call fails, the
148 /// tonic::Status from that request is returned.
149 pub fn list_balances(
150 &self,
151 request: impl tonic::IntoRequest<ListBalancesRequest>,
152 ) -> impl Stream<Item = Result<Balance>> + 'static {
153 let client = self.clone();
154 let request = request.into_request();
155
156 stream::unfold(
157 (
158 Vec::new().into_iter(), // current batch of objects
159 true, // has_next_page
160 request, // request (page_token will be updated as we paginate)
161 client, // client for making requests
162 ),
163 move |(mut iter, has_next_page, mut request, mut client)| async move {
164 if let Some(item) = iter.next() {
165 return Some((Ok(item), (iter, has_next_page, request, client)));
166 }
167
168 if has_next_page {
169 let new_request = tonic::Request::from_parts(
170 request.metadata().clone(),
171 request.extensions().clone(),
172 request.get_ref().clone(),
173 );
174
175 match client.state_client().list_balances(new_request).await {
176 Ok(response) => {
177 let response = response.into_inner();
178 let mut iter = response.balances.into_iter();
179
180 let has_next_page = response.next_page_token.is_some();
181 request.get_mut().page_token = response.next_page_token;
182
183 iter.next()
184 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
185 }
186 Err(e) => {
187 // Return error and terminate stream
188 request.get_mut().page_token = None;
189 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
190 }
191 }
192 } else {
193 None
194 }
195 },
196 )
197 }
198
199 /// Creates a stream of `PackageVersion`s based on the provided request.
200 ///
201 /// The stream handles pagination automatically by using the page_token from responses
202 /// to fetch subsequent pages. The original request's page_token is used as the starting point.
203 ///
204 /// # Arguments
205 /// * `request` - The initial `ListPackageVersionsRequest` with search criteria
206 ///
207 /// # Returns
208 /// A stream that yields `Result<PackageVersion>` instances. If any RPC call fails, the
209 /// tonic::Status from that request is returned.
210 pub fn list_package_versions(
211 &self,
212 request: impl tonic::IntoRequest<ListPackageVersionsRequest>,
213 ) -> impl Stream<Item = Result<PackageVersion>> + 'static {
214 let client = self.clone();
215 let request = request.into_request();
216
217 stream::unfold(
218 (
219 Vec::new().into_iter(), // current batch of objects
220 true, // has_next_page
221 request, // request (page_token will be updated as we paginate)
222 client, // client for making requests
223 ),
224 move |(mut iter, has_next_page, mut request, mut client)| async move {
225 if let Some(item) = iter.next() {
226 return Some((Ok(item), (iter, has_next_page, request, client)));
227 }
228
229 if has_next_page {
230 let new_request = tonic::Request::from_parts(
231 request.metadata().clone(),
232 request.extensions().clone(),
233 request.get_ref().clone(),
234 );
235
236 match client
237 .package_client()
238 .list_package_versions(new_request)
239 .await
240 {
241 Ok(response) => {
242 let response = response.into_inner();
243 let mut iter = response.versions.into_iter();
244
245 let has_next_page = response.next_page_token.is_some();
246 request.get_mut().page_token = response.next_page_token;
247
248 iter.next()
249 .map(|item| (Ok(item), (iter, has_next_page, request, client)))
250 }
251 Err(e) => {
252 // Return error and terminate stream
253 request.get_mut().page_token = None;
254 Some((Err(e), (Vec::new().into_iter(), false, request, client)))
255 }
256 }
257 } else {
258 None
259 }
260 },
261 )
262 }
263}