Skip to content

[Messenger] Ensure message is handled only once per handler #30020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 6, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -79,6 +79,7 @@ CHANGELOG
* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
* [BC BREAK] A `HandlerFailedException` exception will be thrown if one or more handler fails.

4.2.0
-----
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Exception;

use Symfony\Component\Messenger\Envelope;

class HandlerFailedException extends RuntimeException
{
private $exceptions;
private $envelope;

/**
* @param \Throwable[] $exceptions
*/
public function __construct(Envelope $envelope, array $exceptions)
{
$firstFailure = current($exceptions);

parent::__construct(
1 === \count($exceptions)
? $firstFailure->getMessage()
: sprintf('%d handlers failed. First failure is: "%s"', \count($exceptions), $firstFailure->getMessage()),
$firstFailure->getCode(),
$firstFailure
);

$this->envelope = $envelope;
$this->exceptions = $exceptions;
}

public function getEnvelope(): Envelope
{
return $this->envelope;
}

/**
* @return \Throwable[]
*/
public function getNestedExceptions(): array
{
return $this->exceptions;
}
}
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
use Psr\Log\LoggerAwareTrait;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
@@ -52,10 +53,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
'class' => \get_class($message),
];

$exceptions = [];
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), \is_string($alias) ? $alias : null);
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
$alias = \is_string($alias) ? $alias : null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if $alias is null and there are 2 handlers? Wouldn't that cause the second one to "appear" like it was handled? I think if the $alias is null, we have to assume that the message has not already been handled always.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They would not appear as handled because we track the handler name. Alias is just an optional thing we actually don't use in core. (I think we could even remove it, it's been introduced - #29166 - on the assumption that it might be useful later, while it complexifies reading this code).


if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
continue;
}

try {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
} catch (\Throwable $e) {
$exceptions[] = $e;
}
}

if (null === $handler) {
@@ -66,6 +78,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
$this->logger->info('No handler for message "{class}"', $context);
}

if (\count($exceptions)) {
throw new HandlerFailedException($envelope, $exceptions);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud: one practical implication is that, if someone listens on the new WorkerMessageFailedEvent event, they will always (well, not technically "always", but pretty much always) receive this exception instead of the actual exception. Then they'll need to loop over getNestedExceptions() if they want to look for a specific exception. I think that's ok, just stating that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. I was thinking whether it would make sense or not to throw the original exception if there is only one but it would mean you need to catch your own exception plus HandlerFailedException. So better always throwing it.


return $stack->next()->handle($envelope, $stack);
}

private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
{
$some = array_filter($envelope
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
$stamp->getHandlerAlias() === $alias;
});

return \count($some) > 0;
}
}
21 changes: 13 additions & 8 deletions src/Symfony/Component/Messenger/Stamp/HandledStamp.php
Original file line number Diff line number Diff line change
@@ -40,33 +40,38 @@ public function __construct($result, string $callableName, string $handlerAlias
/**
* @param mixed $result The returned value of the message handler
*/
public static function fromCallable(callable $handler, $result, string $handlerAlias = null): self
public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self
{
return new self($result, self::getNameFromCallable($handler), $handlerAlias);
}

public static function getNameFromCallable(callable $handler): string
{
if (\is_array($handler)) {
if (\is_object($handler[0])) {
return new self($result, \get_class($handler[0]).'::'.$handler[1], $handlerAlias);
return \get_class($handler[0]).'::'.$handler[1];
}

return new self($result, $handler[0].'::'.$handler[1], $handlerAlias);
return $handler[0].'::'.$handler[1];
}

if (\is_string($handler)) {
return new self($result, $handler, $handlerAlias);
return $handler;
}

if ($handler instanceof \Closure) {
$r = new \ReflectionFunction($handler);
if (false !== strpos($r->name, '{closure}')) {
return new self($result, 'Closure', $handlerAlias);
return 'Closure';
}
if ($class = $r->getClosureScopeClass()) {
return new self($result, $class->name.'::'.$r->name, $handlerAlias);
return $class->name.'::'.$r->name;
}

return new self($result, $r->name, $handlerAlias);
return $r->name;
}

return new self($result, \get_class($handler).'::__invoke', $handlerAlias);
return \get_class($handler).'::__invoke';
}

/**
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Fixtures;

class DummyMessageHandlerFailingFirstTimes
{
private $remainingFailures;

private $called = 0;

public function __construct(int $throwExceptionOnFirstTries = 0)
{
$this->remainingFailures = $throwExceptionOnFirstTries;
}

public function __invoke(DummyMessage $message)
{
if ($this->remainingFailures > 0) {
--$this->remainingFailures;
throw new \Exception('Handler should throw Exception.');
}

++$this->called;
}

public function getTimesCalledWithoutThrowing(): int
{
return $this->called;
}
}
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\StackMiddleware;
@@ -40,7 +41,7 @@ public function testItCallsTheHandlerAndNextMiddleware()
/**
* @dataProvider itAddsHandledStampsProvider
*/
public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
public function testItAddsHandledStamps(array $handlers, array $expectedStamps, bool $nextIsCalled)
{
$message = new DummyMessage('Hey');
$envelope = new Envelope($message);
@@ -49,7 +50,11 @@ public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
DummyMessage::class => $handlers,
]));

$envelope = $middleware->handle($envelope, $this->getStackMock());
try {
$envelope = $middleware->handle($envelope, $this->getStackMock($nextIsCalled));
} catch (HandlerFailedException $e) {
$envelope = $e->getEnvelope();
}

$this->assertEquals($expectedStamps, $envelope->all(HandledStamp::class));
}
@@ -64,17 +69,22 @@ public function itAddsHandledStampsProvider()
$second->method('__invoke')->willReturn(null);
$secondClass = \get_class($second);

$failing = $this->createPartialMock(\stdClass::class, ['__invoke']);
$failing->method('__invoke')->will($this->throwException(new \Exception('handler failed.')));

yield 'A stamp is added' => [
[$first],
[new HandledStamp('first result', $firstClass.'::__invoke')],
true,
];

yield 'A stamp is added per handler' => [
[$first, $second],
['first' => $first, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke'),
new HandledStamp(null, $secondClass.'::__invoke'),
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
],
true,
];

yield 'Yielded locator alias is used' => [
@@ -83,6 +93,24 @@ public function itAddsHandledStampsProvider()
new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'),
new HandledStamp(null, $secondClass.'::__invoke'),
],
true,
];

yield 'It tries all handlers' => [
['first' => $first, 'failing' => $failing, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
],
false,
];

yield 'It ignores duplicated handler' => [
[$first, $first],
[
new HandledStamp('first result', $firstClass.'::__invoke'),
],
true,
];
}

63 changes: 63 additions & 0 deletions src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Worker;

class RetryIntegrationTest extends TestCase
{
public function testRetryMechanism()
{
$apiMessage = new DummyMessage('API');

$receiver = $this->createMock(ReceiverInterface::class);
$receiver->method('get')
->willReturn([
new Envelope($apiMessage, [
new SentStamp('Some\Sender', 'sender_alias'),
]),
]);

$senderLocator = new SendersLocator([], ['*' => true]);

$handler = new DummyMessageHandlerFailingFirstTimes();
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
'handler' => $handler,
'throwing' => $throwingHandler,
],
]);

$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);

$worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
$worker->run([], function () use ($worker) {
$worker->stop();
});

$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
}
}
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
@@ -123,6 +124,10 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
} catch (\Throwable $throwable) {
if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
}

$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);

$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));