Skip to content

Commit bbbd87a

Browse files
Copilotithewei
andcommitted
Add hio_sendfile() API for zero-copy file transfer using sendfile(2)
Implements: - Platform-abstracted sendfile in nio.c (Linux sendfile, macOS/FreeBSD sendfile, generic pread+write fallback) - hio_sendfile() public API in hloop.h with write queue integration - Sendfile state tracking in hio_t (sendfile_fd, sendfile_offset, sendfile_remain) - SSL fallback using pread + hio_write - Windows fallback in overlapio.c - Integration into HTTP large file handler for unlimited rate mode - hio_write_bufsize accounts for sendfile remaining bytes Co-authored-by: ithewei <26049660+ithewei@users.noreply.github.com> Agent-Logs-Url: https://github.com/ithewei/libhv/sessions/e59504ed-e745-410d-a8ab-e99946d80a3c
1 parent 17b80d2 commit bbbd87a

6 files changed

Lines changed: 205 additions & 4 deletions

File tree

event/hevent.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ void hio_init(hio_t* io) {
7777
// write_queue_init(&io->write_queue, 4);
7878

7979
hrecursive_mutex_init(&io->write_mutex);
80+
io->sendfile_fd = -1;
81+
io->sendfile_offset = 0;
82+
io->sendfile_remain = 0;
8083
}
8184

