Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Messenger] Ensure message is handled only once per handler #30020

Merged
merged 2 commits into from Apr 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -79,6 +79,7 @@ CHANGELOG
* Added a `SetupTransportsCommand` command to setup the transports
* Added a Doctrine transport. For example, use the `doctrine://default` DSN (this uses the `default` Doctrine entity manager)
* [BC BREAK] The `getConnectionConfiguration` method on Amqp's `Connection` has been removed.
* [BC BREAK] A `HandlerFailedException` exception will be thrown if one or more handler fails.

4.2.0
-----
Expand Down
@@ -0,0 +1,52 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Exception;

use Symfony\Component\Messenger\Envelope;

class HandlerFailedException extends RuntimeException
{
private $exceptions;
private $envelope;
sroze marked this conversation as resolved.
Show resolved Hide resolved

/**
* @param \Throwable[] $exceptions
*/
public function __construct(Envelope $envelope, array $exceptions)
{
$firstFailure = current($exceptions);

parent::__construct(
1 === \count($exceptions)
? $firstFailure->getMessage()
: sprintf('%d handlers failed. First failure is: "%s"', \count($exceptions), $firstFailure->getMessage()),
$firstFailure->getCode(),
$firstFailure
);

$this->envelope = $envelope;
$this->exceptions = $exceptions;
}

public function getEnvelope(): Envelope
{
return $this->envelope;
}

/**
* @return \Throwable[]
*/
public function getNestedExceptions(): array
{
return $this->exceptions;
}
}
Expand Up @@ -14,6 +14,7 @@
use Psr\Log\LoggerAwareTrait;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
Expand Down Expand Up @@ -52,10 +53,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
'class' => \get_class($message),
];

$exceptions = [];
foreach ($this->handlersLocator->getHandlers($envelope) as $alias => $handler) {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), \is_string($alias) ? $alias : null);
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
$alias = \is_string($alias) ? $alias : null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if $alias is null and there are 2 handlers? Wouldn't that cause the second one to "appear" like it was handled? I think if the $alias is null, we have to assume that the message has not already been handled always.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They would not appear as handled because we track the handler name. Alias is just an optional thing we actually don't use in core. (I think we could even remove it, it's been introduced - #29166 - on the assumption that it might be useful later, while it complexifies reading this code).


if ($this->messageHasAlreadyBeenHandled($envelope, $handler, $alias)) {
continue;
}

try {
$handledStamp = HandledStamp::fromCallable($handler, $handler($message), $alias);
$envelope = $envelope->with($handledStamp);
$this->logger->info('Message "{class}" handled by "{handler}"', $context + ['handler' => $handledStamp->getCallableName()]);
} catch (\Throwable $e) {
$exceptions[] = $e;
}
}

if (null === $handler) {
Expand All @@ -66,6 +78,21 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
$this->logger->info('No handler for message "{class}"', $context);
}

if (\count($exceptions)) {
throw new HandlerFailedException($envelope, $exceptions);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud: one practical implication is that, if someone listens on the new WorkerMessageFailedEvent event, they will always (well, not technically "always", but pretty much always) receive this exception instead of the actual exception. Then they'll need to loop over getNestedExceptions() if they want to look for a specific exception. I think that's ok, just stating that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. I was thinking whether it would make sense or not to throw the original exception if there is only one but it would mean you need to catch your own exception plus HandlerFailedException. So better always throwing it.


return $stack->next()->handle($envelope, $stack);
}

private function messageHasAlreadyBeenHandled(Envelope $envelope, callable $handler, ?string $alias): bool
{
$some = array_filter($envelope
sroze marked this conversation as resolved.
Show resolved Hide resolved
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handler, $alias) {
return $stamp->getCallableName() === HandledStamp::getNameFromCallable($handler) &&
$stamp->getHandlerAlias() === $alias;
});

