Skip to content

Conversation

@TheDomco
Copy link
Contributor

Adds support for blocking pop to avoid RPOP spam in scenarios where we want low latency.
The timeout is one second and can't be higher than StackExchange.Redis timeout which should be 5 seconds by default.
Needs a separate connection because of the blocking nature. The connection should only be used internally and in a single thread.

@peppy peppy self-requested a review February 10, 2026 04:50
/// Must not be accessed from more than one thread. Currently used only in <see cref="Run"/>.
/// This is a workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
/// </summary>
private IDatabase BlockingRedis => blockingRedis.Value;
Copy link
Member

@peppy peppy Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of this, to keep things simple?

diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs
index 12be26f..aecc445 100644
--- a/osu.Server.QueueProcessor/QueueProcessor.cs
+++ b/osu.Server.QueueProcessor/QueueProcessor.cs
@@ -46,21 +46,12 @@ namespace osu.Server.QueueProcessor
 
         private readonly QueueConfiguration config;
 
-        private readonly Lazy<IDatabase> redis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
-
-        private readonly Lazy<IDatabase> blockingRedis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
-
         /// <summary>
         /// Access redis instance.
         /// </summary>
         protected IDatabase Redis => redis.Value;
 
-        /// <summary>
-        /// Allows access to a separate redis instance (connection) dedicated to blocking calls.
-        /// Must not be accessed from more than one thread. Currently used only in <see cref="Run"/>.
-        /// This is a workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
-        /// </summary>
-        private IDatabase BlockingRedis => blockingRedis.Value;
+        private readonly Lazy<IDatabase> redis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());
 
         private long totalProcessed;
 
@@ -94,6 +85,9 @@ namespace osu.Server.QueueProcessor
         /// <param name="cancellation">An optional cancellation token.</param>
         public void Run(CancellationToken cancellation = default)
         {
+            // Open a separate redis for the queue watcher. We ensure a dedicated connection specifically for the BRPOP path, which blocks indefinitely.
+            IDatabase blockingRedis = RedisAccess.GetConnection().GetDatabase();
+
             using (SentrySdk.Init(setupSentry))
             using (new Timer(_ => outputStats(), null, TimeSpan.Zero, TimeSpan.FromSeconds(5)))
             using (var cts = new GracefulShutdownSource(cancellation))
@@ -123,7 +117,8 @@ namespace osu.Server.QueueProcessor
                                 // timeout in seconds, can't be higher than the Redis library timeout (default is 5 seconds)
                                 const string timeout = "1";
 
-                                RedisResult redisResult = BlockingRedis.Execute("BRPOP", QueueName, timeout);
+                                // workaround for <c>StackExchange.Redis</c> not offering support for operations like <c>BRPOP</c>.
+                                RedisResult redisResult = blockingRedis.Execute("BRPOP", QueueName, timeout);
 
                                 if (redisResult.IsNull)
                                     continue;
@@ -132,7 +127,7 @@ namespace osu.Server.QueueProcessor
                             }
                             else
                             {
-                                redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
+                                redisItems = blockingRedis.ListRightPop(QueueName, config.BatchSize);
 
                                 // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
                                 // queue doesn't exist.

It will mean we keep two connections open even in the non-blocking path, but I don't think that's an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already made a change addressing the code quality there. I would say there is no reason to keep a separate connection open for queue processors which do not use this feature. Although arguably they are pinned to the old versions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well it's more for the code simplicity. If you want you can apply my patch and then make the connection only used in the blocking flow, but I'd argue it's not worth it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine now? Removed the field to simplify but kept it lazy so no unnecessary connections.

@peppy peppy merged commit 0922bb8 into ppy:master Feb 10, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants