Skip to content

Commit 33ee988

Browse files
committed
Fix some bugs and add some logs
1 parent b9bc153 commit 33ee988

14 files changed

Lines changed: 531 additions & 212 deletions

File tree

crates/client-api/src/routes/database.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use axum::response::{ErrorResponse, IntoResponse};
2020
use axum::routing::MethodRouter;
2121
use axum::Extension;
2222
use axum_extra::TypedHeader;
23-
use http::HeaderMap;
2423
use futures::TryStreamExt;
24+
use http::HeaderMap;
2525
use http::StatusCode;
2626
use log::{info, warn};
2727
use serde::Deserialize;
@@ -421,7 +421,12 @@ pub async fn wound_2pc<S: ControlStateDelegate + NodeDelegate>(
421421
let tx_id = global_tx_id
422422
.parse::<GlobalTxId>()
423423
.map_err(|e| (StatusCode::BAD_REQUEST, e).into_response())?;
424-
let (module, _database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
424+
let (module, database) = find_module_and_database(&worker_ctx, name_or_identity).await?;
425+
426+
log::info!(
427+
"received 2PC wound request for transaction {tx_id} on database {}",
428+
database.database_identity
429+
);
425430

426431
module.wound_global_tx(tx_id).await.map_err(|e| {
427432
log::warn!("2PC wound failed for {tx_id}: {e}");

crates/core/src/client/client_connection.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::error::DBError;
1414
use crate::host::module_host::ClientConnectedError;
1515
use crate::host::{
1616
CallProcedureReturn, FunctionArgs, ModuleHost, NoSuchModule, ProcedureCallResult, ReducerCallError,
17-
ReducerCallResult,
17+
ReducerCallResult, ReducerOutcome,
1818
};
1919
use crate::subscription::module_subscription_manager::BroadcastError;
2020
use crate::subscription::row_list_builder_pool::JsonRowListBuilderFakePool;
@@ -870,18 +870,33 @@ impl ClientConnection {
870870
timer: Instant,
871871
_flags: ws_v2::CallReducerFlags,
872872
) -> Result<ReducerCallResult, ReducerCallError> {
873-
self.module()
874-
.call_reducer(
875-
self.id.identity,
876-
Some(self.id.connection_id),
877-
None,
878-
Some(self.sender()),
879-
Some(request_id),
880-
Some(timer),
881-
reducer,
882-
FunctionArgs::Bsatn(args),
883-
)
884-
.await
873+
const MAX_WOUNDED_RETRIES: usize = 3;
874+
875+
let module = self.module();
876+
let mut tx_id = module.replica_ctx().mint_global_tx_id(Timestamp::now());
877+
878+
for attempt in 0..=MAX_WOUNDED_RETRIES {
879+
let result = module
880+
.call_reducer(
881+
self.id.identity,
882+
Some(self.id.connection_id),
883+
Some(tx_id),
884+
Some(self.sender()),
885+
Some(request_id),
886+
Some(timer),
887+
reducer,
888+
FunctionArgs::Bsatn(args.clone()),
889+
)
890+
.await?;
891+
892+
if !matches!(result.outcome, ReducerOutcome::Wounded(_)) || attempt == MAX_WOUNDED_RETRIES {
893+
return Ok(result);
894+
}
895+
896+
tx_id = tx_id.next_attempt();
897+
}
898+
899+
unreachable!("retry loop should return before exhausting attempts")
885900
}
886901

887902
pub async fn call_procedure(

crates/core/src/client/messages.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,9 +381,7 @@ impl ToProtocol for TransactionUpdateMessage {
381381
EventStatus::Committed(_) => ws_v1::UpdateStatus::Committed(update),
382382
EventStatus::FailedUser(errmsg)
383383
| EventStatus::FailedInternal(errmsg)
384-
| EventStatus::Wounded(errmsg) => {
385-
ws_v1::UpdateStatus::Failed(errmsg.clone().into())
386-
}
384+
| EventStatus::Wounded(errmsg) => ws_v1::UpdateStatus::Failed(errmsg.clone().into()),
387385
EventStatus::OutOfEnergy => ws_v1::UpdateStatus::OutOfEnergy,
388386
};
389387

0 commit comments

Comments
 (0)