Skip to content

Commit 30f079a

Browse files
alvherrehoriguti
authored andcommitted
libpq: Improve idle state handling in pipeline mode
We were going into IDLE state too soon when executing queries via PQsendQuery in pipeline mode, causing several scenarios to misbehave in different ways -- most notably, as reported by Daniele Varrazzo, that a warning message is produced by libpq: message type 0x33 arrived from server while idle But it is also possible, if queries are sent and results consumed not in lockstep, for the expected mediating NULL result values from PQgetResult to be lost (a problem which has not been reported, but which is more serious). Fix this by introducing two new concepts: one is a command queue element PGQUERY_CLOSE to tell libpq to wait for the CloseComplete server response to the Close message that is sent by PQsendQuery. Because the application is not expecting any PGresult from this, the mechanism to consume it is a bit hackish. The other concept, authored by Horiguchi-san, is a PGASYNC_PIPELINE_IDLE state for libpq's state machine to differentiate "really idle" from merely "the idle state that occurs in between reading results from the server for elements in the pipeline". This makes libpq not go fully IDLE when the libpq command queue contains entries; in normal cases, we only go IDLE once at the end of the pipeline, when the server response to the final SYNC message is received. (However, there are corner cases it doesn't fix, such as terminating the query sequence by PQsendFlushRequest instead of PQpipelineSync; this sort of scenario is what requires PGQUERY_CLOSE bit above.) This last bit helps make the libpq state machine clearer; in particular we can get rid of an ugly hack in pqParseInput3 to avoid considering IDLE as such when the command queue contains entries. A new test mode is added to libpq_pipeline.c to tickle some related problematic cases. Reported-by: Daniele Varrazzo <daniele.varrazzo@gmail.com> Co-authored-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Discussion: https://postgr.es/m/CA+mi_8bvD0_CW3sumgwPvWdNzXY32itoG_16tDYRu_1S2gV2iw@mail.gmail.com
1 parent fe889fd commit 30f079a

6 files changed

Lines changed: 425 additions & 38 deletions

File tree

src/interfaces/libpq/fe-exec.c

