Last active
November 4, 2019 10:43
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Messenger; | |
use App\Message\Command\CreateInventor; | |
use App\Message\Command\UpdateInventor; | |
use App\Util\QueryStringEncoder; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; | |
use Symfony\Component\Messenger\Stamp\BusNameStamp; | |
use Symfony\Component\Messenger\Stamp\DelayStamp; | |
use Symfony\Component\Messenger\Stamp\RedeliveryStamp; | |
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; | |
/**
 * Messenger serializer that maps CreateInventor / UpdateInventor messages to
 * and from a JSON body plus a "type" header.
 *
 * Wire format handled by this class:
 *  - header "type": "user_created" or "user_updated";
 *  - body: a JSON object carrying the message IRI under the "iri" key;
 *  - optional "X-Message-Stamp-Delay" / "X-Message-Stamp-Redelivery" headers
 *    holding the options of the last DelayStamp / RedeliveryStamp, encoded
 *    with QueryStringEncoder.
 */
class ExternalJsonMessageSerializer implements SerializerInterface
{
    private const STAMP_HEADER_PREFIX = 'X-Message-Stamp-';

    /**
     * Single source of truth for the "type" header <-> message class mapping,
     * used by both decode() and encode().
     */
    private const MESSAGE_CLASSES = [
        'user_created' => CreateInventor::class,
        'user_updated' => UpdateInventor::class,
    ];

    /**
     * Rebuilds an envelope from its encoded "body" and "headers".
     *
     * @param array $encodedEnvelope Array with a JSON "body" and a "headers" array
     *
     * @throws MessageDecodingFailedException When body/headers are missing, the type
     *                                        is unknown, the body is not a JSON
     *                                        object, or a stamp header is incomplete
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
            throw new MessageDecodingFailedException('Encoded envelope should have at least a "body" and some "headers".');
        }

        if (empty($encodedEnvelope['headers']['type'])) {
            throw new MessageDecodingFailedException('Encoded envelope does not have a "type" header.');
        }

        $type = $encodedEnvelope['headers']['type'];
        if (!\is_string($type) || !isset(self::MESSAGE_CLASSES[$type])) {
            // Report the actual header value; non-scalars are reported by their PHP type.
            throw new MessageDecodingFailedException(sprintf('Invalid type "%s"', \is_scalar($type) ? $type : \gettype($type)));
        }

        $stamps = $this->decodeStamps($encodedEnvelope);

        $data = json_decode($encodedEnvelope['body'], true);
        if (!\is_array($data)) {
            // Malformed JSON (null) or a scalar payload must surface as a decoding
            // failure, not as a TypeError from createEnvelope()'s array parameter.
            throw new MessageDecodingFailedException('Encoded envelope "body" is not a valid JSON object.');
        }

        return $this->createEnvelope(self::MESSAGE_CLASSES[$type], $data, $stamps);
    }

    /**
     * Encodes an envelope into a "headers" array and a JSON "body".
     *
     * @throws \Exception When the message class is not one of the supported classes
     */
    public function encode(Envelope $envelope): array
    {
        $message = $envelope->getMessage();

        // Strict search: only the exact classes listed in MESSAGE_CLASSES are supported.
        $type = array_search(\get_class($message), self::MESSAGE_CLASSES, true);
        if (false === $type) {
            throw new \Exception(sprintf('Serializer do not support encoding message from class: "%s"', \get_class($message)));
        }

        return $this->encodeEnvelope($envelope, $type);
    }

    /**
     * Instantiates the message from the decoded body and wraps it in an envelope
     * carrying the decoded stamps plus a BusNameStamp for "command.bus".
     *
     * @param string $messageClass Class to instantiate; its constructor receives the IRI
     * @param array  $data         Decoded JSON body
     * @param array  $stamps       Stamps rebuilt from the headers
     *
     * @throws MessageDecodingFailedException When the body has no "iri" key
     */
    private function createEnvelope(string $messageClass, array $data, array $stamps): Envelope
    {
        if (!isset($data['iri'])) {
            throw new MessageDecodingFailedException('Missing the IRI key!');
        }

        $message = new $messageClass($data['iri']);
        $envelope = new Envelope($message, $stamps);

        return $envelope->with(new BusNameStamp('command.bus'));
    }