return \count($some) > 0;
}
}
21 changes: 13 additions & 8 deletions src/Symfony/Component/Messenger/Stamp/HandledStamp.php
Expand Up @@ -40,33 +40,38 @@ public function __construct($result, string $callableName, string $handlerAlias
/**
* @param mixed $result The returned value of the message handler
*/
public static function fromCallable(callable $handler, $result, string $handlerAlias = null): self
public static function fromCallable(callable $handler, $result, ?string $handlerAlias = null): self
{
return new self($result, self::getNameFromCallable($handler), $handlerAlias);
}

public static function getNameFromCallable(callable $handler): string
{
if (\is_array($handler)) {
if (\is_object($handler[0])) {
return new self($result, \get_class($handler[0]).'::'.$handler[1], $handlerAlias);
return \get_class($handler[0]).'::'.$handler[1];
}

return new self($result, $handler[0].'::'.$handler[1], $handlerAlias);
return $handler[0].'::'.$handler[1];
}

if (\is_string($handler)) {
return new self($result, $handler, $handlerAlias);
return $handler;
}

if ($handler instanceof \Closure) {
$r = new \ReflectionFunction($handler);
if (false !== strpos($r->name, '{closure}')) {
return new self($result, 'Closure', $handlerAlias);
return 'Closure';
}
if ($class = $r->getClosureScopeClass()) {
return new self($result, $class->name.'::'.$r->name, $handlerAlias);
return $class->name.'::'.$r->name;
}

return new self($result, $r->name, $handlerAlias);
return $r->name;
}

return new self($result, \get_class($handler).'::__invoke', $handlerAlias);
return \get_class($handler).'::__invoke';
}

/**
Expand Down
@@ -0,0 +1,39 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Fixtures;

class DummyMessageHandlerFailingFirstTimes
{
private $remainingFailures;

private $called = 0;

public function __construct(int $throwExceptionOnFirstTries = 0)
{
$this->remainingFailures = $throwExceptionOnFirstTries;
}

public function __invoke(DummyMessage $message)
{
if ($this->remainingFailures > 0) {
--$this->remainingFailures;
throw new \Exception('Handler should throw Exception.');
}

++$this->called;
}

public function getTimesCalledWithoutThrowing(): int
{
return $this->called;
}
}
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\StackMiddleware;
Expand Down Expand Up @@ -40,7 +41,7 @@ public function testItCallsTheHandlerAndNextMiddleware()
/**
* @dataProvider itAddsHandledStampsProvider
*/
public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
public function testItAddsHandledStamps(array $handlers, array $expectedStamps, bool $nextIsCalled)
{
$message = new DummyMessage('Hey');
$envelope = new Envelope($message);
Expand All @@ -49,7 +50,11 @@ public function testItAddsHandledStamps(array $handlers, array $expectedStamps)
DummyMessage::class => $handlers,
]));

$envelope = $middleware->handle($envelope, $this->getStackMock());
try {
$envelope = $middleware->handle($envelope, $this->getStackMock($nextIsCalled));
} catch (HandlerFailedException $e) {
$envelope = $e->getEnvelope();
}

$this->assertEquals($expectedStamps, $envelope->all(HandledStamp::class));
}
Expand All @@ -64,17 +69,22 @@ public function itAddsHandledStampsProvider()
$second->method('__invoke')->willReturn(null);
$secondClass = \get_class($second);

$failing = $this->createPartialMock(\stdClass::class, ['__invoke']);
$failing->method('__invoke')->will($this->throwException(new \Exception('handler failed.')));

yield 'A stamp is added' => [
[$first],
[new HandledStamp('first result', $firstClass.'::__invoke')],
true,
];

yield 'A stamp is added per handler' => [
[$first, $second],
['first' => $first, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke'),
new HandledStamp(null, $secondClass.'::__invoke'),
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
],
true,
];

yield 'Yielded locator alias is used' => [
Expand All @@ -83,6 +93,24 @@ public function itAddsHandledStampsProvider()
new HandledStamp('first result', $firstClass.'::__invoke', 'first_alias'),
new HandledStamp(null, $secondClass.'::__invoke'),
],
true,
];

yield 'It tries all handlers' => [
['first' => $first, 'failing' => $failing, 'second' => $second],
[
new HandledStamp('first result', $firstClass.'::__invoke', 'first'),
new HandledStamp(null, $secondClass.'::__invoke', 'second'),
],
false,
];

yield 'It ignores duplicated handler' => [
[$first, $first],
[
new HandledStamp('first result', $firstClass.'::__invoke'),
],
true,
];
}

Expand Down
63 changes: 63 additions & 0 deletions src/Symfony/Component/Messenger/Tests/RetryIntegrationTest.php
@@ -0,0 +1,63 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessageHandlerFailingFirstTimes;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Worker;

class RetryIntegrationTest extends TestCase
{
public function testRetryMechanism()
{
$apiMessage = new DummyMessage('API');

$receiver = $this->createMock(ReceiverInterface::class);
$receiver->method('get')
->willReturn([
new Envelope($apiMessage, [
new SentStamp('Some\Sender', 'sender_alias'),
]),
]);

$senderLocator = new SendersLocator([], ['*' => true]);

$handler = new DummyMessageHandlerFailingFirstTimes();
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
$handlerLocator = new HandlersLocator([
DummyMessage::class => [
'handler' => $handler,
'throwing' => $throwingHandler,
],
]);

$bus = new MessageBus([new SendMessageMiddleware($senderLocator), new HandleMessageMiddleware($handlerLocator)]);

$worker = new Worker(['receiverName' => $receiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
$worker->run([], function () use ($worker) {
$worker->stop();
});

$this->assertSame(1, $handler->getTimesCalledWithoutThrowing());
$this->assertSame(1, $throwingHandler->getTimesCalledWithoutThrowing());
}
}
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/Worker.php
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
Expand Down Expand Up @@ -123,6 +124,10 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp()));
} catch (\Throwable $throwable) {
if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
}

$shouldRetry = $this->shouldRetry($throwable, $envelope, $retryStrategy);

$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $receiverName, $throwable, $shouldRetry));
Expand Down