sui_graphql_client/
streams.rs1use crate::error;
5use crate::query_types::PageInfo;
6use crate::Direction;
7use crate::Page;
8use crate::PaginationFilter;
9
10use futures::Stream;
11use std::future::Future;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15
16pub struct PageStream<T, F, Fut> {
18 query_fn: F,
19 direction: Direction,
20 current_page: Option<(PageInfo, std::vec::IntoIter<T>)>,
21 current_future: Option<Pin<Box<Fut>>>,
22 finished: bool,
23 is_first_page: bool,
24}
25
26impl<T, F, Fut> PageStream<T, F, Fut> {
27 pub fn new(query_fn: F, direction: Direction) -> Self {
28 Self {
29 query_fn,
30 direction,
31 current_page: None,
32 current_future: None,
33 finished: false,
34 is_first_page: true,
35 }
36 }
37}
38
39impl<T, F, Fut> Stream for PageStream<T, F, Fut>
40where
41 T: Clone + Unpin,
42 F: Fn(PaginationFilter) -> Fut,
43 F: Unpin,
44 Fut: Future<Output = Result<Page<T>, error::Error>>,
45{
46 type Item = Result<T, error::Error>;
47
48 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49 if self.finished {
50 return Poll::Ready(None);
51 }
52
53 loop {
54 let direction = self.direction.clone();
55 if let Some((page_info, iter)) = &mut self.current_page {
57 if let Some(item) = iter.next() {
58 return Poll::Ready(Some(Ok(item)));
59 }
60
61 let should_continue = match direction {
64 Direction::Forward => page_info.has_next_page,
65 Direction::Backward => page_info.has_previous_page,
66 };
67 if !should_continue {
68 self.finished = true;
69 return Poll::Ready(None);
70 }
71 }
72
73 let current_cursor = self
75 .current_page
76 .as_ref()
77 .and_then(|(page_info, _iter)| {
78 match self.direction {
79 Direction::Forward => page_info
80 .has_next_page
81 .then(|| page_info.end_cursor.clone()),
82 Direction::Backward => {
83 if self.is_first_page {
86 None
87 } else {
88 page_info
89 .has_previous_page
90 .then(|| page_info.start_cursor.clone())
91 }
92 }
93 }
94 })
95 .flatten();
96
97 if self.current_future.is_none() {
99 if self.is_first_page && current_cursor.is_some() {
100 self.is_first_page = false;
101 }
102 let filter = PaginationFilter {
103 direction: self.direction.clone(),
104 cursor: current_cursor,
105 limit: None,
106 };
107 let future = (self.query_fn)(filter);
108 self.current_future = Some(Box::pin(future));
109 }
110
111 match self.current_future.as_mut().unwrap().as_mut().poll(cx) {
113 Poll::Ready(Ok(page)) => {
114 self.current_future = None;
115
116 if page.is_empty() {
117 self.finished = true;
118 return Poll::Ready(None);
119 }
120
121 let (page_info, data) = page.into_parts();
122 let iter = match self.direction {
124 Direction::Forward => data.into_iter(),
125 Direction::Backward => {
126 let mut vec = data;
127 vec.reverse();
128 vec.into_iter()
129 }
130 };
131 self.current_page = Some((page_info, iter));
132
133 if self.is_first_page {
134 self.is_first_page = false;
135 }
136 }
137 Poll::Ready(Err(e)) => {
138 if self.is_first_page {
139 self.is_first_page = false;
140 }
141 self.finished = true;
142 self.current_future = None;
143 return Poll::Ready(Some(Err(e)));
144 }
145 Poll::Pending => return Poll::Pending,
146 }
147 }
148 }
149}
150
151pub fn stream_paginated_query<T, F, Fut>(query_fn: F, direction: Direction) -> PageStream<T, F, Fut>
175where
176 F: Fn(PaginationFilter) -> Fut,
177 Fut: Future<Output = Result<Page<T>, error::Error>>,
178{
179 PageStream::new(query_fn, direction)
180}