    /**
     * Builds the encoded representation: the "type" header merged with the stamp
     * headers, and a JSON body containing only the message IRI.
     */
    private function encodeEnvelope(Envelope $envelope, string $type): array
    {
        $headers = array_merge(['type' => $type], $this->encodeStamps($envelope));
        $body = ['iri' => $envelope->getMessage()->getIri()];

        return [
            'headers' => $headers,
            'body' => json_encode($body),
        ];
    }

    /**
     * Converts the last DelayStamp and the last RedeliveryStamp of the envelope
     * into "X-Message-Stamp-*" headers. Other stamps are not serialized.
     *
     * @return array Header name => QueryStringEncoder-encoded options
     */
    private function encodeStamps(Envelope $envelope): array
    {
        if (!$envelope->all()) {
            return [];
        }

        $headers = [];

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        if (null !== $delayStamp) {
            $headers[self::STAMP_HEADER_PREFIX.'Delay'] = QueryStringEncoder::encode([
                'delay' => $delayStamp->getDelay(),
            ]);
        }

        /** @var RedeliveryStamp|null $redeliveryStamp */
        $redeliveryStamp = $envelope->last(RedeliveryStamp::class);
        if (null !== $redeliveryStamp) {
            $headers[self::STAMP_HEADER_PREFIX.'Redelivery'] = QueryStringEncoder::encode([
                'retryCount' => $redeliveryStamp->getRetryCount(),
                'senderClass' => $redeliveryStamp->getSenderClassOrAlias(),
            ]);
        }

        return $headers;
    }

    /**
     * Rebuilds the DelayStamp / RedeliveryStamp from their "X-Message-Stamp-*" headers.
     *
     * @return array List of stamps (empty when no stamp header is present)
     *
     * @throws MessageDecodingFailedException When a stamp header lacks a required option
     */
    private function decodeStamps(array $encodedEnvelope): array
    {
        $headers = $encodedEnvelope['headers'];
        $stamps = [];

        // Convert DelayStamp
        $delayHeader = self::STAMP_HEADER_PREFIX.'Delay';
        if (isset($headers[$delayHeader])) {
            $options = QueryStringEncoder::decode($headers[$delayHeader]);
            if (!isset($options['delay'])) {
                throw new MessageDecodingFailedException(sprintf('Header "%s" is missing the "delay" option.', $delayHeader));
            }

            // Decoded options are strings; cast explicitly for the stamp constructor.
            $stamps[] = new DelayStamp((int) $options['delay']);
        }

        // Convert RedeliveryStamp
        $redeliveryHeader = self::STAMP_HEADER_PREFIX.'Redelivery';
        if (isset($headers[$redeliveryHeader])) {
            $options = QueryStringEncoder::decode($headers[$redeliveryHeader]);
            if (!isset($options['retryCount'], $options['senderClass'])) {
                throw new MessageDecodingFailedException(sprintf('Header "%s" is missing the "retryCount" or "senderClass" option.', $redeliveryHeader));
            }

            $stamps[] = new RedeliveryStamp((int) $options['retryCount'], (string) $options['senderClass']);
        }

        return $stamps;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Util; | |
/**
 * Encodes a flat array of scalar options as a ";"-separated, URL-encoded
 * "key=value" string and decodes such a string back into an array.
 *
 * Nested arrays are not supported by decode(): it returns a flat array.
 */
class QueryStringEncoder
{
    private const SEPARATOR = ';';

    /**
     * Builds a URL-encoded query string using ";" as the pair separator.
     *
     * @param array $data Key => scalar value pairs
     */
    public static function encode(array $data): string
    {
        return http_build_query($data, '', self::SEPARATOR);
    }

    /**
     * Parses a string produced by encode() back into a key => value array.
     *
     * Keys and values are URL-decoded so that anything http_build_query()
     * escaped (backslashes, spaces, "=", ";", ...) round-trips unchanged.
     * Empty segments are skipped, so an empty string decodes to an empty
     * array, and a segment without "=" yields an empty-string value.
     *
     * @return array Decoded pairs; all values are strings
     */
    public static function decode(string $queryString): array
    {
        $decoded = [];

        foreach (explode(self::SEPARATOR, $queryString) as $pair) {
            if ('' === $pair) {
                continue;
            }

            // Limit to 2 parts: only the first "=" separates the key from the value.
            $parts = explode('=', $pair, 2);
            $decoded[urldecode($parts[0])] = urldecode($parts[1] ?? '');
        }

        return $decoded;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment