Skip to content

Commit 43daa35

Browse files
committed
implement dlq to queue
Signed-off-by: bota <Bota@dotkernel.com>
1 parent 9428e32 commit 43daa35

4 files changed

Lines changed: 95 additions & 133 deletions

File tree

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,41 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
use Netglue\PsrContainer\Messenger\Container\TransportFactory;
6+
use Psr\Container\ContainerInterface;
47
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
58
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer;
69

710
return [
8-
"symfony" => [
11+
"symfony" => [
912
"messenger" => [
10-
"transports" => [
11-
"redis_transport" => [
12-
'dsn' => 'redis://127.0.0.1:6379/messages',
13-
'options' => [], // Redis specific options
13+
'transports' => [
14+
'redis_transport' => [
15+
'dsn' => 'redis://127.0.0.1:6379/messages',
16+
'serializer' => SymfonySerializer::class,
17+
'retry_strategy' => [
18+
'max_retries' => 3, //maximum number of times a message will be retried after the first failure
19+
'delay' => 1000 // initial delay before retrying a failed message, in milliseconds,
20+
'multiplier' => 2 // factor to increase the delay for each subsequent retry,
21+
'max_delay' => 0 // maximum delay between retries, in milliseconds,
22+
],
23+
],
24+
// separate transport for failed messages
25+
'failed' => [
26+
'dsn' => 'redis://127.0.0.1:6379/failed',
1427
'serializer' => SymfonySerializer::class,
15-
]
16-
]
17-
]
28+
],
29+
],
30+
// tells Messenger that the transport to store failed messages is "failed"
31+
'failure_transport' => 'failed',
32+
],
1833
],
1934
"dependencies" => [
2035
"factories" => [
21-
"redis_transport" => [TransportFactory::class, 'redis_transport'],
22-
SymfonySerializer::class => fn(\Psr\Container\ContainerInterface $container) => new PhpSerializer()
23-
]
24-
]
25-
];
36+
"redis_transport" => [TransportFactory::class, 'redis_transport'],
37+
"failed" => [TransportFactory::class, 'failed'],
38+
SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
39+
],
40+
],
41+
];

src/App/Message/Message.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,9 @@ public function getPayload(): array
1515
{
1616
return $this->payload;
1717
}
18+
19+
public function setPayload(array $payload): void
20+
{
21+
$this->payload = $payload;
22+
}
1823
}

src/App/Message/MessageHandler.php

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66

77
use Dot\DependencyInjection\Attribute\Inject;
88
use Dot\Log\Logger;
9-
use Symfony\Component\Messenger\Exception\ExceptionInterface;
109
use Symfony\Component\Messenger\MessageBusInterface;
11-
use Symfony\Component\Messenger\Stamp\DelayStamp;
10+
use Throwable;
1211

1312
class MessageHandler
1413
{
@@ -24,51 +23,38 @@ public function __construct(
2423
) {
2524
}
2625

26+
/**
27+
* @throws Throwable
28+
*/
2729
public function __invoke(Message $message): void
2830
{
2931
$payload = $message->getPayload();
3032

3133
try {
32-
// Throwing an exception to satisfy PHPStan (replace with own code)
33-
// For proof of concept and testing purposes message "control" will automatically mark it as successfully
34-
// processed and logged as info
3534
if ($payload['foo'] === 'control') {
36-
$this->logger->info($payload['foo'] . ': was processed successfully');
37-
} else {
38-
throw new \Exception("Failed to execute");
35+
//user control message to log successfully processed message
36+
$this->logger->info($payload['foo'] . ' processed successfully');
37+
} elseif ($payload['foo'] === 'retry') {
38+
//user retry message to test retry functionality
39+
throw new \RuntimeException("Intentional failure for testing retries");
3940
}
40-
} catch (\Throwable $exception) {
41-
$this->logger->error($payload['foo'] . ' failed with message: '
42-
. $exception->getMessage() . ' after ' . ($payload['retry'] ?? 0) . ' retries');
43-
$this->retry($payload);
44-
}
45-
}
41+
} catch (\Throwable $e) {
42+
$retryCount = $payload['retry_count'] ?? 0;
4643

47-
/**
48-
* @throws ExceptionInterface
49-
*/
50-
public function retry(array $payload): void
51-
{
52-
if (! isset($payload['retry'])) {
53-
$this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => 1]), [
54-
new DelayStamp($this->config['fail-safe']['first_retry']),
55-
]);
56-
} else {
57-
$retry = $payload['retry'];
58-
switch ($retry) {
59-
case 1:
60-
$delay = $this->config['fail-safe']['second_retry'];
61-
$this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => ++$retry]), [
62-
new DelayStamp($delay),
63-
]);
64-
break;
65-
case 2:
66-
$delay = $this->config['fail-safe']['third_retry'];
67-
$this->bus->dispatch(new Message(["foo" => $payload['foo'], 'retry' => ++$retry]), [
68-
new DelayStamp($delay),
69-
]);
70-
break;
44+
if ($retryCount === 0) {
45+
$this->logger->error(
46+
"Message '{$payload['foo']}' failed because: " . $e->getMessage()
47+
);
48+
} else {
49+
$this->logger->error(
50+
"Message '{$payload['foo']}' failed because: " . $e->getMessage() . " Retry {$retryCount}"
51+
);
7152
}
53+
54+
$payload['retry_count'] = $retryCount + 1;
55+
$message->setPayload($payload);
56+
57+
throw $e;
7258
}
7359
}
7460
}