8285
void hio_ready(hio_t* io) {
@@ -752,7 +755,7 @@ void hio_set_max_write_bufsize(hio_t* io, uint32_t size) {
752755
}
753756

754757
size_t hio_write_bufsize(hio_t* io) {
755-
return io->write_bufsize;
758+
return io->write_bufsize + io->sendfile_remain;
756759
}
757760

758761
int hio_read_once (hio_t* io) {

event/hevent.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ struct hio_s {
175175
char* hostname; // for hssl_set_sni_hostname
176176
// context
177177
void* ctx; // for hio_context / hio_set_context
178+
// sendfile
179+
int sendfile_fd; // file descriptor for sendfile, -1 when inactive
180+
off_t sendfile_offset; // current offset in file
181+
size_t sendfile_remain; // remaining bytes to send
178182
// private:
179183
#if defined(EVENT_POLL) || defined(EVENT_KQUEUE)
180184
int event_index[2]; // for poll,kqueue

event/hloop.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,15 @@ HV_EXPORT int hio_read_remain(hio_t* io);
380380
HV_EXPORT int hio_write (hio_t* io, const void* buf, size_t len);
381381
HV_EXPORT int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr);
382382

383+
// NOTE: hio_sendfile uses zero-copy sendfile(2) on Linux, sendfile(2) on macOS/FreeBSD.
384+
// Falls back to read+write for SSL connections and unsupported platforms.
385+
// @param in_fd: file descriptor of the file to send from
386+
// @param offset: starting offset in the file
387+
// @param length: number of bytes to send
388+
// @return 0 on success (async operation started), -1 on error
389+
// hwrite_cb is called as data is sent. When complete, write_queue is empty.
390+
HV_EXPORT int hio_sendfile(hio_t* io, int in_fd, off_t offset, size_t length);
391+
383392
// NOTE: hio_close is thread-safe, hio_close_async will be called actually in other thread.
384393
// hio_del(io, HV_RDWR) => close => hclose_cb
385394
HV_EXPORT int hio_close (hio_t* io);

event/nio.c

Lines changed: 154 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@
77
#include "herr.h"
88
#include "hthread.h"
99

10+
#ifdef OS_LINUX
11+
#include <sys/sendfile.h>
12+
#endif
13+
#if defined(OS_DARWIN) || defined(OS_FREEBSD)
14+
#include <sys/types.h>
15+
#include <sys/socket.h>
16+
#include <sys/uio.h>
17+
#endif
18+
1019
static void __connect_timeout_cb(htimer_t* timer) {
1120
hio_t* io = (hio_t*)timer->privdata;
1221
if (io) {
@@ -304,6 +313,50 @@ static int __nio_write(hio_t* io, const void* buf, int len, struct sockaddr* add
304313
return nwrite;
305314
}
306315

316+
// Platform-abstracted sendfile: returns bytes sent, -1 on error
317+
static ssize_t __nio_sendfile_sys(int out_fd, int in_fd, off_t* offset, size_t count) {
318+
#ifdef OS_LINUX
319+
return sendfile(out_fd, in_fd, offset, count);
320+
#elif defined(OS_DARWIN)
321+
off_t len = count;
322+
int ret = sendfile(in_fd, out_fd, *offset, &len, NULL, 0);
323+
if (ret == 0 || (ret == -1 && errno == EAGAIN && len > 0)) {
324+
*offset += len;
325+
return (ssize_t)len;
326+
}
327+
return -1;
328+
#elif defined(OS_FREEBSD)
329+
off_t sbytes = 0;
330+
int ret = sendfile(in_fd, out_fd, *offset, count, NULL, &sbytes, 0);
331+
if (ret == 0 || (ret == -1 && errno == EAGAIN && sbytes > 0)) {
332+
*offset += sbytes;
333+
return (ssize_t)sbytes;
334+
}
335+
return -1;
336+
#else
337+
// Fallback: read + write
338+
char buf[65536];
339+
size_t to_read = count < sizeof(buf) ? count : sizeof(buf);
340+
ssize_t nread = pread(in_fd, buf, to_read, *offset);
341+
if (nread <= 0) return nread;
342+
ssize_t nwrite = write(out_fd, buf, nread);
343+
if (nwrite > 0) {
344+
*offset += nwrite;
345+
}
346+
return nwrite;
347+
#endif
348+
}
349+
350+
// Try sendfile for the current io, called with write_mutex held
351+
// Returns: > 0 bytes sent, 0 if nothing sent, < 0 on error
352+
static ssize_t __nio_sendfile(hio_t* io) {
353+
ssize_t nsent = __nio_sendfile_sys(io->fd, io->sendfile_fd, &io->sendfile_offset, io->sendfile_remain);
354+
if (nsent > 0) {
355+
io->sendfile_remain -= nsent;
356+
}
357+
return nsent;
358+
}
359+
307360
static void nio_read(hio_t* io) {
308361
// printd("nio_read fd=%d\n", io->fd);
309362
void* buf;
@@ -361,6 +414,10 @@ static void nio_write(hio_t* io) {
361414
hrecursive_mutex_lock(&io->write_mutex);
362415
write:
363416
if (write_queue_empty(&io->write_queue)) {
417+
// Check for pending sendfile after write queue is drained
418+
if (io->sendfile_fd >= 0 && io->sendfile_remain > 0) {
419+
goto do_sendfile;
420+
}
364421
hrecursive_mutex_unlock(&io->write_mutex);
365422
if (io->close) {
366423
io->close = 0;
@@ -407,6 +464,32 @@ static void nio_write(hio_t* io) {
407464
}
408465
hrecursive_mutex_unlock(&io->write_mutex);
409466
return;
467+
468+
do_sendfile:
469+
{
470+
ssize_t nsent = __nio_sendfile(io);
471+
if (nsent < 0) {
472+
err = socket_errno();
473+
if (err == EAGAIN || err == EINTR) {
474+
hrecursive_mutex_unlock(&io->write_mutex);
475+
return;
476+
}
477+
io->error = err;
478+
goto write_error;
479+
}
480+
if (nsent == 0 && (io->io_type & HIO_TYPE_SOCK_STREAM)) {
481+
goto disconnect;
482+
}
483+
hrecursive_mutex_unlock(&io->write_mutex);
484+
if (nsent > 0) {
485+
__write_cb(io, NULL, nsent);
486+
}
487+
if (io->sendfile_remain == 0) {
488+
io->sendfile_fd = -1;
489+
}
490+
return;
491+
}
492+
410493
write_error:
411494
disconnect:
412495
hrecursive_mutex_unlock(&io->write_mutex);
@@ -426,9 +509,10 @@ static void hio_handle_events(hio_t* io) {
426509
}
427510

428511
if ((io->events & HV_WRITE) && (io->revents & HV_WRITE)) {
429-
// NOTE: del HV_WRITE, if write_queue empty
512+
// NOTE: del HV_WRITE, if write_queue empty and no pending sendfile
430513
hrecursive_mutex_lock(&io->write_mutex);
431-
if (write_queue_empty(&io->write_queue)) {
514+
if (write_queue_empty(&io->write_queue) &&
515+
!(io->sendfile_fd >= 0 && io->sendfile_remain > 0)) {
432516
hio_del(io, HV_WRITE);
433517
}
434518
hrecursive_mutex_unlock(&io->write_mutex);
@@ -590,6 +674,72 @@ int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
590674
return hio_write4(io, buf, len, addr ? addr : io->peeraddr);
591675
}
592676

677+
int hio_sendfile (hio_t* io, int in_fd, off_t offset, size_t length) {
678+
if (io->closed) {
679+
hloge("hio_sendfile called but fd[%d] already closed!", io->fd);
680+
return -1;
681+
}
682+
if (length == 0) return 0;
683+
684+
// SSL: fall back to read + hio_write (sendfile cannot bypass SSL encryption)
685+
if (io->io_type == HIO_TYPE_SSL) {
686+
char buf[65536];
687+
off_t cur_offset = offset;
688+
size_t remaining = length;
689+
while (remaining > 0) {
690+
size_t to_read = remaining < sizeof(buf) ? remaining : sizeof(buf);
691+
ssize_t nread = pread(in_fd, buf, to_read, cur_offset);
692+
if (nread <= 0) {
693+
hloge("hio_sendfile pread error!");
694+
return -1;
695+
}
696+
int nwrite = hio_write(io, buf, nread);
697+
if (nwrite < 0) return nwrite;
698+
cur_offset += nread;
699+
remaining -= nread;
700+
}
701+
return 0;
702+
}
703+
704+
hrecursive_mutex_lock(&io->write_mutex);
705+
706+
io->sendfile_fd = in_fd;
707+
io->sendfile_offset = offset;
708+
io->sendfile_remain = length;
709+
710+
// If write queue is empty, try sendfile immediately
711+
if (write_queue_empty(&io->write_queue)) {
712+
ssize_t nsent = __nio_sendfile(io);
713+
if (nsent < 0) {
714+
int err = socket_errno();
715+
if (err != EAGAIN && err != EINTR) {
716+
io->error = err;
717+
io->sendfile_fd = -1;
718+
hrecursive_mutex_unlock(&io->write_mutex);
719+
hio_close_async(io);
720+
return -1;
721+
}
722+
}
723+
if (nsent > 0) {
724+
hrecursive_mutex_unlock(&io->write_mutex);
725+
__write_cb(io, NULL, nsent);
726+
if (io->sendfile_remain == 0) {
727+
io->sendfile_fd = -1;
728+
return 0;
729+
}
730+
hrecursive_mutex_lock(&io->write_mutex);
731+
}
732+
}
733+
734+
// If still remaining, register for writable event
735+
if (io->sendfile_remain > 0) {
736+
hio_add(io, hio_handle_events, HV_WRITE);
737+
}
738+
739+
hrecursive_mutex_unlock(&io->write_mutex);
740+
return 0;
741+
}
742+
593743
int hio_close (hio_t* io) {
594744
if (io->closed) return 0;
595745
if (io->destroy == 0 && hv_gettid() != io->loop->tid) {
@@ -611,6 +761,8 @@ int hio_close (hio_t* io) {
611761
return 0;
612762
}
613763
io->closed = 1;
764+
io->sendfile_fd = -1;
765+
io->sendfile_remain = 0;
614766
hrecursive_mutex_unlock(&io->write_mutex);
615767

616768
hio_done(io);

event/overlapio.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,25 @@ int hio_sendto (hio_t* io, const void* buf, size_t len, struct sockaddr* addr) {
387387
return hio_write4(io, buf, len, addr ? addr : io->peeraddr);
388388
}
389389

390+
int hio_sendfile (hio_t* io, int in_fd, off_t offset, size_t length) {
391+
if (io->closed) return -1;
392+
if (length == 0) return 0;
393+
// Fallback: read + hio_write
394+
char buf[65536];
395+
off_t cur_offset = offset;
396+
size_t remaining = length;
397+
while (remaining > 0) {
398+
size_t to_read = remaining < sizeof(buf) ? remaining : sizeof(buf);
399+
ssize_t nread = _lseeki64(in_fd, cur_offset, SEEK_SET) >= 0 ? _read(in_fd, buf, (unsigned int)to_read) : -1;
400+
if (nread <= 0) return -1;
401+
int nwrite = hio_write(io, buf, nread);
402+
if (nwrite < 0) return nwrite;
403+
cur_offset += nread;
404+
remaining -= nread;
405+
}
406+
return 0;
407+
}
408+
390409
int hio_close (hio_t* io) {
391410
if (io->closed) return 0;
392411
io->closed = 1;

http/server/HttpHandler.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,11 +659,25 @@ int HttpHandler::defaultLargeFileHandler(const std::string &filepath) {
659659
// forbidden to send large file
660660
resp->content_length = 0;
661661
resp->status_code = HTTP_STATUS_FORBIDDEN;
662+
} else if (service->limit_rate < 0 && file->fp) {
663+
// unlimited: use zero-copy sendfile if possible
664+
int filefd = fileno(file->fp);
665+
size_t length = resp->content_length;
666+
writer->EndHeaders();
667+
writer->onwrite = [this](HBuf* buf) {
668+
if (writer->isWriteComplete()) {
669+
resp->content_length = 0;
670+
writer->End();
671+
closeFile();
672+
}
673+
};
674+
hio_sendfile(io, filefd, 0, length);
675+
return HTTP_STATUS_UNFINISHED;
662676
} else {
663677
size_t bufsize = 40960; // 40K
664678
file->buf.resize(bufsize);
665679
if (service->limit_rate < 0) {
666-
// unlimited: sendFile when writable
680+
// unlimited: sendFile when writable (fallback when fileno unavailable)
667681
writer->onwrite = [this](HBuf* buf) {
668682
if (writer->isWriteComplete()) {
669683
sendFile();

0 commit comments

Comments
 (0)