Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions osu.Server.QueueProcessor/QueueConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,14 @@ public class QueueConfiguration
/// Setting above 1 will allow processing in batches (see <see cref="QueueProcessor{T}.ProcessResults"/>).
/// </summary>
public int BatchSize { get; set; } = 1;

/// <summary>
/// When enabled, uses <c>BRPOP</c> to wait for items instead of polling with <c>RPOP</c>.
/// </summary>
/// <remarks>
/// <see cref="BatchSize"/> is ignored when this is enabled.
/// <see cref="TimeBetweenPolls"/> is still used as a delay for when processor is overloaded.
/// </remarks>
public bool UseBlockingPop { get; set; } = false;
}
}
35 changes: 28 additions & 7 deletions osu.Server.QueueProcessor/QueueProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@
/// <param name="cancellation">An optional cancellation token.</param>
public void Run(CancellationToken cancellation = default)
{
// dedicated redis connection for the BRPOP path, workaround for StackExchange.Redis not offering blocking operations
var blockingRedis = new Lazy<IDatabase>(() => RedisAccess.GetConnection().GetDatabase());

using (SentrySdk.Init(setupSentry))
using (new Timer(_ => outputStats(), null, TimeSpan.Zero, TimeSpan.FromSeconds(5)))
using (var cts = new GracefulShutdownSource(cancellation))
Expand All @@ -100,20 +103,38 @@

try
{
if (totalInFlight >= config.MaxInFlightItems || consecutiveErrors > config.ErrorThreshold)
// avoid processing too many items at once
if (totalInFlight >= config.MaxInFlightItems)
{
Thread.Sleep(config.TimeBetweenPolls);
continue;
}

var redisItems = Redis.ListRightPop(QueueName, config.BatchSize);
RedisValue[] redisItems;

// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
// queue doesn't exist.
if (redisItems == null)
if (config.UseBlockingPop)
{
Thread.Sleep(config.TimeBetweenPolls);
continue;
// timeout in seconds, can't be higher than StackExchange.Redis timeout (default is 5 seconds)
const string timeout = "1";

RedisResult redisResult = blockingRedis.Value.Execute("BRPOP", QueueName, timeout);

if (redisResult.IsNull)
continue;

redisItems = [(RedisValue)redisResult[1]];
}
else
{
redisItems = Redis.ListRightPop(QueueName, config.BatchSize);

// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract (https://github.com/StackExchange/StackExchange.Redis/issues/2697)
// queue doesn't exist.
if (redisItems == null)
{
Thread.Sleep(config.TimeBetweenPolls);
continue;
}
}

List<T> items = new List<T>();
Expand Down Expand Up @@ -260,7 +281,7 @@
public void PublishMessage<TMessage>(string channelName, TMessage message)
{
Redis.Publish(new RedisChannel(channelName, RedisChannel.PatternMode.Auto), JsonConvert.SerializeObject(message));
DogStatsd.Increment("messages_published", tags: new[] { $"channel:{channelName}", $"type:{typeof(TMessage).FullName}" });

Check notice on line 284 in osu.Server.QueueProcessor/QueueProcessor.cs

View workflow job for this annotation

GitHub Actions / Code Quality

Use collection expression in osu.Server.QueueProcessor\QueueProcessor.cs on line 284
}

/// <summary>
Expand Down
Loading