test/App/Message/MessageHandlerTest.php

Lines changed: 37 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@
1111
use Psr\Container\ContainerExceptionInterface;
1212
use Queue\App\Message\Message;
1313
use Queue\App\Message\MessageHandler;
14-
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\Exception\ExceptionInterface;
14+
use RuntimeException;
1615
use Symfony\Component\Messenger\MessageBusInterface;
17-
use Symfony\Component\Messenger\Stamp\DelayStamp;
1816

1917
class MessageHandlerTest extends TestCase
2018
{
@@ -60,94 +58,51 @@ protected function setUp(): void
6058
$this->handler = new MessageHandler($this->bus, $this->logger, $this->config);
6159
}
6260

63-
/**
64-
* @throws Exception
65-
*/
66-
public function testInvokeSuccessfulProcessing(): void
61+
public function testControlMessageDoesNotThrowAndDoesNotSetRetryCount(): void
6762
{
68-
$payload = ['foo' => 'control'];
69-
$message = $this->createMock(Message::class);
70-
$message->method('getPayload')->willReturn($payload);
63+
$handler = $this->handler;
7164

72-
$this->handler->__invoke($message);
65+
$message = new Message(['foo' => 'control']);
66+
$handler($message);
7367

74-
$this->expectNotToPerformAssertions();
68+
$payload = $message->getPayload();
69+
$this->assertArrayNotHasKey('retry_count', $payload);
7570
}
7671

77-
/**
78-
* @throws Exception
79-
*/
80-
public function testInvokeFailureTriggersFirstRetry(): void
72+
public function testRetryMessageThrowsExceptionAndSetsRetryCount(): void
8173
{
82-
$payload = ['foo' => 'fail'];
83-
$message = $this->createMock(Message::class);
84-
$message->method('getPayload')->willReturn($payload);
85-
86-
$this->bus->expects($this->once())
87-
->method('dispatch')
88-
->with(
89-
$this->callback(function ($msg) {
90-
return $msg instanceof Message
91-
&& $msg->getPayload()['foo'] === 'fail'
92-
&& $msg->getPayload()['retry'] === 1;
93-
}),
94-
$this->callback(function ($stamps) {
95-
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
96-
&& $stamps[0]->getDelay() === 1000;
97-
})
98-
)
99-
->willReturn(new Envelope($message));
100-
101-
$this->handler->__invoke($message);
102-
}
74+
$handler = $this->handler;
10375

104-
/**
105-
* @throws ExceptionInterface
106-
*/
107-
public function testRetrySecondTime(): void
108-
{
109-
$payload = ['foo' => 'retry_test', 'retry' => 1];
110-
111-
$this->bus->expects($this->once())
112-
->method('dispatch')
113-
->with(
114-
$this->callback(function ($msg) {
115-
return $msg instanceof Message
116-
&& $msg->getPayload()['retry'] === 2
117-
&& $msg->getPayload()['foo'] === 'retry_test';
118-
}),
119-
$this->callback(function ($stamps) {
120-
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
121-
&& $stamps[0]->getDelay() === 2000;
122-
})
123-
)
124-
->willReturn(new Envelope(new Message($payload)));
125-
126-
$this->handler->retry($payload);
76+
$message = new Message(['foo' => 'retry']);
77+
78+
$this->expectException(RuntimeException::class);
79+
$this->expectExceptionMessage("Intentional failure for testing retries");
80+
81+
try {
82+
$handler($message);
83+
} finally {
84+
$payload = $message->getPayload();
85+
$this->assertArrayHasKey('retry_count', $payload);
86+
$this->assertEquals(1, $payload['retry_count']); // first retry
87+
}
12788
}
12889

129-
/**
130-
* @throws ExceptionInterface
131-
*/
132-
public function testRetryThirdTime(): void
90+
public function testRetryMessageWithExistingRetryCountIncrementsIt(): void
13391
{
134-
$payload = ['foo' => 'retry_test', 'retry' => 2];
135-
136-
$this->bus->expects($this->once())
137-
->method('dispatch')
138-
->with(
139-
$this->callback(function ($msg) {
140-
return $msg instanceof Message
141-
&& $msg->getPayload()['retry'] === 3
142-
&& $msg->getPayload()['foo'] === 'retry_test';
143-
}),
144-
$this->callback(function ($stamps) {
145-
return isset($stamps[0]) && $stamps[0] instanceof DelayStamp
146-
&& $stamps[0]->getDelay() === 3000;
147-
})
148-
)
149-
->willReturn(new Envelope(new Message($payload)));
150-
151-
$this->handler->retry($payload);
92+
$handler = $this->handler;
93+
94+
$message = new Message([
95+
'foo' => 'retry',
96+
'retry_count' => 2,
97+
]);
98+
99+
$this->expectException(RuntimeException::class);
100+
101+
try {
102+
$handler($message);
103+
} finally {
104+
$payload = $message->getPayload();
105+
$this->assertEquals(3, $payload['retry_count']); // incremented from 2 → 3
106+
}
152107
}
153108
}

0 commit comments

Comments
 (0)