-
Notifications
You must be signed in to change notification settings - Fork 8
Add support for blocking pop instead of polling #38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| /// Must not be accessed from more than one thread. Currently used only in <see cref="Run"/>. | ||
| /// This is a workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>. | ||
| /// </summary> | ||
| private IDatabase BlockingRedis => blockingRedis.Value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of this, to keep things simple?
diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs
index 12be26f..aecc445 100644
--- a/osu.Server.QueueProcessor/QueueProcessor.cs
+++ b/osu.Server.QueueProcessor/QueueProcessor.cs
@@ -46,21 +46,12 @@ namespace osu.Server.QueueProcessor
private readonly QueueConfiguration config;
- private readonly Lazy<IDatabase> redis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
-
- private readonly Lazy<IDatabase> blockingRedis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
-
/// <summary>
/// Access redis instance.
/// </summary>
protected IDatabase Redis => redis.Value;
- /// <summary>
- /// Allows access to a separate redis instance (connection) dedicated to blocking calls.
- /// Must not be accessed from more than one thread. Currently used only in <see cref="Run"/>.
- /// This is a workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
- /// </summary>
- private IDatabase BlockingRedis => blockingRedis.Value;
+ private readonly Lazy<IDatabase> redis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
private long totalProcessed;
@@ -94,6 +85,9 @@ namespace osu.Server.QueueProcessor
/// <param name="cancellation">An optional cancellation token.</param>
public void Run(CancellationToken cancellation = default)
{
+ // Open a separate redis for the queue watcher. We ensure a dedicated connection specifically for the BRPOP path, which blocks indefinitely.
+ IDatabase blockingRedis = RedisAccess.GetConnection().GetDatabase();
+
using (SentrySdk.Init(setupSentry))
using (new Timer(_ => outputStats(), null, TimeSpan.Zero, TimeSpan.FromSeconds(5)))
using (var cts = new GracefulShutdownSource(cancellation))
@@ -123,7 +117,8 @@ namespace osu.Server.QueueProcessor
// timeout in seconds, can't be higher than the Redis library timeout (default is 5 seconds)
const string timeout = "1";
- RedisResult redisResult = BlockingRedis.Execute("BRPOP", QueueName, timeout);
+ // workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
+ RedisResult redisResult = blockingRedis.Execute("BRPOP", QueueName, timeout);
if (redisResult.IsNull)
continue;
@@ -132,7 +127,7 @@ namespace osu.Server.QueueProcessor
}
else
{
- redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
+ redisItems = blockingRedis.ListRightPop(QueueName, config.BatchSize);
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
// queue doesn't exist.
It will mean we keep two connections open even in the non-blocking path, but I don't think that's an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already made a change addressing the code quality there. I would say there is no reason to keep a separate connection open for queue processors which do not use this feature. Although arguably they are pinned to the old versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well it's more for the code simplicity. If you want you can apply my patch and then make the connection only used in the blocking flow, but I'd argue it's not worth it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fine now? Removed the field to simplify but kept it lazy so no unnecessary connections.
Adds support for blocking pop to avoid
RPOPspam in scenarios where we want low latency.The timeout is one second and can't be higher than
StackExchange.Redistimeout which should be 5 seconds by default.Needs a separate connection because of the blocking nature. The connection should only be used internally and in a single thread.