1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use core:: mem:: Discriminant ;
1516use core:: ops:: Bound ;
1617use core:: sync:: atomic:: { AtomicU64 , Ordering } ;
1718use core:: time:: Duration ;
@@ -60,6 +61,10 @@ pub struct OperationSubscriber<S: SchedulerStore, I: InstantWrapper, NowFn: Fn()
6061 > ,
6162 last_known_keepalive_ts : AtomicU64 ,
6263 now_fn : NowFn ,
64+ // If the SchedulerSubscriptionManager is not reliable, then this is populated
65+ // when the state is set to subscribed. When set it causes the state to be polled
66+ // as well as listening for the publishing.
67+ maybe_last_stage : Option < Discriminant < ActionStage > > ,
6368}
6469
6570impl < S : SchedulerStore , I : InstantWrapper , NowFn : Fn ( ) -> I + core:: fmt:: Debug > core:: fmt:: Debug
99104 last_known_keepalive_ts : AtomicU64 :: new ( 0 ) ,
100105 state : OperationSubscriberState :: Unsubscribed ,
101106 now_fn,
107+ maybe_last_stage : None ,
102108 }
103109 }
104110
@@ -158,35 +164,50 @@ where
158164 . upgrade ( )
159165 . err_tip ( || "Store gone in OperationSubscriber::get_awaited_action" ) ?;
160166 let subscription = match & mut self . state {
167+ OperationSubscriberState :: Subscribed ( subscription) => subscription,
161168 OperationSubscriberState :: Unsubscribed => {
162169 let subscription = store
163170 . subscription_manager ( )
164171 . err_tip ( || "In OperationSubscriber::changed::subscription_manager" ) ?
165172 . subscribe ( self . subscription_key . borrow ( ) )
166173 . err_tip ( || "In OperationSubscriber::changed::subscribe" ) ?;
167174 self . state = OperationSubscriberState :: Subscribed ( subscription) ;
175+ // When we've just subscribed, there may have been changes before now.
176+ let action = Self :: inner_get_awaited_action (
177+ store. as_ref ( ) ,
178+ self . subscription_key . borrow ( ) ,
179+ self . maybe_client_operation_id . clone ( ) ,
180+ & self . last_known_keepalive_ts ,
181+ )
182+ . await
183+ . err_tip ( || "In OperationSubscriber::changed" ) ?;
184+ if !<S as SchedulerStore >:: SubscriptionManager :: is_reliable ( ) {
185+ self . maybe_last_stage = Some ( core:: mem:: discriminant ( & action. state ( ) . stage ) ) ;
186+ }
187+ // Existing changes are only interesting if the state is past queued.
188+ if !matches ! ( action. state( ) . stage, ActionStage :: Queued ) {
189+ return Ok ( action) ;
190+ }
168191 let OperationSubscriberState :: Subscribed ( subscription) = & mut self . state else {
169192 unreachable ! ( "Subscription should be in Subscribed state" ) ;
170193 } ;
171194 subscription
172195 }
173- OperationSubscriberState :: Subscribed ( subscription) => subscription,
174196 } ;
175197
176198 let changed_fut = subscription. changed ( ) ;
177199 tokio:: pin!( changed_fut) ;
178200 loop {
179- let mut retries = 0 ;
180- loop {
201+ // This is set if the maybe_last_state doesn't match the state in the store.
202+ let mut maybe_changed_action = None ;
203+ for attempt in 1 ..=MAX_RETRIES_FOR_CLIENT_KEEPALIVE {
181204 let last_known_keepalive_ts = self . last_known_keepalive_ts . load ( Ordering :: Acquire ) ;
182205 if I :: from_secs ( last_known_keepalive_ts) . elapsed ( ) <= CLIENT_KEEPALIVE_DURATION {
183206 break ; // We are still within the keep alive duration.
184207 }
185- if retries > MAX_RETRIES_FOR_CLIENT_KEEPALIVE {
186- return Err ( make_err ! (
187- Code :: Aborted ,
188- "Could not update client keep alive for AwaitedAction" ,
189- ) ) ;
208+ if attempt > 1 {
209+ // Wait a tick before retrying.
210+ ( self . now_fn ) ( ) . sleep ( Duration :: from_millis ( 100 ) ) . await ;
190211 }
191212 let mut awaited_action = Self :: inner_get_awaited_action (
192213 store. as_ref ( ) ,
@@ -197,38 +218,62 @@ where
197218 . await
198219 . err_tip ( || "In OperationSubscriber::changed" ) ?;
199220 awaited_action. update_client_keep_alive ( ( self . now_fn ) ( ) . now ( ) ) ;
200- let update_res = inner_update_awaited_action ( store. as_ref ( ) , awaited_action)
201- . await
202- . err_tip ( || "In OperationSubscriber::changed" ) ;
203- if update_res. is_ok ( ) {
204- break ;
221+ // If this is set to Some then the action changed without being published.
222+ maybe_changed_action = self
223+ . maybe_last_stage
224+ . as_ref ( )
225+ . is_some_and ( |last_stage| {
226+ * last_stage != core:: mem:: discriminant ( & awaited_action. state ( ) . stage )
227+ } )
228+ . then ( || awaited_action. clone ( ) ) ;
229+ match inner_update_awaited_action ( store. as_ref ( ) , awaited_action) . await {
230+ Ok ( ( ) ) => break ,
231+ err if attempt == MAX_RETRIES_FOR_CLIENT_KEEPALIVE => {
232+ err. err_tip_with_code ( |_| {
233+ ( Code :: Aborted , "Could not update client keep alive" )
234+ } ) ?;
235+ }
236+ _ => ( ) ,
205237 }
206- retries += 1 ;
207- // Wait a tick before retrying.
208- ( self . now_fn ) ( ) . sleep ( Duration :: from_millis ( 100 ) ) . await ;
209238 }
210- let sleep_fut = ( self . now_fn ) ( ) . sleep ( CLIENT_KEEPALIVE_DURATION ) ;
239+ // If the polling shows that it's changed state then publish now.
240+ if let Some ( changed_action) = maybe_changed_action {
241+ self . maybe_last_stage =
242+ Some ( core:: mem:: discriminant ( & changed_action. state ( ) . stage ) ) ;
243+ return Ok ( changed_action) ;
244+ }
245+ // Determine the sleep time based on the last client keep alive.
246+ let sleep_time = CLIENT_KEEPALIVE_DURATION
247+ . checked_sub (
248+ I :: from_secs ( self . last_known_keepalive_ts . load ( Ordering :: Acquire ) ) . elapsed ( ) ,
249+ )
250+ . unwrap_or ( Duration :: from_millis ( 100 ) ) ;
211251 tokio:: select! {
212252 result = & mut changed_fut => {
213253 result?;
214254 break ;
215255 }
216- ( ) = sleep_fut => {
256+ ( ) = ( self . now_fn ) ( ) . sleep ( sleep_time ) => {
217257 // If we haven't received any updates for a while, we should
218258 // let the database know that we are still listening to prevent
219- // the action from being dropped.
259+ // the action from being dropped. Also poll for updates if the
260+ // subscription manager is unreliable.
220261 }
221262 }
222263 }
223264
224- Self :: inner_get_awaited_action (
265+ let awaited_action = Self :: inner_get_awaited_action (
225266 store. as_ref ( ) ,
226267 self . subscription_key . borrow ( ) ,
227268 self . maybe_client_operation_id . clone ( ) ,
228269 & self . last_known_keepalive_ts ,
229270 )
230271 . await
231- . err_tip ( || "In OperationSubscriber::changed" )
272+ . err_tip ( || "In OperationSubscriber::changed" ) ?;
273+ if self . maybe_last_stage . is_some ( ) {
274+ self . maybe_last_stage = Some ( core:: mem:: discriminant ( & awaited_action. state ( ) . stage ) ) ;
275+ }
276+ Ok ( awaited_action)
232277 }
233278
234279 async fn borrow ( & self ) -> Result < AwaitedAction , Error > {
0 commit comments