Skip to content

Commit 17d3e5f

Browse files
committed
Add more retries and backoff
1 parent 9968095 commit 17d3e5f

1 file changed

Lines changed: 12 additions & 1 deletion

File tree

crates/core/src/client/client_connection.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::util::prometheus_handle::IntGaugeExt;
2323
use crate::worker_metrics::WORKER_METRICS;
2424
use bytes::Bytes;
2525
use bytestring::ByteString;
26+
use std::time::Duration;
2627
use derive_more::From;
2728
use futures::prelude::*;
2829
use prometheus::{Histogram, IntCounter, IntGauge};
@@ -35,6 +36,7 @@ use spacetimedb_lib::{bsatn, Identity, TimeDuration, Timestamp};
3536
use tokio::sync::mpsc::error::{SendError, TrySendError};
3637
use tokio::sync::{mpsc, oneshot, watch};
3738
use tokio::task::AbortHandle;
39+
use tokio::time::sleep;
3840
use tracing::{trace, warn};
3941

4042
#[derive(PartialEq, Eq, Clone, Copy, Hash, Debug)]
@@ -870,10 +872,12 @@ impl ClientConnection {
870872
timer: Instant,
871873
_flags: ws_v2::CallReducerFlags,
872874
) -> Result<ReducerCallResult, ReducerCallError> {
873-
const MAX_WOUNDED_RETRIES: usize = 3;
875+
const MAX_WOUNDED_RETRIES: usize = 10;
876+
const MAX_BACKOFF: Duration = Duration::from_millis(100);
874877

875878
let module = self.module();
876879
let mut tx_id = module.replica_ctx().mint_global_tx_id(Timestamp::now());
880+
let mut wound_backoff = Duration::from_millis(10);
877881

878882
for attempt in 0..=MAX_WOUNDED_RETRIES {
879883
let result = module
@@ -890,8 +894,15 @@ impl ClientConnection {
890894
.await?;
891895

892896
if !matches!(result.outcome, ReducerOutcome::Wounded(_)) || attempt == MAX_WOUNDED_RETRIES {
897+
if attempt == MAX_WOUNDED_RETRIES && matches!(result.outcome, ReducerOutcome::Wounded(_)) {
898+
log::warn!("Reducer call was wounded on final attempt. Returning error to client.");
899+
}
893900
return Ok(result);
894901
}
902+
903+
log::info!("Reducer call was wounded on attempt {attempt}, retrying after {wound_backoff:?} with new transaction ID {tx_id}");
904+
sleep(wound_backoff).await;
905+
wound_backoff = wound_backoff.mul_f32(2.0).min(MAX_BACKOFF);
895906

896907
tx_id = tx_id.next_attempt();
897908
}

0 commit comments

Comments
 (0)