Skip to content

Commit 6e7dd66

Browse files
committed
Reuse single request parser for all requests
This changeset is in preparation for upcoming refactorings to move unrelated logic out of the parser class to prepare for persistent HTTP connections in follow-up PR. This changeset does not affect the public API and happens to improves performance slightly from around 9000 req/s to 9200 req/s on my machine (best of 5).
1 parent 2fe19c4 commit 6e7dd66

3 files changed

Lines changed: 230 additions & 138 deletions

File tree

src/Io/RequestHeaderParser.php

Lines changed: 59 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Evenement\EventEmitter;
66
use Psr\Http\Message\ServerRequestInterface;
7+
use React\Socket\ConnectionInterface;
78
use RingCentral\Psr7 as g7;
89
use Exception;
910

@@ -20,50 +21,73 @@
2021
*/
2122
class RequestHeaderParser extends EventEmitter
2223
{
23-
private $buffer = '';
2424
private $maxSize = 8192;
2525

26-
private $localSocketUri;
27-
private $remoteSocketUri;
28-
29-
public function __construct($localSocketUri = null, $remoteSocketUri = null)
26+
public function handle(ConnectionInterface $conn)
3027
{
31-
$this->localSocketUri = $localSocketUri;
32-
$this->remoteSocketUri = $remoteSocketUri;
33-
}
28+
$buffer = '';
29+
$maxSize = $this->maxSize;
30+
$that = $this;
31+
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
32+
// append chunk of data to buffer and look for end of request headers
33+
$buffer .= $data;
34+
$endOfHeader = \strpos($buffer, "\r\n\r\n");
35+
36+
// reject request if buffer size is exceeded
37+
if ($endOfHeader > $maxSize || ($endOfHeader === false && isset($buffer[$maxSize]))) {
38+
$conn->removeListener('data', $fn);
39+
$fn = null;
40+
41+
$that->emit('error', array(
42+
new \OverflowException("Maximum header size of {$maxSize} exceeded.", 431),
43+
$conn
44+
));
45+
return;
46+
}
3447

35-
public function feed($data)
36-
{
37-
$this->buffer .= $data;
38-
$endOfHeader = \strpos($this->buffer, "\r\n\r\n");
48+
// ignore incomplete requests
49+
if ($endOfHeader === false) {
50+
return;
51+
}
3952

40-
if ($endOfHeader > $this->maxSize || ($endOfHeader === false && isset($this->buffer[$this->maxSize]))) {
41-
$this->emit('error', array(new \OverflowException("Maximum header size of {$this->maxSize} exceeded.", 431), $this));
42-
$this->removeAllListeners();
43-
return;
44-
}
53+
// request headers received => try to parse request
54+
$conn->removeListener('data', $fn);
55+
$fn = null;
4556

46-
if (false !== $endOfHeader) {
4757
try {
48-
$request = $this->parseRequest((string)\substr($this->buffer, 0, $endOfHeader));
58+
$request = $that->parseRequest(
59+
(string)\substr($buffer, 0, $endOfHeader),
60+
$conn->getRemoteAddress(),
61+
$conn->getLocalAddress()
62+
);
4963
} catch (Exception $exception) {
50-
$this->emit('error', array($exception));
51-
$this->removeAllListeners();
64+
$buffer = '';
65+
$that->emit('error', array(
66+
$exception,
67+
$conn
68+
));
5269
return;
5370
}
5471

55-
$bodyBuffer = isset($this->buffer[$endOfHeader + 4]) ? \substr($this->buffer, $endOfHeader + 4) : '';
56-
$this->emit('headers', array($request, $bodyBuffer));
57-
$this->removeAllListeners();
58-
}
72+
$bodyBuffer = isset($buffer[$endOfHeader + 4]) ? \substr($buffer, $endOfHeader + 4) : '';
73+
$buffer = '';
74+
$that->emit('headers', array($request, $bodyBuffer, $conn));
75+
});
76+
77+
$conn->on('close', function () use (&$buffer, &$fn) {
78+
$fn = $buffer = null;
79+
});
5980
}
6081

