
Commit 6040296

persist/pubsub: fix connection leak on gRPC stream error (#35938)
When the client's gRPC response stream errors, the reconnect loop drops the tonic `Channel`, but hyper's background task keeps running: it's blocked polling the `broadcast_messages` async stream, which holds a live `BroadcastStream` receiver. This keeps the HTTP2 connection open, leaking one connection per reconnect.

Fix this by giving the stream a cancellation token. When the reconnect loop drops `cancel_tx`, the stream observes it via `select!` and terminates, allowing hyper to close the HTTP2 connection.

### Motivation

Fixes https://github.com/MaterializeInc/database-issues/issues/11276
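For readers unfamiliar with the pattern, here is a self-contained sketch of the same idea outside the Materialize codebase (an illustration, not the patched code): an `async_stream::stream!` forwards messages from a `BroadcastStream` but also `select!`s on a `tokio::sync::oneshot` receiver; dropping the sender completes that receiver, the stream returns, and whoever is polling it (here a spawned task standing in for hyper's connection task) observes end-of-stream and exits. The crates match the ones the patch uses (tokio, tokio-stream, futures, async-stream); names like `broadcast_tx`, `consumer`, and the 50 ms sleep are illustrative assumptions.

// Minimal sketch (not Materialize code) of the cancellation pattern from this commit.
// Assumed deps: tokio (macros, rt-multi-thread, sync, time), tokio-stream (sync),
// futures, async-stream. All names are illustrative.
use futures::StreamExt;
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};

#[tokio::main]
async fn main() {
    let (broadcast_tx, _rx) = tokio::sync::broadcast::channel::<String>(16);
    let mut broadcast = BroadcastStream::new(broadcast_tx.subscribe());

    // The cancellation token: when `cancel_tx` is dropped, `cancel_rx` resolves
    // (with `Err(RecvError)`), which the stream below treats as "shut down".
    let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();

    let messages = async_stream::stream! {
        let mut cancel_rx = std::pin::pin!(cancel_rx);
        loop {
            tokio::select! {
                message = broadcast.next() => match message {
                    Some(Ok(message)) => yield message,
                    Some(Err(BroadcastStreamRecvError::Lagged(_))) => continue,
                    None => return, // broadcast sender dropped
                },
                // Fires whether `cancel_tx` is used to send or simply dropped.
                _ = &mut cancel_rx => return,
            }
        }
    };

    // Stand-in for hyper's background task: it only exits once the stream it is
    // polling terminates.
    let consumer = tokio::spawn(async move {
        let mut messages = std::pin::pin!(messages);
        while let Some(message) = messages.next().await {
            println!("forwarding: {message}");
        }
        println!("stream ended; the connection task can now shut down");
    });

    broadcast_tx.send("hello".to_string()).unwrap();
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // Stand-in for the reconnect loop giving up on this connection: dropping the
    // token unblocks the stream, so the consumer exits instead of leaking.
    drop(cancel_tx);
    consumer.await.unwrap();
}

Dropping `cancel_tx` is enough (no explicit send is needed) because a oneshot receiver completes with an error once its sender is gone, which is what the patch relies on when it calls `drop(cancel_tx)` after the stream call returns.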

1 file changed: src/persist-client/src/rpc.rs (29 additions & 9 deletions)

@@ -353,10 +353,18 @@ impl GrpcPubSubClient {
                 .broadcast_recv_lagged_count
                 .clone();

+            // `client.pub_sub(...)` starts a hyper background task reading from the
+            // `broadcast_messages` stream, to serve the HTTP2 connection. The broadcast stream
+            // doesn't normally terminate, which means the HTTP2 connection doesn't terminate
+            // either. We set up a cancelation token to force termination and avoid a connection
+            // leak.
+            let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
+
             // shard subscriptions are tracked by connection on the server, so if our
             // gRPC stream is ever swapped out, we must inform the server which shards
             // our client intended to be subscribed to.
             let broadcast_messages = async_stream::stream! {
+                let mut cancel_rx = std::pin::pin!(cancel_rx);
                 'reconnect: loop {
                     // If we have active subscriptions, resend them.
                     for id in sender.subscriptions() {
@@ -370,18 +378,28 @@ impl GrpcPubSubClient {
                     }

                     // Forward on messages from the broadcast channel, reconnecting if necessary.
-                    while let Some(message) = broadcast.next().await {
-                        debug!("sending pubsub message: {:?}", message);
-                        match message {
-                            Ok(message) => yield message,
-                            Err(BroadcastStreamRecvError::Lagged(i)) => {
-                                broadcast_errors.inc_by(i);
-                                continue 'reconnect;
+                    loop {
+                        tokio::select! {
+                            message = broadcast.next() => {
+                                debug!("sending pubsub message: {:?}", message);
+                                match message {
+                                    Some(Ok(message)) => yield message,
+                                    Some(Err(BroadcastStreamRecvError::Lagged(i))) => {
+                                        broadcast_errors.inc_by(i);
+                                        continue 'reconnect;
+                                    }
+                                    None => {
+                                        debug!("exhausted pubsub broadcast stream; shutting down");
+                                        return;
+                                    }
+                                }
+                            }
+                            _ = &mut cancel_rx => {
+                                debug!("pubsub broadcast stream cancelled; shutting down");
+                                return;
                             }
                         }
                     }
-                    debug!("exhausted pubsub broadcast stream; shutting down");
-                    break;
                 }
             };
             let pubsub_request =
@@ -403,6 +421,8 @@ impl GrpcPubSubClient {
             )
             .await;

+            drop(cancel_tx);
+
             match stream_completed {
                 // common case: reconnect due to some transient error
                 Ok(_) => continue,
