|
3 | 3 | namespace Enqueue\Monitoring; |
4 | 4 |
|
5 | 5 | use Enqueue\Consumption\Context\End; |
6 | | -use Enqueue\Consumption\Context\InitLogger; |
7 | 6 | use Enqueue\Consumption\Context\MessageReceived; |
8 | 7 | use Enqueue\Consumption\Context\MessageResult; |
9 | | -use Enqueue\Consumption\Context\PostConsume; |
10 | | -use Enqueue\Consumption\Context\PostMessageReceived; |
11 | 8 | use Enqueue\Consumption\Context\PreConsume; |
12 | 9 | use Enqueue\Consumption\Context\PreSubscribe; |
13 | 10 | use Enqueue\Consumption\Context\ProcessorException; |
14 | 11 | use Enqueue\Consumption\Context\Start; |
15 | | -use Enqueue\Consumption\ExtensionInterface; |
| 12 | +use Enqueue\Consumption\EndExtensionInterface; |
| 13 | +use Enqueue\Consumption\MessageReceivedExtensionInterface; |
| 14 | +use Enqueue\Consumption\MessageResultExtensionInterface; |
| 15 | +use Enqueue\Consumption\PreConsumeExtensionInterface; |
| 16 | +use Enqueue\Consumption\PreSubscribeExtensionInterface; |
| 17 | +use Enqueue\Consumption\ProcessorExceptionExtensionInterface; |
16 | 18 | use Enqueue\Consumption\Result; |
| 19 | +use Enqueue\Consumption\StartExtensionInterface; |
17 | 20 | use Psr\Log\LoggerInterface; |
18 | 21 | use Ramsey\Uuid\Uuid; |
19 | 22 |
|
20 | | -class MonitoringExtension implements ExtensionInterface |
| 23 | +class MonitoringExtension implements StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, EndExtensionInterface, ProcessorExceptionExtensionInterface, MessageReceivedExtensionInterface, MessageResultExtensionInterface |
21 | 24 | { |
22 | 25 | /** |
23 | 26 | * @var StatsStorage |
@@ -165,9 +168,18 @@ public function onProcessorException(ProcessorException $context): void |
165 | 168 | $timeMs, |
166 | 169 | $context->getReceivedAt(), |
167 | 170 | $context->getConsumer()->getQueue()->getQueueName(), |
| 171 | + $context->getMessage()->getMessageId(), |
| 172 | + $context->getMessage()->getCorrelationId(), |
168 | 173 | $context->getMessage()->getHeaders(), |
169 | 174 | $context->getMessage()->getProperties(), |
170 | | - ConsumedMessageStats::STATUS_FAILED |
| 175 | + $context->getMessage()->isRedelivered(), |
| 176 | + ConsumedMessageStats::STATUS_FAILED, |
| 177 | + get_class($context->getException()), |
| 178 | + $context->getException()->getMessage(), |
| 179 | + $context->getException()->getCode(), |
| 180 | + $context->getException()->getFile(), |
| 181 | + $context->getException()->getLine(), |
| 182 | + $context->getException()->getTraceAsString() |
171 | 183 | ); |
172 | 184 |
|
173 | 185 | $this->safeCall(function () use ($event) { |
@@ -238,8 +250,11 @@ public function onResult(MessageResult $context): void |
238 | 250 | $timeMs, |
239 | 251 | $context->getReceivedAt(), |
240 | 252 | $context->getConsumer()->getQueue()->getQueueName(), |
| 253 | + $context->getMessage()->getMessageId(), |
| 254 | + $context->getMessage()->getCorrelationId(), |
241 | 255 | $context->getMessage()->getHeaders(), |
242 | 256 | $context->getMessage()->getProperties(), |
| 257 | + $context->getMessage()->isRedelivered(), |
243 | 258 | $status |
244 | 259 | ); |
245 | 260 |
|
@@ -275,18 +290,6 @@ public function onResult(MessageResult $context): void |
275 | 290 | } |
276 | 291 | } |
277 | 292 |
|
278 | | - public function onPostConsume(PostConsume $context): void |
279 | | - { |
280 | | - } |
281 | | - |
282 | | - public function onPostMessageReceived(PostMessageReceived $context): void |
283 | | - { |
284 | | - } |
285 | | - |
286 | | - public function onInitLogger(InitLogger $context): void |
287 | | - { |
288 | | - } |
289 | | - |
290 | 293 | private function getNowMs(): int |
291 | 294 | { |
292 | 295 | return (int) (microtime(true) * 1000); |
|
0 commit comments