From 71f721ddea7c0ec92dad5ddd1a5f0386774d5f5c Mon Sep 17 00:00:00 2001 From: Domco Date: Sat, 7 Feb 2026 02:02:50 +0100 Subject: [PATCH 1/4] Add support for blocking pop instead of polling --- .../QueueConfiguration.cs | 9 +++++ osu.Server.QueueProcessor/QueueProcessor.cs | 39 ++++++++++++++++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/osu.Server.QueueProcessor/QueueConfiguration.cs b/osu.Server.QueueProcessor/QueueConfiguration.cs index 1cf6820..5174ba9 100644 --- a/osu.Server.QueueProcessor/QueueConfiguration.cs +++ b/osu.Server.QueueProcessor/QueueConfiguration.cs @@ -34,5 +34,14 @@ public class QueueConfiguration /// Setting above 1 will allow processing in batches (see ). /// public int BatchSize { get; set; } = 1; + + /// + /// When enabled, uses BRPOP to wait for items instead of polling with RPOP. + /// + /// + /// is ignored when this is enabled. + /// is still used as a delay for when processor is overloaded. + /// + public bool UseBlockingPop { get; set; } = false; } } diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index 668c982..b783d6f 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -48,11 +48,20 @@ public abstract class QueueProcessor where T : QueueItem private readonly Lazy redis = new Lazy(() => RedisAccess.GetConnection().GetDatabase()); + private readonly Lazy blockingRedis = new Lazy(() => RedisAccess.GetConnection().GetDatabase()); + /// /// Access redis instance. /// protected IDatabase Redis => redis.Value; + /// + /// Allows access to a separate redis instance (connection) dedicated to blocking calls. + /// Must not be accessed from more than one thread. Currently used only in . + /// This is a workaround for StackExchange.Redis not offering support for operations like BRPOP. + /// + private IDatabase BlockingRedis => blockingRedis.Value; + private long totalProcessed; private long totalDequeued; @@ -100,20 +109,38 @@ public void Run(CancellationToken cancellation = default) try { + // avoid processing too many items at once if (totalInFlight >= config.MaxInFlightItems || consecutiveErrors > config.ErrorThreshold) { Thread.Sleep(config.TimeBetweenPolls); continue; } - var redisItems = Redis.ListRightPop(QueueName, config.BatchSize); + RedisValue[] redisItems; - // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697) - // queue doesn't exist. - if (redisItems == null) + if (config.UseBlockingPop) { - Thread.Sleep(config.TimeBetweenPolls); - continue; + // timeout in seconds, can't be higher than the Redis library timeout (default is 5 seconds) + const string timeout = "1"; + + RedisResult redisResult = BlockingRedis.Execute("BRPOP", QueueName, timeout); + + if (redisResult.IsNull) + continue; + + redisItems = [(RedisValue)redisResult[1]]; + } + else + { + redisItems = Redis.ListRightPop(QueueName, config.BatchSize); + + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697) + // queue doesn't exist. + if (redisItems == null) + { + Thread.Sleep(config.TimeBetweenPolls); + continue; + } } List items = new List(); From e4831dca91e36297587c1208147448abba2e48ab Mon Sep 17 00:00:00 2001 From: Domco Date: Tue, 10 Feb 2026 05:41:56 +0100 Subject: [PATCH 2/4] Remove redundant check --- osu.Server.QueueProcessor/QueueProcessor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index b783d6f..12be26f 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -110,7 +110,7 @@ public void Run(CancellationToken cancellation = default) try { // avoid processing too many items at once - if (totalInFlight >= config.MaxInFlightItems || consecutiveErrors > config.ErrorThreshold) + if (totalInFlight >= config.MaxInFlightItems) { Thread.Sleep(config.TimeBetweenPolls); continue; From adc305275b413dac439a09339ae2d1196c62644b Mon Sep 17 00:00:00 2001 From: Domco Date: Tue, 10 Feb 2026 05:54:45 +0100 Subject: [PATCH 3/4] Fix code quality --- osu.Server.QueueProcessor/QueueProcessor.cs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index 12be26f..e714ab5 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -48,6 +48,11 @@ public abstract class QueueProcessor where T : QueueItem private readonly Lazy redis = new Lazy(() => RedisAccess.GetConnection().GetDatabase()); + /// + /// Separate redis instance (connection) dedicated to blocking calls. + /// Must not be accessed from more than one thread. Currently used only in . + /// This is a workaround for StackExchange.Redis not offering support for operations like BRPOP. + /// private readonly Lazy blockingRedis = new Lazy(() => RedisAccess.GetConnection().GetDatabase()); /// @@ -55,13 +60,6 @@ public abstract class QueueProcessor where T : QueueItem /// protected IDatabase Redis => redis.Value; - /// - /// Allows access to a separate redis instance (connection) dedicated to blocking calls. - /// Must not be accessed from more than one thread. Currently used only in . - /// This is a workaround for StackExchange.Redis not offering support for operations like BRPOP. - /// - private IDatabase BlockingRedis => blockingRedis.Value; - private long totalProcessed; private long totalDequeued; @@ -123,7 +121,7 @@ public void Run(CancellationToken cancellation = default) // timeout in seconds, can't be higher than the Redis library timeout (default is 5 seconds) const string timeout = "1"; - RedisResult redisResult = BlockingRedis.Execute("BRPOP", QueueName, timeout); + RedisResult redisResult = blockingRedis.Value.Execute("BRPOP", QueueName, timeout); if (redisResult.IsNull) continue; From 1ae0f78fb1ae4433db20a73d408afe4a20c0958d Mon Sep 17 00:00:00 2001 From: Domco Date: Tue, 10 Feb 2026 08:30:24 +0100 Subject: [PATCH 4/4] Simplify by removing field --- osu.Server.QueueProcessor/QueueProcessor.cs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs index e714ab5..2e787b9 100644 --- a/osu.Server.QueueProcessor/QueueProcessor.cs +++ b/osu.Server.QueueProcessor/QueueProcessor.cs @@ -48,13 +48,6 @@ public abstract class QueueProcessor where T : QueueItem private readonly Lazy redis = new Lazy(() => RedisAccess.GetConnection().GetDatabase()); - /// - /// Separate redis instance (connection) dedicated to blocking calls. - /// Must not be accessed from more than one thread. Currently used only in . - /// This is a workaround for StackExchange.Redis not offering support for operations like BRPOP. - /// - private readonly Lazy blockingRedis = new Lazy(() => RedisAccess.GetConnection().GetDatabase()); - /// /// Access redis instance. /// @@ -92,6 +85,9 @@ protected QueueProcessor(QueueConfiguration config) /// An optional cancellation token. public void Run(CancellationToken cancellation = default) { + // dedicated redis connection for the BRPOP path, workaround for StackExchange.Redis not offering blocking operations + var blockingRedis = new Lazy(() => RedisAccess.GetConnection().GetDatabase()); + using (SentrySdk.Init(setupSentry)) using (new Timer(_ => outputStats(), null, TimeSpan.Zero, TimeSpan.FromSeconds(5))) using (var cts = new GracefulShutdownSource(cancellation)) @@ -118,7 +114,7 @@ public void Run(CancellationToken cancellation = default) if (config.UseBlockingPop) { - // timeout in seconds, can't be higher than the Redis library timeout (default is 5 seconds) + // timeout in seconds, can't be higher than StackExchange.Redis timeout (default is 5 seconds) const string timeout = "1"; RedisResult redisResult = blockingRedis.Value.Execute("BRPOP", QueueName, timeout);