diff --git a/osu.Server.QueueProcessor/QueueConfiguration.cs b/osu.Server.QueueProcessor/QueueConfiguration.cs
index 1cf6820..5174ba9 100644
--- a/osu.Server.QueueProcessor/QueueConfiguration.cs
+++ b/osu.Server.QueueProcessor/QueueConfiguration.cs
@@ -34,5 +34,14 @@ public class QueueConfiguration
/// Setting above 1 will allow processing in batches (see ).
///
public int BatchSize { get; set; } = 1;
+
+ ///
+ /// When enabled, uses BRPOP to wait for items instead of polling with RPOP.
+ ///
+ ///
+ /// is ignored when this is enabled.
+ /// is still used as a delay for when processor is overloaded.
+ ///
+ public bool UseBlockingPop { get; set; } = false;
}
}
diff --git a/osu.Server.QueueProcessor/QueueProcessor.cs b/osu.Server.QueueProcessor/QueueProcessor.cs
index 668c982..2e787b9 100644
--- a/osu.Server.QueueProcessor/QueueProcessor.cs
+++ b/osu.Server.QueueProcessor/QueueProcessor.cs
@@ -85,6 +85,9 @@ protected QueueProcessor(QueueConfiguration config)
/// An optional cancellation token.
public void Run(CancellationToken cancellation = default)
{
+ // dedicated redis connection for the BRPOP path, workaround for StackExchange.Redis not offering blocking operations
+ var blockingRedis = new Lazy(() => RedisAccess.GetConnection().GetDatabase());
+
using (SentrySdk.Init(setupSentry))
using (new Timer(_ => outputStats(), null, TimeSpan.Zero, TimeSpan.FromSeconds(5)))
using (var cts = new GracefulShutdownSource(cancellation))
@@ -100,20 +103,38 @@ public void Run(CancellationToken cancellation = default)
try
{
- if (totalInFlight >= config.MaxInFlightItems || consecutiveErrors > config.ErrorThreshold)
+ // avoid processing too many items at once
+ if (totalInFlight >= config.MaxInFlightItems)
{
Thread.Sleep(config.TimeBetweenPolls);
continue;
}
- var redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
+ RedisValue[] redisItems;
- // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
- // queue doesn't exist.
- if (redisItems == null)
+ if (config.UseBlockingPop)
{
- Thread.Sleep(config.TimeBetweenPolls);
- continue;
+ // timeout in seconds, can't be higher than StackExchange.Redis timeout (default is 5 seconds)
+ const string timeout = "1";
+
+ RedisResult redisResult = blockingRedis.Value.Execute("BRPOP", QueueName, timeout);
+
+ if (redisResult.IsNull)
+ continue;
+
+ redisItems = [(RedisValue)redisResult[1]];
+ }
+ else
+ {
+ redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
+
+ // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
+ // queue doesn't exist.
+ if (redisItems == null)
+ {
+ Thread.Sleep(config.TimeBetweenPolls);
+ continue;
+ }
}
List items = new List();