Skip to content

Commit d8e62bc

Browse files
Drivers retry connecting to coordinator
1 parent b25a021 commit d8e62bc

1 file changed

Lines changed: 48 additions & 11 deletions

File tree

tools/tpcc-runner/src/driver.rs

Lines changed: 48 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -534,21 +534,58 @@ async fn execute_stock_level(
534534

535535
async fn resolve_driver_setup(config: DriverConfig) -> Result<(DriverConfig, RunSchedule)> {
536536
if let Some(coordinator_url) = &config.coordinator_url {
537+
const REGISTER_ATTEMPTS: u32 = 5;
538+
const REGISTER_RETRY_DELAY_MS: u64 = 500;
537539
let client = reqwest::Client::new();
538540
let register = RegisterDriverRequest {
539541
driver_id: config.driver_id.clone(),
540542
};
541-
let response: RegisterDriverResponse = client
542-
.post(format!("{}/register", coordinator_url))
543-
.json(&register)
544-
.send()
545-
.await
546-
.context("failed to register driver with coordinator")?
547-
.error_for_status()
548-
.context("coordinator rejected register request")?
549-
.json()
550-
.await
551-
.context("failed to decode register response")?;
543+
let mut last_error = None;
544+
let mut response = None;
545+
for attempt in 1..=REGISTER_ATTEMPTS {
546+
match client
547+
.post(format!("{}/register", coordinator_url))
548+
.json(&register)
549+
.send()
550+
.await
551+
{
552+
Ok(http_response) => match http_response.error_for_status() {
553+
Ok(http_response) => match http_response.json::<RegisterDriverResponse>().await {
554+
Ok(parsed) => {
555+
response = Some(parsed);
556+
break;
557+
}
558+
Err(err) => {
559+
last_error = Some(anyhow!("failed to decode register response: {err}"));
560+
}
561+
},
562+
Err(err) => {
563+
last_error = Some(anyhow!("coordinator rejected register request: {err}"));
564+
}
565+
},
566+
Err(err) => {
567+
last_error = Some(anyhow!("failed to register driver with coordinator: {err}"));
568+
}
569+
}
570+
571+
if attempt < REGISTER_ATTEMPTS {
572+
log::warn!(
573+
"driver {} failed to register with coordinator on attempt {}/{}; retrying in {}ms",
574+
config.driver_id,
575+
attempt,
576+
REGISTER_ATTEMPTS,
577+
REGISTER_RETRY_DELAY_MS
578+
);
579+
tokio::time::sleep(Duration::from_millis(REGISTER_RETRY_DELAY_MS)).await;
580+
}
581+
}
582+
let response = match response {
583+
Some(response) => response,
584+
None => {
585+
return Err(last_error
586+
.unwrap_or_else(|| anyhow!("driver registration failed without an error")));
587+
}
588+
};
552589
if !response.accepted {
553590
bail!("coordinator did not accept driver registration");
554591
}

0 commit comments

Comments
 (0)