Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add handling for delayed message to redis transport #31977

Merged

Conversation

alexander-schranz
Copy link
Contributor

@alexander-schranz alexander-schranz commented Jun 10, 2019

Q A
Branch? 4.4
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? WIP
Fixed tickets Fixes #31711
License MIT
Doc PR symfony/symfony-docs#... TODO

Still in WIP: This pull request implements delayed messages for redis transport. It will park the messages in an own sorted set and if the time comes it will push the messages to the stream to make them available for all consumers. Because of a race condition when having multiple consumers it need to be checked if not accidently a message from the future is popped by zpopmin so the add function is called and there is check if the delay is in the present/past and only then add the message to the stream.

@alexander-schranz alexander-schranz force-pushed the feature/redis-delayed-messages branch 3 times, most recently from e5c2dc5 to 2e54349 Compare June 10, 2019 12:31
@chalasr chalasr added this to the next milestone Jun 10, 2019
$queuedMessageCount = $this->connection->zcount($this->queue, 0, time());

if ($queuedMessageCount) {
foreach ($this->connection->zpopmin($this->queue, $queuedMessageCount) as $queuedMessage => $time) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if there are 100 delayed messages, and only 5 delayed messages that actually are available now, wouldn't this end up cycling through the 100 delayed messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ragboyjr no $queuedMessageCount will be 5 then and so it only pop 5 messages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, understood.

Your technique for handling the race condition seems nice and simple, but one thing that concerns me would be loss of messages between the time that we pop it, then your redis instance has some failure, and then it fails when we try to re-add it (in the event of that race condition.

Part of me wonders if maybe, doing something where we pass in a LUA script to where it pops and pushes into the stream in one transaction would be preferable since the whole transaction would fail instead of part way through. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ragboyjr As redis has only features to move a message from list to another list and not from a list to a stream. we would need to create a transaction then. never used that but I think it should be something like this:

    $this->connection->multi();
    foreach ($this->connection->zpopmin($this->queue, $queuedMessageCount) as $queuedMessage => $time) {
                $queuedMessage = json_encode($queuedMessage, true);
                // if a futured placed message is actually popped because of a race condition with
                // another running message consumer, the message is readded to the queue by add function
                // else its just added stream and will be available for all stream consumers
                $this->add($queuedMessage['body'], $queuedMessage['headers'], (time() - $time) * 1000);
    }

    $this->connection->exec();

we only need to start the transaction if there are really queued messages so its not blocking if there are none.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Y, I think a redis transaction would work as well, would like to see if that could be something we could cover in an integration test of adding items to the queue, then having redis fail and making sure we don't lose any messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transactions sadly don't work as we can't read inside a transaction.
So to only solution would be to park this messages in another list as every consumer need its own name we postfix the new list with stream + group + consumer names and so its a unique list only for this consumer to avoid a conflicts with other running consumers. If it crash the consumer need to check if he has still things in this list to push to the stream.

@weaverryan
Copy link
Member

Ping @alexander-schranz! It looks like this is technically challenging. Is the path clear? Or are we stuck on those details?

@alexander-schranz
Copy link
Contributor Author

@weaverryan theoretically there is a little time where a message could get lost when the consumer crashes. Between read from the queued/delayed message sorted set list and adding to the stream.

for ($i = 0; $i < $queuedMessageCount; ++$i) {
foreach ($this->connection->zpopmin($this->queue, 1) as $queuedMessage => $time) {
$queuedMessage = json_encode($queuedMessage, true);
// if a futured placed message is actually popped because of a race condition with
// another running message consumer, the message is readded to the queue by add function
// else its just added stream and will be available for all stream consumers
$this->add($queuedMessage['body'], $queuedMessage['headers'], (time() - $time) * 1000);
}
}

I did now change that only one message is read and added to the stream.

@ragboyjr
Copy link
Contributor

ragboyjr commented Jul 9, 2019

theoretically there is a little time where a message could get lost when the consumer crashes.

@alexander-schranz do you think utilizing a lua script similar to laravel would be able to prevent this issue?

@ragboyjr
Copy link
Contributor

ragboyjr commented Aug 3, 2019

@alexander-schranz mind if I take a stab at implementing the transactions part as a lua script?

@ragboyjr
Copy link
Contributor

ragboyjr commented Aug 3, 2019

I have a bit of time this weekend, and I'd love to see this feature for the redis adapter.

@alexander-schranz
Copy link
Contributor Author

@ragboyjr sure. Basically its the zpopmin -> zadd / xadd which we need to move into a transaction. Thank you to have a look at it I'm currently short in time.

@Mrkisha
Copy link

Mrkisha commented Sep 25, 2019

Is there any progress on this PR?

@alexander-schranz
Copy link
Contributor Author

@sroze do you want to have a look at this :)?

@weaverryan
Copy link
Member

@alexander-schranz Is this done & ready? Or are there still some issues?

@chalasr chalasr modified the milestones: next, 4.4 Oct 28, 2019
@alexander-schranz
Copy link
Contributor Author

@weaverryan ready from my side :)

@chalasr
Copy link
Member

chalasr commented Nov 3, 2019

I'm playing with this right now, will try to leave a review tonight.

Copy link
Member

@Tobion Tobion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add changelog entry

@alexander-schranz alexander-schranz force-pushed the feature/redis-delayed-messages branch 5 times, most recently from df3840b to 39057cf Compare November 4, 2019 20:36
@chalasr chalasr force-pushed the feature/redis-delayed-messages branch from 39057cf to cfece10 Compare November 5, 2019 18:53
@chalasr
Copy link
Member

chalasr commented Nov 5, 2019

Thank you @alexander-schranz.

chalasr pushed a commit that referenced this pull request Nov 5, 2019
…lexander-schranz)

This PR was merged into the 4.4 branch.

Discussion
----------

Add handling for delayed message to redis transport

| Q             | A
| ------------- | ---
| Branch?       | 4.4
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | WIP
| Fixed tickets | Fixes #31711
| License       | MIT
| Doc PR        | symfony/symfony-docs#... TODO

Still in WIP: This pull request implements delayed messages for redis transport. It will park the messages in an own sorted set and if the time comes it will push the messages to the stream to make them available for all consumers. Because of a race condition when having multiple consumers it need to be checked if not accidently a message from the future is popped by zpopmin so the add function is called and there is check if the delay is in the present/past and only then add the message to the stream.

Commits
-------

cfece10 Add handling for delayed message to redis transport
@chalasr chalasr merged commit cfece10 into symfony:4.4 Nov 5, 2019
OskarStark added a commit to symfony/symfony-docs that referenced this pull request Nov 5, 2019
…ander-schranz)

This PR was merged into the 4.4 branch.

Discussion
----------

Remove hint that redis does not support DelayStamp

The DelayStamp was added in 4.4 symfony/symfony#31977 so the hint can now be removed.

fixes #12595

Commits
-------

b98fa06 Remove hint that redis does not support DelayStamp
@alexander-schranz alexander-schranz deleted the feature/redis-delayed-messages branch November 5, 2019 22:36
@ragboyjr
Copy link
Contributor

ragboyjr commented Nov 6, 2019

Holy Wow!!! so excited to see this merged in!!!

@Mrkisha
Copy link

Mrkisha commented Nov 7, 2019

Awesome!
@alexander-schranz Great work!

This was referenced Nov 12, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[RFC][Messenger] Redis Delay Implementation
7 participants