Skip to content

Commit 1b90ab6

Browse files
kesmit13claude
andcommitted
Fix broken pipe in collocated UDF server under concurrent load
Add poll()-based timeout to C recv_exact to avoid the interaction between Python's settimeout() (which sets O_NONBLOCK on the fd) and direct fd-level recv() in the C code. When the fd was non-blocking, recv() returned EAGAIN immediately when no data was available, which the C code treated as an error, closing the connection and causing EPIPE on the client side. - accel.c: Add optional timeout_ms parameter to recv_exact that uses poll(POLLIN) before each recv() call, raising TimeoutError on timeout. Also add mmap_read and mmap_write C helpers for fd-level I/O. - connection.py: Only call settimeout() for the Python fallback path; keep fd blocking for C accel path. Pass 100ms timeout to C recv_exact. Catch TimeoutError instead of socket.timeout. Replace select() loop with timeout-based recv. Add C accel paths for mmap read/write. Add optional per-request profiling via SINGLESTOREDB_UDF_PROFILE=1. - registry.py: Consolidate accel imports (mmap_read, mmap_write, recv_exact) under single _has_accel flag. - wasm.py: Update to use renamed _has_accel flag. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e4d8627 commit 1b90ab6

4 files changed

Lines changed: 315 additions & 45 deletions

File tree

accel.c

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11

22
#include <math.h>
3+
#include <poll.h>
34
#include <stdint.h>
45
#include <stdlib.h>
6+
#include <string.h>
7+
#include <sys/mman.h>
8+
#include <sys/socket.h>
9+
#include <unistd.h>
510
#include <Python.h>
611

712
#ifndef Py_LIMITED_API
@@ -5293,13 +5298,163 @@ static PyObject *call_function_accel(PyObject *self, PyObject *args, PyObject *k
52935298
goto exit;
52945299
}
52955300

5301+
/*
5302+
* mmap_read(fd, length) -> bytes
5303+
*
5304+
* Maps the given fd with MAP_SHARED|PROT_READ for `length` bytes,
5305+
* copies into a Python bytes object, and unmaps in a single C call.
5306+
* Eliminates Python mmap object creation/destruction overhead.
5307+
*/
5308+
static PyObject *accel_mmap_read(PyObject *self, PyObject *args) {
5309+
int fd;
5310+
Py_ssize_t length;
5311+
5312+
if (!PyArg_ParseTuple(args, "in", &fd, &length))
5313+
return NULL;
5314+
5315+
if (length <= 0) {
5316+
return PyBytes_FromStringAndSize(NULL, 0);
5317+
}
5318+
5319+
void *addr = mmap(NULL, (size_t)length, PROT_READ, MAP_SHARED, fd, 0);
5320+
if (addr == MAP_FAILED) {
5321+
PyErr_SetFromErrno(PyExc_OSError);
5322+
return NULL;
5323+
}
5324+
5325+
PyObject *result = PyBytes_FromStringAndSize((const char *)addr, length);
5326+
munmap(addr, (size_t)length);
5327+
return result;
5328+
}
5329+
5330+
/*
5331+
* mmap_write(fd, data, min_size) -> None
5332+
*
5333+
* Writes `data` to the file descriptor, combining ftruncate + lseek + write
5334+
* into a single C call. If min_size > 0, ftruncate is called with
5335+
* max(min_size, len(data)); if min_size == 0, ftruncate is skipped
5336+
* (caller manages file size).
5337+
*/
5338+
static PyObject *accel_mmap_write(PyObject *self, PyObject *args) {
5339+
int fd;
5340+
const char *data;
5341+
Py_ssize_t data_len;
5342+
Py_ssize_t min_size;
5343+
5344+
if (!PyArg_ParseTuple(args, "iy#n", &fd, &data, &data_len, &min_size))
5345+
return NULL;
5346+
5347+
if (min_size > 0) {
5348+
Py_ssize_t trunc_size = data_len > min_size ? data_len : min_size;
5349+
if (ftruncate(fd, (off_t)trunc_size) < 0) {
5350+
PyErr_SetFromErrno(PyExc_OSError);
5351+
return NULL;
5352+
}
5353+
}
5354+
5355+
if (lseek(fd, 0, SEEK_SET) < 0) {
5356+
PyErr_SetFromErrno(PyExc_OSError);
5357+
return NULL;
5358+
}
5359+
5360+
const char *p = data;
5361+
Py_ssize_t remaining = data_len;
5362+
while (remaining > 0) {
5363+
ssize_t written = write(fd, p, (size_t)remaining);
5364+
if (written < 0) {
5365+
PyErr_SetFromErrno(PyExc_OSError);
5366+
return NULL;
5367+
}
5368+
p += written;
5369+
remaining -= written;
5370+
}
5371+
5372+
Py_RETURN_NONE;
5373+
}
5374+
5375+
/*
5376+
* recv_exact(fd, n, timeout_ms=-1) -> bytes or None
5377+
*
5378+
* Receives exactly `n` bytes from a socket fd using blocking recv.
5379+
* Returns None on EOF (peer closed). Operates on raw fd to avoid
5380+
* Python socket object overhead. Releases the GIL during recv.
5381+
*
5382+
* When timeout_ms >= 0, uses poll() before each recv() to wait for
5383+
* data with a timeout. Raises TimeoutError on timeout. This allows
5384+
* the fd to remain in blocking mode while still supporting timeouts,
5385+
* avoiding the interaction between Python's settimeout() (which sets
5386+
* O_NONBLOCK) and direct fd-level recv().
5387+
*/
5388+
static PyObject *accel_recv_exact(PyObject *self, PyObject *args) {
5389+
int fd, timeout_ms = -1;
5390+
Py_ssize_t n;
5391+
5392+
if (!PyArg_ParseTuple(args, "in|i", &fd, &n, &timeout_ms))
5393+
return NULL;
5394+
5395+
if (n <= 0) {
5396+
return PyBytes_FromStringAndSize(NULL, 0);
5397+
}
5398+
5399+
char *buf = (char *)malloc((size_t)n);
5400+
if (!buf) {
5401+
PyErr_NoMemory();
5402+
return NULL;
5403+
}
5404+
5405+
Py_ssize_t pos = 0;
5406+
while (pos < n) {
5407+
if (timeout_ms >= 0) {
5408+
struct pollfd pfd = {fd, POLLIN, 0};
5409+
int poll_rc;
5410+
Py_BEGIN_ALLOW_THREADS
5411+
poll_rc = poll(&pfd, 1, timeout_ms);
5412+
Py_END_ALLOW_THREADS
5413+
if (poll_rc == 0) {
5414+
free(buf);
5415+
PyErr_SetString(PyExc_TimeoutError, "recv_exact timed out");
5416+
return NULL;
5417+
}
5418+
if (poll_rc < 0) {
5419+
free(buf);
5420+
PyErr_SetFromErrno(PyExc_OSError);
5421+
return NULL;
5422+
}
5423+
}
5424+
5425+
ssize_t received;
5426+
Py_BEGIN_ALLOW_THREADS
5427+
received = recv(fd, buf + pos, (size_t)(n - pos), 0);
5428+
Py_END_ALLOW_THREADS
5429+
5430+
if (received < 0) {
5431+
free(buf);
5432+
PyErr_SetFromErrno(PyExc_OSError);
5433+
return NULL;
5434+
}
5435+
if (received == 0) {
5436+
/* EOF */
5437+
free(buf);
5438+
Py_RETURN_NONE;
5439+
}
5440+
pos += received;
5441+
}
5442+
5443+
PyObject *result = PyBytes_FromStringAndSize(buf, n);
5444+
free(buf);
5445+
return result;
5446+
}
5447+
52965448
static PyMethodDef PyMySQLAccelMethods[] = {
52975449
{"read_rowdata_packet", (PyCFunction)read_rowdata_packet, METH_VARARGS | METH_KEYWORDS, "PyMySQL row data packet reader"},
52985450
{"dump_rowdat_1", (PyCFunction)dump_rowdat_1, METH_VARARGS | METH_KEYWORDS, "ROWDAT_1 formatter for external functions"},
52995451
{"load_rowdat_1", (PyCFunction)load_rowdat_1, METH_VARARGS | METH_KEYWORDS, "ROWDAT_1 parser for external functions"},
53005452
{"dump_rowdat_1_numpy", (PyCFunction)dump_rowdat_1_numpy, METH_VARARGS | METH_KEYWORDS, "ROWDAT_1 formatter for external functions which takes numpy.arrays"},
53015453
{"load_rowdat_1_numpy", (PyCFunction)load_rowdat_1_numpy, METH_VARARGS | METH_KEYWORDS, "ROWDAT_1 parser for external functions which creates numpy.arrays"},
53025454
{"call_function_accel", (PyCFunction)call_function_accel, METH_VARARGS | METH_KEYWORDS, "Combined load/call/dump for UDF function calls"},
5455+
{"mmap_read", (PyCFunction)accel_mmap_read, METH_VARARGS, "mmap read: maps fd, copies data, unmaps"},
5456+
{"mmap_write", (PyCFunction)accel_mmap_write, METH_VARARGS, "mmap write: ftruncate+lseek+write in one call"},
5457+
{"recv_exact", (PyCFunction)accel_recv_exact, METH_VARARGS, "Receive exactly N bytes from a socket fd"},
53035458
{NULL, NULL, 0, NULL}
53045459
};
53055460

0 commit comments

Comments
 (0)