6182
/**
6283
* @param string $headers buffer string containing request headers only
63-
* @throws \InvalidArgumentException
84+
* @param ?string $remoteSocketUri
85+
* @param ?string $localSocketUri
6486
* @return ServerRequestInterface
87+
* @throws \InvalidArgumentException
88+
* @internal
6589
*/
66-
private function parseRequest($headers)
90+
public function parseRequest($headers, $remoteSocketUri, $localSocketUri)
6791
{
6892
// additional, stricter safe-guard for request line
6993
// because request parser doesn't properly cope with invalid ones
@@ -103,22 +127,22 @@ private function parseRequest($headers)
103127

104128
// apply REMOTE_ADDR and REMOTE_PORT if source address is known
105129
// address should always be known, unless this is over Unix domain sockets (UDS)
106-
if ($this->remoteSocketUri !== null) {
107-
$remoteAddress = \parse_url($this->remoteSocketUri);
130+
if ($remoteSocketUri !== null) {
131+
$remoteAddress = \parse_url($remoteSocketUri);
108132
$serverParams['REMOTE_ADDR'] = $remoteAddress['host'];
109133
$serverParams['REMOTE_PORT'] = $remoteAddress['port'];
110134
}
111135

112136
// apply SERVER_ADDR and SERVER_PORT if server address is known
113137
// address should always be known, even for Unix domain sockets (UDS)
114138
// but skip UDS as it doesn't have a concept of host/port.s
115-
if ($this->localSocketUri !== null) {
116-
$localAddress = \parse_url($this->localSocketUri);
139+
if ($localSocketUri !== null) {
140+
$localAddress = \parse_url($localSocketUri);
117141
if (isset($localAddress['host'], $localAddress['port'])) {
118142
$serverParams['SERVER_ADDR'] = $localAddress['host'];
119143
$serverParams['SERVER_PORT'] = $localAddress['port'];
120144
}
121-
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'https') {
145+
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'tls') {
122146
$serverParams['HTTPS'] = 'on';
123147
}
124148
}
@@ -177,7 +201,7 @@ private function parseRequest($headers)
177201

178202
// set URI components from socket address if not already filled via Host header
179203
if ($request->getUri()->getHost() === '') {
180-
$parts = \parse_url($this->localSocketUri);
204+
$parts = \parse_url($localSocketUri);
181205
if (!isset($parts['host'], $parts['port'])) {
182206
$parts = array('host' => '127.0.0.1', 'port' => 80);
183207
}
@@ -198,8 +222,8 @@ private function parseRequest($headers)
198222
}
199223

200224
// Update request URI to "https" scheme if the connection is encrypted
201-
$parts = \parse_url($this->localSocketUri);
202-
if (isset($parts['scheme']) && $parts['scheme'] === 'https') {
225+
$parts = \parse_url($localSocketUri);
226+
if (isset($parts['scheme']) && $parts['scheme'] === 'tls') {
203227
// The request URI may omit default ports here, so try to parse port
204228
// from Host header field (if possible)
205229
$port = $request->getUri()->getPort();

src/StreamingServer.php

Lines changed: 23 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
final class StreamingServer extends EventEmitter
8888
{
8989
private $callback;
90+
private $parser;
9091

9192
/**
9293
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -108,6 +109,27 @@ public function __construct($requestHandler)
108109
}
109110

110111
$this->callback = $requestHandler;
112+
$this->parser = new RequestHeaderParser();
113+
114+
$that = $this;
115+
$this->parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer, ConnectionInterface $conn) use ($that) {
116+
$that->handleRequest($conn, $request);
117+
118+
if ($bodyBuffer !== '') {
119+
$conn->emit('data', array($bodyBuffer));
120+
}
121+
});
122+
123+
$this->parser->on('error', function(\Exception $e, ConnectionInterface $conn) use ($that) {
124+
$that->emit('error', array($e));
125+
126+
// parsing failed => assume dummy request and send appropriate error
127+
$that->writeError(
128+
$conn,
129+
$e->getCode() !== 0 ? $e->getCode() : 400,
130+
new ServerRequest('GET', '/')
131+
);
132+
});
111133
}
112134

113135
/**
@@ -154,47 +176,7 @@ public function __construct($requestHandler)
154176
*/
155177
public function listen(ServerInterface $socket)
156178
{
157-
$socket->on('connection', array($this, 'handleConnection'));
158-
}
159-
160-
/** @internal */
161-
public function handleConnection(ConnectionInterface $conn)
162-
{
163-
$uriLocal = $conn->getLocalAddress();
164-
if ($uriLocal !== null) {
165-
// local URI known, so translate transport scheme to application scheme
166-
$uriLocal = \strtr($uriLocal, array('tcp://' => 'http://', 'tls://' => 'https://'));
167-
}
168-
169-
$uriRemote = $conn->getRemoteAddress();
170-
171-
$that = $this;
172-
$parser = new RequestHeaderParser($uriLocal, $uriRemote);
173-
174-
$listener = array($parser, 'feed');
175-
$parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer) use ($conn, $listener, $that) {
176-
// parsing request completed => stop feeding parser
177-
$conn->removeListener('data', $listener);
178-
179-
$that->handleRequest($conn, $request);
180-
181-
if ($bodyBuffer !== '') {
182-
$conn->emit('data', array($bodyBuffer));
183-
}
184-
});
185-
186-
$conn->on('data', $listener);
187-
$parser->on('error', function(\Exception $e) use ($conn, $listener, $that) {
188-
$conn->removeListener('data', $listener);
189-
$that->emit('error', array($e));
190-
191-
// parsing failed => assume dummy request and send appropriate error
192-
$that->writeError(
193-
$conn,
194-
$e->getCode() !== 0 ? $e->getCode() : 400,
195-
new ServerRequest('GET', '/')
196-
);
197-
});
179+
$socket->on('connection', array($this->parser, 'handle'));
198180
}
199181

200182
/** @internal */

0 commit comments

Comments
 (0)