Lines changed: 97 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,7 +1313,8 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
13131313
* itself consume commands from the queue; if we're in any other
13141314
* state, we don't have to do anything.
13151315
*/
1316-
if (conn->asyncStatus == PGASYNC_IDLE)
1316+
if (conn->asyncStatus == PGASYNC_IDLE ||
1317+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE)
13171318
{
13181319
resetPQExpBuffer(&conn->errorMessage);
13191320
pqPipelineProcessQueue(conn);
@@ -1372,6 +1373,7 @@ int
13721373
PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
13731374
{
13741375
PGcmdQueueEntry *entry = NULL;
1376+
PGcmdQueueEntry *entry2 = NULL;
13751377

13761378
if (!PQsendQueryStart(conn, newQuery))
13771379
return 0;
@@ -1387,6 +1389,12 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
13871389
entry = pqAllocCmdQueueEntry(conn);
13881390
if (entry == NULL)
13891391
return 0; /* error msg already set */
1392+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1393+
{
1394+
entry2 = pqAllocCmdQueueEntry(conn);
1395+
if (entry2 == NULL)
1396+
goto sendFailed;
1397+
}
13901398

13911399
/* Send the query message(s) */
13921400
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
@@ -1456,6 +1464,20 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
14561464

14571465
/* OK, it's launched! */
14581466
pqAppendCmdQueueEntry(conn, entry);
1467+
1468+
/*
1469+
* When pipeline mode is in use, we need a second entry in the command
1470+
* queue to represent Close Portal message. This allows us later to wait
1471+
* for the CloseComplete message to be received before getting in IDLE
1472+
* state.
1473+
*/
1474+
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
1475+
{
1476+
entry2->queryclass = PGQUERY_CLOSE;
1477+
entry2->query = NULL;
1478+
pqAppendCmdQueueEntry(conn, entry2);
1479+
}
1480+
14591481
return 1;
14601482

14611483
sendFailed:
@@ -1702,11 +1724,13 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
17021724
switch (conn->asyncStatus)
17031725
{
17041726
case PGASYNC_IDLE:
1727+
case PGASYNC_PIPELINE_IDLE:
17051728
case PGASYNC_READY:
17061729
case PGASYNC_READY_MORE:
17071730
case PGASYNC_BUSY:
17081731
/* ok to queue */
17091732
break;
1733+
17101734
case PGASYNC_COPY_IN:
17111735
case PGASYNC_COPY_OUT:
17121736
case PGASYNC_COPY_BOTH:
@@ -2082,19 +2106,22 @@ PQgetResult(PGconn *conn)
20822106
{
20832107
case PGASYNC_IDLE:
20842108
res = NULL; /* query is complete */
2085-
if (conn->pipelineStatus != PQ_PIPELINE_OFF)
2086-
{
2087-
/*
2088-
* We're about to return the NULL that terminates the round of
2089-
* results from the current query; prepare to send the results
2090-
* of the next query when we're called next. Also, since this
2091-
* is the start of the results of the next query, clear any
2092-
* prior error message.
2093-
*/
2094-
resetPQExpBuffer(&conn->errorMessage);
2095-
pqPipelineProcessQueue(conn);
2096-
}
20972109
break;
2110+
case PGASYNC_PIPELINE_IDLE:
2111+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
2112+
2113+
/*
2114+
* We're about to return the NULL that terminates the round of
2115+
* results from the current query; prepare to send the results
2116+
* of the next query, if any, when we're called next. If there's
2117+
* no next element in the command queue, this gets us in IDLE
2118+
* state.
2119+
*/
2120+
resetPQExpBuffer(&conn->errorMessage);
2121+
pqPipelineProcessQueue(conn);
2122+
res = NULL; /* query is complete */
2123+
break;
2124+
20982125
case PGASYNC_READY:
20992126

21002127
/*
@@ -2115,7 +2142,7 @@ PQgetResult(PGconn *conn)
21152142
* We're about to send the results of the current query. Set
21162143
* us idle now, and ...
21172144
*/
2118-
conn->asyncStatus = PGASYNC_IDLE;
2145+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
21192146

21202147
/*
21212148
* ... in cases when we're sending a pipeline-sync result,
@@ -2159,6 +2186,22 @@ PQgetResult(PGconn *conn)
21592186
break;
21602187
}
21612188

2189+
/* If the next command we expect is CLOSE, read and consume it */
2190+
if (conn->asyncStatus == PGASYNC_PIPELINE_IDLE &&
2191+
conn->cmd_queue_head &&
2192+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
2193+
{
2194+
if (res && res->resultStatus != PGRES_FATAL_ERROR)
2195+
{
2196+
conn->asyncStatus = PGASYNC_BUSY;
2197+
parseInput(conn);
2198+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
2199+
}
2200+
else
2201+
/* we won't ever see the Close */
2202+
pqCommandQueueAdvance(conn);
2203+
}
2204+
21622205
if (res)
21632206
{
21642207
int i;
@@ -2967,7 +3010,10 @@ PQexitPipelineMode(PGconn *conn)
29673010
if (!conn)
29683011
return 0;
29693012

2970-
if (conn->pipelineStatus == PQ_PIPELINE_OFF)
3013+
if (conn->pipelineStatus == PQ_PIPELINE_OFF &&
3014+
(conn->asyncStatus == PGASYNC_IDLE ||
3015+
conn->asyncStatus == PGASYNC_PIPELINE_IDLE) &&
3016+
conn->cmd_queue_head == NULL)
29713017
return 1;
29723018

29733019
switch (conn->asyncStatus)
@@ -2984,9 +3030,16 @@ PQexitPipelineMode(PGconn *conn)
29843030
libpq_gettext("cannot exit pipeline mode while busy\n"));
29853031
return 0;
29863032

2987-
default:
3033+
case PGASYNC_IDLE:
3034+
case PGASYNC_PIPELINE_IDLE:
29883035
/* OK */
29893036
break;
3037+
3038+
case PGASYNC_COPY_IN:
3039+
case PGASYNC_COPY_OUT:
3040+
case PGASYNC_COPY_BOTH:
3041+
appendPQExpBufferStr(&conn->errorMessage,
3042+
libpq_gettext("cannot exit pipeline mode while in COPY\n"));
29903043
}
29913044

29923045
/* still work to process */
@@ -3023,6 +3076,10 @@ pqCommandQueueAdvance(PGconn *conn)
30233076
prevquery = conn->cmd_queue_head;
30243077
conn->cmd_queue_head = conn->cmd_queue_head->next;
30253078

3079+
/* If the queue is now empty, reset the tail too */
3080+
if (conn->cmd_queue_head == NULL)
3081+
conn->cmd_queue_tail = NULL;
3082+
30263083
/* and make it recyclable */
30273084
prevquery->next = NULL;
30283085
pqRecycleCmdQueueEntry(conn, prevquery);
@@ -3045,15 +3102,35 @@ pqPipelineProcessQueue(PGconn *conn)
30453102
case PGASYNC_BUSY:
30463103
/* client still has to process current query or results */
30473104
return;
3105+
30483106
case PGASYNC_IDLE:
3107+
/*
3108+
* If we're in IDLE mode and there's some command in the queue,
3109+
* get us into PIPELINE_IDLE mode and process normally. Otherwise
3110+
* there's nothing for us to do.
3111+
*/
3112+
if (conn->cmd_queue_head != NULL)
3113+
{
3114+
conn->asyncStatus = PGASYNC_PIPELINE_IDLE;
3115+
break;
3116+
}
3117+
return;
3118+
3119+
case PGASYNC_PIPELINE_IDLE:
3120+
Assert(conn->pipelineStatus != PQ_PIPELINE_OFF);
30493121
/* next query please */
30503122
break;
30513123
}
30523124

3053-
/* Nothing to do if not in pipeline mode, or queue is empty */
3054-
if (conn->pipelineStatus == PQ_PIPELINE_OFF ||
3055-
conn->cmd_queue_head == NULL)
3125+
/*
3126+
* If there are no further commands to process in the queue, get us in
3127+
* "real idle" mode now.
3128+
*/
3129+
if (conn->cmd_queue_head == NULL)
3130+
{
3131+
conn->asyncStatus = PGASYNC_IDLE;
30563132
return;
3133+
}
30573134

30583135
/* Initialize async result-accumulation state */
30593136
pqClearAsyncResult(conn);
@@ -3140,6 +3217,7 @@ PQpipelineSync(PGconn *conn)
31403217
case PGASYNC_READY_MORE:
31413218
case PGASYNC_BUSY:
31423219
case PGASYNC_IDLE:
3220+
case PGASYNC_PIPELINE_IDLE:
31433221
/* OK to send sync */
31443222
break;
31453223
}

src/interfaces/libpq/fe-protocol3.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -192,18 +192,6 @@ pqParseInput3(PGconn *conn)
192192
if (conn->asyncStatus != PGASYNC_IDLE)
193193
return;
194194

195-
/*
196-
* We're also notionally not-IDLE when in pipeline mode the state
197-
* says "idle" (so we have completed receiving the results of one
198-
* query from the server and dispatched them to the application)
199-
* but another query is queued; yield back control to caller so
200-
* that they can initiate processing of the next query in the
201-
* queue.
202-
*/
203-
if (conn->pipelineStatus != PQ_PIPELINE_OFF &&
204-
conn->cmd_queue_head != NULL)
205-
return;
206-
207195
/*
208196
* Unexpected message in IDLE state; need to recover somehow.
209197
* ERROR messages are handled using the notice processor;
@@ -330,8 +318,24 @@ pqParseInput3(PGconn *conn)
330318
}
331319
break;
332320
case '2': /* Bind Complete */
321+
/* Nothing to do for this message type */
322+
break;
333323
case '3': /* Close Complete */
334-
/* Nothing to do for these message types */
324+
/*
325+
* If we get CloseComplete when waiting for it, consume
326+
* the queue element and keep going. A result is not
327+
* expected from this message; it is just there so that
328+
* we know to wait for it when PQsendQuery is used in
329+
* pipeline mode, before going in IDLE state. Failing to
330+
* do this makes us receive CloseComplete when IDLE, which
331+
* creates problems.
332+
*/
333+
if (conn->cmd_queue_head &&
334+
conn->cmd_queue_head->queryclass == PGQUERY_CLOSE)
335+
{
336+
pqCommandQueueAdvance(conn);
337+
}
338+
335339
break;
336340
case 'S': /* parameter status */
337341
if (getParameterStatus(conn))

src/interfaces/libpq/libpq-int.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ typedef enum
268268
* query */
269269
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
270270
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
271-
PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
271+
PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
272+
PGASYNC_PIPELINE_IDLE, /* "Idle" between commands in pipeline mode */
272273
} PGAsyncStatusType;
273274

274275
/* Target server type (decoded value of target_session_attrs) */
@@ -354,7 +355,8 @@ typedef enum
354355
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
355356
PGQUERY_PREPARE, /* Parse only (PQprepare) */
356357
PGQUERY_DESCRIBE, /* Describe Statement or Portal */
357-
PGQUERY_SYNC /* Sync (at end of a pipeline) */
358+
PGQUERY_SYNC, /* Sync (at end of a pipeline) */
359+
PGQUERY_CLOSE
358360
} PGQueryClass;
359361

360362
/*

0 commit comments

Comments
 (0)