
Commit 278641f

Consume the Responses in Statement::drop (#8)
Instead of dropping the `Responses` in `Statement::drop` and closing the channel stored in there, poll it for the response messages. This appears to fix a weird failure that causes us to never read a response to the query back from the stream if we write a close packet for a prepared statement and a packet for the portal created by `query_typed` in quick succession, and then start polling the read part of the stream. Somehow we receive the close success response but not the response to the query. However, if we poll the channel through which the `Connection` sends the response back to the caller in `Drop`, everything works fine. Note that this should not be necessary, and it raises more questions than answers.

Reading subsequent messages is not blocked on the response to the close packet not being read. `Connection` always reads through everything and sends the parsed responses to the interested parties if the channel for that specific response is still alive; otherwise it just skips over them, but it always reads everything to completion. That part works well and there are no issues with it. Moreover, if I just `std::mem::forget` the `Responses` in `Statement::drop` to keep it alive and force the `Connection` to write the response to the channel's buffer without ever reading it, the bug is still reproducible, so keeping the channel open makes no difference on its own.

What actually does make a difference, judging from the traces, is that polling `Responses::next` notifies the `Connection` task's waker: this has a rather subtle effect and influences the timing of the operations, making us poll the socket for reading (which happens concurrently with us writing the query) a little bit earlier. Similarly, doing nothing in `Statement::drop` and instead making the `Connection` artificially wake itself and busy-loop on CPU by calling `cx.waker().wake_by_ref()` also completely resolves the issue.

The connection task is not deadlocked, and it is periodically woken up by the socket. If we leave the seemingly frozen application running, after some time we actually start receiving unrelated asynchronous notices from the server, and we successfully read them! There's just genuinely no response to the query ever received from the server. This means one of two things: either the write never actually finishes (contrary to the fact that we receive `Poll::Ready` in `poll_flush`), or the bug is actually on the server side. That said, _something somewhere else_ does seem to be blocked, as Ctrl+C doesn't work when the issue occurs, which means the signal handler task is not being polled for some reason. It was a big red herring that turned out to be a separate problem, possibly on our side.

The possibility of a server-side bug dependent on the order or timing of operations would also answer the question of "what does any of this have to do with pgbouncer or Supavisor?", to which I don't have a good answer. The fact that the issue is not reproducible when using normal Postgres is certainly interesting.

This patch is probably not upstreamable (for one, the introduced dependency on the `rt` feature is somewhat of a breaking change, as it tightly couples the library with Tokio; without it, I believe, it's technically possible to use `tokio_postgres` with other runtimes or even WASM despite the name), and it treats the symptom rather than the root cause. That said, it's safe, it doesn't hurt (for us), and it reliably and consistently fixes [#26537](https://github.com/prisma/prisma/issues/26537)/[ORM-796](https://linear.app/prisma-company/issue/ORM-796/regression-with-pgbouncer-tracing), at least on my machine. I didn't test, though, whether this still fixes the issue (or whether it occurs in the first place) when fewer than 3 threads are available (needed to run the tasks for the mobc connection, the tokio_postgres connection, and the background task spawned in `Statement::drop` in parallel), and I'd rather not block on the future in `Statement::drop` and degrade the latency for everyone else for the sake of fixing an exotic issue.
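For context on the debugging claims above, this is roughly what the "artificially wake itself" experiment looks like; `SelfWaking` is a made-up wrapper used purely for illustration, not the actual `Connection` code:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical wrapper, for illustration only: it forwards to the inner
// future and, whenever that returns Pending, immediately schedules another
// poll, so the task busy-loops on CPU instead of waiting for the socket waker.
struct SelfWaking<F>(F);

impl<F: Future + Unpin> Future for SelfWaking<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Ready(out) => Poll::Ready(out),
            Poll::Pending => {
                // The call referred to above: ask to be polled again right
                // away instead of sleeping until an I/O event arrives.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
```

Both this experiment and the patch only change when the connection task gets polled relative to the write, which is what points at a timing-dependent problem rather than a lost wakeup.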
1 parent 6afba8f commit 278641f

2 files changed

Lines changed: 18 additions & 2 deletions


tokio-postgres/Cargo.toml

Lines changed: 1 addition & 1 deletion
```diff
@@ -60,7 +60,7 @@ phf = "0.11"
 postgres-protocol = { version = "0.6.8", path = "../postgres-protocol" }
 postgres-types = { version = "0.2.9", path = "../postgres-types" }
 socket2 = "0.5.3"
-tokio = { version = "1.27", features = ["io-util"] }
+tokio = { version = "1.27", features = ["io-util", "rt"] }
 tokio-util = { version = "0.7", features = ["codec"] }
 rand = "0.9.0"
 whoami = "1.4.1"
```

tokio-postgres/src/statement.rs

Lines changed: 17 additions & 1 deletion
```diff
@@ -2,6 +2,7 @@ use crate::client::InnerClient;
 use crate::codec::FrontendMessage;
 use crate::connection::RequestMessages;
 use crate::types::Type;
+use postgres_protocol::message::backend::Message;
 use postgres_protocol::message::frontend;
 use std::fmt;
 use std::sync::{Arc, Weak};
@@ -19,13 +20,28 @@ impl Drop for StatementInner {
             // Unnamed statements don't need to be closed
             return;
         }
+
         if let Some(client) = self.client.upgrade() {
             let buf = client.with_buf(|buf| {
                 frontend::close(b'S', &self.name, buf).unwrap();
                 frontend::sync(buf);
                 buf.split().freeze()
             });
-            let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
+
+            let response = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
+
+            tokio::spawn(async move {
+                let result = async move {
+                    match response?.next().await? {
+                        Message::CloseComplete => Ok(()),
+                        _ => Err(crate::Error::unexpected_message()),
+                    }
+                };
+
+                if let Err(err) = result.await {
+                    log::error!("failed to deallocate prepared statement: {err}");
+                }
+            });
         }
     }
 }
```
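Stripped of the library internals, the pattern the patch applies is: keep the receiving end alive in `Drop` and actually drain it from a background task, rather than closing the channel by dropping it. A minimal self-contained sketch, with a plain Tokio mpsc channel standing in for the internal `Responses` stream (all names here are illustrative):

```rust
use tokio::sync::mpsc;

// Illustrative stand-in for a handle whose backend responses arrive on a channel.
struct PendingClose {
    rx: Option<mpsc::Receiver<String>>,
}

impl Drop for PendingClose {
    fn drop(&mut self) {
        if let Some(mut rx) = self.rx.take() {
            // Instead of closing the channel by dropping the receiver, keep it
            // alive and poll it to completion on a background task. Like the
            // patch, this needs Tokio's `rt` (and here `sync`) features and an
            // ambient runtime.
            tokio::spawn(async move {
                while rx.recv().await.is_some() {}
            });
        }
    }
}
```

In the real patch the drained messages are the backend's responses to the Close/Sync pair, and anything other than `CloseComplete` is logged as an error.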
