From 8d0ff4f688dae9bf93bede2fa77f76c83d2062d7 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Tue, 3 Feb 2026 16:05:06 -0800 Subject: [PATCH 01/12] Detect and recover from master worker death during setup When the master worker is killed before completing queue setup, the master-status stays at 'setup' and all non-master workers poll for ~120s then fail with LostMaster. This change adds a master setup heartbeat mechanism that allows workers to detect a dead master and atomically re-elect a new one. Changes: - Add master_setup_heartbeat_interval (5s) and master_setup_heartbeat_timeout (30s) config options - Master sends heartbeat during setup via background thread - Workers detect stale heartbeat and attempt atomic takeover via Lua script - Guard in push() prevents split-brain (old master pushing after replacement) Co-Authored-By: Claude Opus 4.5 --- redis/takeover_master.lua | 50 ++++++++++++ ruby/lib/ci/queue/configuration.rb | 7 +- ruby/lib/ci/queue/redis/base.rb | 31 ++++++++ ruby/lib/ci/queue/redis/worker.rb | 120 +++++++++++++++++++++++++++-- 4 files changed, 201 insertions(+), 7 deletions(-) create mode 100644 redis/takeover_master.lua diff --git a/redis/takeover_master.lua b/redis/takeover_master.lua new file mode 100644 index 0000000..54f7e46 --- /dev/null +++ b/redis/takeover_master.lua @@ -0,0 +1,50 @@ +-- Atomically attempt to take over as master when current master is dead during setup +-- Returns 1 if takeover succeeded, 0 otherwise + +local master_status_key = KEYS[1] +local master_worker_id_key = KEYS[2] +local master_setup_heartbeat_key = KEYS[3] + +local new_worker_id = ARGV[1] +local current_time = tonumber(ARGV[2]) +local heartbeat_timeout = tonumber(ARGV[3]) +local redis_ttl = tonumber(ARGV[4]) + +-- Step 1: Verify status is still 'setup' +local status = redis.call('get', master_status_key) +if status ~= 'setup' then + return 0 +end + +-- Step 2: Check if heartbeat is stale or missing +local last_heartbeat = redis.call('get', master_setup_heartbeat_key) +if last_heartbeat then + local heartbeat_age = current_time - tonumber(last_heartbeat) + if heartbeat_age < heartbeat_timeout then + -- Master is still alive, heartbeat is fresh + return 0 + end +end +-- If no heartbeat exists and status is 'setup', master may have died before first heartbeat +-- Allow takeover in this case (heartbeat_timeout acts as grace period for initial setup) + +-- Step 3: Delete old master-status to allow SETNX +redis.call('del', master_status_key) + +-- Step 4: Atomically try to claim master role +local claimed = redis.call('setnx', master_status_key, 'setup') +if claimed == 0 then + -- Another worker beat us to it + return 0 +end + +-- Step 5: We got master role - update worker ID and heartbeat +redis.call('set', master_worker_id_key, new_worker_id) +redis.call('set', master_setup_heartbeat_key, current_time) + +-- Set TTLs +redis.call('expire', master_status_key, redis_ttl) +redis.call('expire', master_worker_id_key, redis_ttl) +redis.call('expire', master_setup_heartbeat_key, redis_ttl) + +return 1 diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 9e9c133..200e889 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -15,6 +15,7 @@ class Configuration attr_accessor :timing_redis_url attr_accessor :write_duration_averages attr_accessor :heartbeat_grace_period, :heartbeat_interval + attr_accessor :master_setup_heartbeat_interval, :master_setup_heartbeat_timeout attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout @@ -66,7 +67,9 @@ def initialize( branch: nil, timing_redis_url: nil, heartbeat_grace_period: 30, - heartbeat_interval: 10 + heartbeat_interval: 10, + master_setup_heartbeat_interval: 5, + master_setup_heartbeat_timeout: 30 ) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @@ -105,6 +108,8 @@ def initialize( @write_duration_averages = false @heartbeat_grace_period = heartbeat_grace_period @heartbeat_interval = heartbeat_interval + @master_setup_heartbeat_interval = master_setup_heartbeat_interval + @master_setup_heartbeat_timeout = master_setup_heartbeat_timeout end def queue_init_timeout diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 9ebdd58..8599fab 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -56,9 +56,27 @@ def progress def wait_for_master(timeout: 120) return true if master? + last_takeover_check = CI::Queue.time_now.to_f + (timeout * 10 + 1).to_i.times do return true if queue_initialized? + # Periodically check if master is dead and attempt takeover + current_time = CI::Queue.time_now.to_f + if current_time - last_takeover_check >= config.master_setup_heartbeat_interval + last_takeover_check = current_time + + if queue_initializing? && master_setup_heartbeat_stale? + if respond_to?(:attempt_master_takeover, true) && attempt_master_takeover + # Takeover succeeded - run master setup + if respond_to?(:run_master_setup, true) + run_master_setup + return true + end + end + end + end + sleep 0.1 end raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting." @@ -97,6 +115,19 @@ def master_worker_id redis.get(key('master-worker-id')) end + # Check if the master setup heartbeat is stale (or missing) + # Returns true if heartbeat is older than master_setup_heartbeat_timeout + def master_setup_heartbeat_stale? + heartbeat = redis.get(key('master-setup-heartbeat')) + return true unless heartbeat # No heartbeat = stale (master may have died before first heartbeat) + + current_time = CI::Queue.time_now.to_f + heartbeat_age = current_time - heartbeat.to_f + heartbeat_age >= config.master_setup_heartbeat_timeout + rescue *CONNECTION_ERRORS + false # On connection error, don't attempt takeover + end + private attr_reader :redis, :redis_url diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 7de1f17..c3c49c2 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -2,6 +2,8 @@ require 'ci/queue/static' require 'concurrent/set' +require 'digest/sha2' +require 'set' module CI module Queue @@ -39,15 +41,17 @@ def populate(tests, random: Random.new) @total = tests.size if acquire_master_role? - executables = reorder_tests(tests, random: random) + with_master_setup_heartbeat do + executables = reorder_tests(tests, random: random) - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - store_chunk_metadata(chunks) if chunks.any? + store_chunk_metadata(chunks) if chunks.any? - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end end register_worker_presence @@ -294,6 +298,44 @@ def with_heartbeat(test_id) heartbeat_thread&.join(1) # Wait up to 1 second for thread to finish end + # Runs a block while sending periodic heartbeats during master setup. + # This allows other workers to detect if the master dies during setup. + def with_master_setup_heartbeat + return yield unless config.master_setup_heartbeat_interval&.positive? + + # Send initial heartbeat immediately + send_master_setup_heartbeat + + stop_heartbeat = false + heartbeat_thread = Thread.new do + until stop_heartbeat + sleep(config.master_setup_heartbeat_interval) + break if stop_heartbeat + + begin + send_master_setup_heartbeat + rescue StandardError => e + warn("[master-setup-heartbeat] Failed to send heartbeat: #{e.message}") + end + end + end + + yield + ensure + stop_heartbeat = true + heartbeat_thread&.kill + heartbeat_thread&.join(1) + end + + # Send a heartbeat to indicate master is still alive during setup + def send_master_setup_heartbeat + current_time = CI::Queue.time_now.to_f + redis.set(key('master-setup-heartbeat'), current_time) + redis.expire(key('master-setup-heartbeat'), config.redis_ttl) + rescue *CONNECTION_ERRORS => e + warn("[master-setup-heartbeat] Connection error: #{e.message}") + end + def ensure_connection_and_script(script) # Pre-initialize Redis connection and script in current thread context # This ensures background threads use the same initialized connection @@ -441,6 +483,15 @@ def push(tests) @total = tests.size if @master + # Guard against split-brain: verify we're still the master before pushing + # This prevents race where old master reconnects after being replaced + current_master = redis.get(key('master-worker-id')) + if current_master && current_master != worker_id + warn "Worker #{worker_id} lost master role to #{current_master}, aborting push" + @master = false + return + end + redis.multi do |transaction| transaction.lpush(key('queue'), tests) unless tests.empty? transaction.set(key('total'), @total) @@ -480,6 +531,40 @@ def acquire_master_role? false end + # Attempt to take over as master when current master appears dead during setup. + # Uses atomic Lua script to ensure only one worker can win the takeover. + # Returns true if takeover succeeded, false otherwise. + def attempt_master_takeover + return false if @master # Already master + + current_time = CI::Queue.time_now.to_f + result = eval_script( + :takeover_master, + keys: [ + key('master-status'), + key('master-worker-id'), + key('master-setup-heartbeat') + ], + argv: [ + worker_id, + current_time, + config.master_setup_heartbeat_timeout, + config.redis_ttl + ] + ) + + if result == 1 + @master = true + warn "Worker #{worker_id} took over as master (previous master died during setup)" + true + else + false + end + rescue *CONNECTION_ERRORS => e + warn "[takeover] Connection error during takeover attempt: #{e.message}" + false + end + def register_worker_presence register redis.expire(key('workers'), config.redis_ttl) @@ -487,6 +572,29 @@ def register_worker_presence raise if master? end + # Run master setup after a successful takeover. + # Reconstructs the work the original master would have done. + def run_master_setup + return unless @master && @index + + # Reconstruct tests array from index + tests = @index.values + + with_master_setup_heartbeat do + # Use the same seed for deterministic ordering + random = Random.new(Digest::SHA256.hexdigest(config.seed).to_i(16)) + executables = reorder_tests(tests, random: random) + + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } + + store_chunk_metadata(chunks) if chunks.any? + + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end + end + def store_chunk_metadata(chunks) # Batch operations to avoid exceeding Redis multi operation limits # Each chunk requires 4 commands (set, expire, sadd, hset), so batch conservatively From 51df5db3aa16550ed6f606833d289c1d4de3a2ed Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 14:20:00 -0800 Subject: [PATCH 02/12] Fix TOCTOU race in push() using WATCH/MULTI The previous guard check in push() had a race window between checking master-worker-id and executing the transaction. A takeover could occur in that window, causing both old and new master to push tests. Use Redis WATCH to monitor master-worker-id. If it changes between WATCH and MULTI execution, the transaction automatically aborts, preventing duplicate test pushes. Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/worker.rb | 43 +++++++++++++++++++------------ 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index c3c49c2..8978326 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -481,26 +481,37 @@ def try_to_reserve_lost_test def push(tests) @total = tests.size + return unless @master + + # Use WATCH/MULTI for atomic check-and-push to prevent TOCTOU race. + # If master-worker-id changes between WATCH and MULTI, transaction aborts. + result = redis.watch(key('master-worker-id')) do |rd| + current_master = rd.get(key('master-worker-id')) - if @master - # Guard against split-brain: verify we're still the master before pushing - # This prevents race where old master reconnects after being replaced - current_master = redis.get(key('master-worker-id')) if current_master && current_master != worker_id - warn "Worker #{worker_id} lost master role to #{current_master}, aborting push" - @master = false - return + # We're not the master anymore, unwatch and abort + rd.unwatch + :not_master + else + # We're still master, execute atomic transaction + rd.multi do |transaction| + transaction.lpush(key('queue'), tests) unless tests.empty? + transaction.set(key('total'), @total) + transaction.set(key('master-status'), 'ready') + + transaction.expire(key('queue'), config.redis_ttl) + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end end + end - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') - - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) - end + # result is nil if WATCH detected a change (race condition) + # result is :not_master if we detected we're not master + # result is an array of responses if transaction succeeded + if result.nil? || result == :not_master + warn "Worker #{worker_id} lost master role (race detected), aborting push" + @master = false end rescue *CONNECTION_ERRORS raise if @master From bbb62d9e347a953ac2ee91329e69cf5464498f2f Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:07:13 -0800 Subject: [PATCH 03/12] Fix race condition: set initial heartbeat in acquire_master_role Previously, there was a window between acquiring master role and entering with_master_setup_heartbeat where no heartbeat existed. Other workers could see status='setup' with no heartbeat and incorrectly attempt takeover. Now the initial heartbeat is set immediately after acquiring master role, closing this window. Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/worker.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 8978326..71ac91e 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -529,9 +529,15 @@ def acquire_master_role? begin redis.set(key('master-worker-id'), worker_id) redis.expire(key('master-worker-id'), config.redis_ttl) + + # Set initial heartbeat immediately to prevent premature takeover + # This closes the window where status='setup' but no heartbeat exists + redis.set(key('master-setup-heartbeat'), CI::Queue.time_now.to_f) + redis.expire(key('master-setup-heartbeat'), config.redis_ttl) + warn "Worker #{worker_id} elected as master" rescue *CONNECTION_ERRORS - # If setting master-worker-id fails, we still have master status + # If setting master-worker-id/heartbeat fails, we still have master status # Log but don't lose master role warn("Failed to set master-worker-id: #{$!.message}") end From 86e003a14ff03af805a3a1d618ebc10eb92a54ea Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:08:16 -0800 Subject: [PATCH 04/12] Fix race condition: check status before running master setup after takeover After taking over as master, check if master-status is still "setup" before running setup. If status is already "ready", the original master completed its push before we could act, so we skip to avoid duplicate test pushes. This fixes the race where: 1. Original master's push() passes WATCH check 2. Heartbeat becomes stale during redis.multi execution 3. Worker takes over (status still "setup") 4. Original master's transaction commits, sets status="ready" 5. Without this fix, takeover worker would also push tests Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/worker.rb | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 71ac91e..13b0367 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -594,6 +594,19 @@ def register_worker_presence def run_master_setup return unless @master && @index + # Check if setup is already complete (original master finished before we took over) + # This prevents duplicate test pushes in the race where: + # 1. Original master's push() passes WATCH check + # 2. Original master's heartbeat becomes stale during redis.multi + # 3. We take over (status still "setup") + # 4. Original master's transaction commits, sets status="ready" + # 5. We would push again without this check + status = master_status + if status != 'setup' + warn "Worker #{worker_id} took over but setup already complete (status=#{status}), skipping" + return + end + # Reconstruct tests array from index tests = @index.values From 402d00adbce6907e719af47e15453cdddfb4067b Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:25:18 -0800 Subject: [PATCH 05/12] Revert "Fix race condition: check status before running master setup after takeover" This reverts commit 636796e6470f8f9a0fda74b668791dacb30836d4. --- ruby/lib/ci/queue/redis/worker.rb | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 13b0367..71ac91e 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -594,19 +594,6 @@ def register_worker_presence def run_master_setup return unless @master && @index - # Check if setup is already complete (original master finished before we took over) - # This prevents duplicate test pushes in the race where: - # 1. Original master's push() passes WATCH check - # 2. Original master's heartbeat becomes stale during redis.multi - # 3. We take over (status still "setup") - # 4. Original master's transaction commits, sets status="ready" - # 5. We would push again without this check - status = master_status - if status != 'setup' - warn "Worker #{worker_id} took over but setup already complete (status=#{status}), skipping" - return - end - # Reconstruct tests array from index tests = @index.values From 48442ba37b29f24dbff9203caa9c50e35f1a8cad Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:26:57 -0800 Subject: [PATCH 06/12] Fix md5 hash difference --- ruby/lib/ci/queue/redis/worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 71ac91e..5fb659a 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -599,7 +599,7 @@ def run_master_setup with_master_setup_heartbeat do # Use the same seed for deterministic ordering - random = Random.new(Digest::SHA256.hexdigest(config.seed).to_i(16)) + random = Random.new(Digest::MD5.hexdigest(config.seed).to_i(16)) executables = reorder_tests(tests, random: random) chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } From ddfef015692f8145fdf48872dcd33db8533d1084 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:45:15 -0800 Subject: [PATCH 07/12] Refactor: extract execute_master_setup to remove duplication Extract shared master setup logic (reorder tests, store chunk metadata, push to queue) into execute_master_setup method. Used by both initial master setup in populate() and takeover in run_master_setup(). Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/base.rb | 2 +- ruby/lib/ci/queue/redis/worker.rb | 29 ++++++----------------------- 2 files changed, 7 insertions(+), 24 deletions(-) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 8599fab..0314b69 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -70,7 +70,7 @@ def wait_for_master(timeout: 120) if respond_to?(:attempt_master_takeover, true) && attempt_master_takeover # Takeover succeeded - run master setup if respond_to?(:run_master_setup, true) - run_master_setup + execute_master_setup return true end end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 5fb659a..f182f16 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -39,20 +39,9 @@ def populate(tests, random: Random.new) # All workers need an index of tests to resolve IDs @index = tests.map { |t| [t.id, t] }.to_h @total = tests.size + @random = random - if acquire_master_role? - with_master_setup_heartbeat do - executables = reorder_tests(tests, random: random) - - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - - store_chunk_metadata(chunks) if chunks.any? - - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) - end - end + execute_master_setup(tests) if acquire_master_role? register_worker_presence @@ -589,18 +578,12 @@ def register_worker_presence raise if master? end - # Run master setup after a successful takeover. - # Reconstructs the work the original master would have done. - def run_master_setup + # Shared logic for master setup - reorders tests, stores chunk metadata, and pushes to queue. + # Used by both initial master setup (populate) and takeover. + def execute_master_setup(tests) return unless @master && @index - - # Reconstruct tests array from index - tests = @index.values - with_master_setup_heartbeat do - # Use the same seed for deterministic ordering - random = Random.new(Digest::MD5.hexdigest(config.seed).to_i(16)) - executables = reorder_tests(tests, random: random) + executables = reorder_tests(tests, random: @random) chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } From 9b76034e03bac974a789f6ce7fead18223e57a86 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 5 Feb 2026 15:58:45 -0800 Subject: [PATCH 08/12] add logging --- ruby/lib/ci/queue/redis/worker.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index f182f16..810f723 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -543,6 +543,8 @@ def acquire_master_role? def attempt_master_takeover return false if @master # Already master + warn "Worker #{worker_id} attempting to takeover as master" + current_time = CI::Queue.time_now.to_f result = eval_script( :takeover_master, @@ -564,6 +566,7 @@ def attempt_master_takeover warn "Worker #{worker_id} took over as master (previous master died during setup)" true else + warn "Failed to takeover as master. Current master is #{master_worker_id}" false end rescue *CONNECTION_ERRORS => e From 9912e2880aedda93bde43dd994c8aa768df1f7d7 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Tue, 10 Feb 2026 16:02:32 -0800 Subject: [PATCH 09/12] Fix argument error --- ruby/lib/ci/queue/redis/worker.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 810f723..3e6bdef 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -40,8 +40,9 @@ def populate(tests, random: Random.new) @index = tests.map { |t| [t.id, t] }.to_h @total = tests.size @random = random + @tests = tests - execute_master_setup(tests) if acquire_master_role? + execute_master_setup if acquire_master_role? register_worker_presence @@ -543,7 +544,7 @@ def acquire_master_role? def attempt_master_takeover return false if @master # Already master - warn "Worker #{worker_id} attempting to takeover as master" + warn "Worker #{worker_id} attempting to takeover as master from #{master_worker_id}" current_time = CI::Queue.time_now.to_f result = eval_script( @@ -586,7 +587,7 @@ def register_worker_presence def execute_master_setup(tests) return unless @master && @index with_master_setup_heartbeat do - executables = reorder_tests(tests, random: @random) + executables = reorder_tests(@tests, random: @random) chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } From 35b723b4dae756ef55094c0d42860a2e77eaf9f1 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Tue, 10 Feb 2026 16:50:43 -0800 Subject: [PATCH 10/12] Fix method name mismatch in master takeover logic The respond_to? check was looking for :run_master_setup but the actual method is named :execute_master_setup, causing takeover to succeed but never run setup or return early, eventually timing out with LostMaster. Co-Authored-By: Claude Opus 4.5 --- ruby/lib/ci/queue/redis/base.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 0314b69..27326ab 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -69,7 +69,7 @@ def wait_for_master(timeout: 120) if queue_initializing? && master_setup_heartbeat_stale? if respond_to?(:attempt_master_takeover, true) && attempt_master_takeover # Takeover succeeded - run master setup - if respond_to?(:run_master_setup, true) + if respond_to?(:execute_master_setup, true) execute_master_setup return true end From 772f001af176643e00ad9fb3ad89bddb0c13234f Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Tue, 10 Feb 2026 17:06:39 -0800 Subject: [PATCH 11/12] quick fix --- ruby/lib/ci/queue/redis/worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 3e6bdef..8e04eeb 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -584,7 +584,7 @@ def register_worker_presence # Shared logic for master setup - reorders tests, stores chunk metadata, and pushes to queue. # Used by both initial master setup (populate) and takeover. - def execute_master_setup(tests) + def execute_master_setup return unless @master && @index with_master_setup_heartbeat do executables = reorder_tests(@tests, random: @random) From c606bb0850b28aae42c8e3c6c4eaf0106d6c253d Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Thu, 12 Feb 2026 09:31:44 -0800 Subject: [PATCH 12/12] Add unit and integration tests for master setup heartbeat feature Tests cover: - Configuration options (master_setup_heartbeat_interval, master_setup_heartbeat_timeout) - master_setup_heartbeat_stale? detection - send_master_setup_heartbeat functionality - attempt_master_takeover success/failure scenarios - with_master_setup_heartbeat periodic heartbeats - wait_for_master with takeover detection - push with WATCH/MULTI race detection - Integration scenarios for master death and takeover Co-Authored-By: Claude Opus 4.5 --- .../redis/master_setup_heartbeat_test.rb | 395 ++++++++++++++++++ ...master_setup_heartbeat_integration_test.rb | 325 ++++++++++++++ 2 files changed, 720 insertions(+) create mode 100644 ruby/test/ci/queue/redis/master_setup_heartbeat_test.rb create mode 100644 ruby/test/integration/master_setup_heartbeat_integration_test.rb diff --git a/ruby/test/ci/queue/redis/master_setup_heartbeat_test.rb b/ruby/test/ci/queue/redis/master_setup_heartbeat_test.rb new file mode 100644 index 0000000..2cfea5c --- /dev/null +++ b/ruby/test/ci/queue/redis/master_setup_heartbeat_test.rb @@ -0,0 +1,395 @@ +# frozen_string_literal: true + +require 'test_helper' + +module CI + module Queue + module Redis + class MasterSetupHeartbeatTest < Minitest::Test + TEST_LIST = %w[ + ATest#test_foo + ATest#test_bar + BTest#test_foo + ].freeze + + def setup + @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') + @redis = ::Redis.new(url: @redis_url) + @redis.flushdb + end + + def teardown + @redis.flushdb + end + + # === Configuration Tests === + + def test_master_setup_heartbeat_interval_default + config = Configuration.new + assert_equal 5, config.master_setup_heartbeat_interval + end + + def test_master_setup_heartbeat_timeout_default + config = Configuration.new + assert_equal 30, config.master_setup_heartbeat_timeout + end + + def test_master_setup_heartbeat_interval_configurable + config = Configuration.new(master_setup_heartbeat_interval: 10) + assert_equal 10, config.master_setup_heartbeat_interval + end + + def test_master_setup_heartbeat_timeout_configurable + config = Configuration.new(master_setup_heartbeat_timeout: 60) + assert_equal 60, config.master_setup_heartbeat_timeout + end + + # === master_setup_heartbeat_stale? Tests === + + def test_master_setup_heartbeat_stale_when_no_heartbeat_exists + worker = create_worker(1) + assert worker.master_setup_heartbeat_stale?, 'Should be stale when no heartbeat exists' + end + + def test_master_setup_heartbeat_not_stale_when_fresh + # Set a fresh heartbeat + @redis.set('build:42:master-setup-heartbeat', CI::Queue.time_now.to_f) + + worker = create_worker(1, master_setup_heartbeat_timeout: 30) + refute worker.master_setup_heartbeat_stale?, 'Should not be stale when heartbeat is fresh' + end + + def test_master_setup_heartbeat_stale_when_old + # Set a stale heartbeat (older than timeout) + old_time = CI::Queue.time_now.to_f - 60 # 60 seconds ago + @redis.set('build:42:master-setup-heartbeat', old_time) + + worker = create_worker(1, master_setup_heartbeat_timeout: 30) + assert worker.master_setup_heartbeat_stale?, 'Should be stale when heartbeat is older than timeout' + end + + def test_master_setup_heartbeat_exactly_at_timeout + # Set heartbeat exactly at timeout boundary + timeout = 30 + boundary_time = CI::Queue.time_now.to_f - timeout + @redis.set('build:42:master-setup-heartbeat', boundary_time) + + worker = create_worker(1, master_setup_heartbeat_timeout: timeout) + assert worker.master_setup_heartbeat_stale?, 'Should be stale at exactly timeout' + end + + # === send_master_setup_heartbeat Tests === + + def test_send_master_setup_heartbeat_sets_key + worker = create_worker(1) + worker.send(:send_master_setup_heartbeat) + + heartbeat = @redis.get('build:42:master-setup-heartbeat') + refute_nil heartbeat, 'Heartbeat key should be set' + assert_in_delta CI::Queue.time_now.to_f, heartbeat.to_f, 1.0 + end + + def test_send_master_setup_heartbeat_sets_ttl + worker = create_worker(1) + worker.send(:send_master_setup_heartbeat) + + ttl = @redis.ttl('build:42:master-setup-heartbeat') + assert ttl > 0, 'Heartbeat should have TTL set' + end + + # === acquire_master_role? Tests === + + def test_acquire_master_role_sets_initial_heartbeat + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + worker.populate(tests, random: Random.new(0)) + + heartbeat = @redis.get('build:42:master-setup-heartbeat') + refute_nil heartbeat, 'Initial heartbeat should be set after acquiring master' + end + + def test_acquire_master_role_sets_master_worker_id + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + worker.populate(tests, random: Random.new(0)) + + master_id = @redis.get('build:42:master-worker-id') + assert_equal '1', master_id + end + + # === attempt_master_takeover Tests === + + def test_attempt_master_takeover_fails_when_already_master + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + worker.populate(tests, random: Random.new(0)) + + refute worker.send(:attempt_master_takeover), 'Should not attempt takeover when already master' + end + + def test_attempt_master_takeover_fails_when_heartbeat_fresh + # Set up initial master + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + @redis.set('build:42:master-setup-heartbeat', CI::Queue.time_now.to_f) + + # Second worker tries to take over - should fail + worker2 = create_worker(2, master_setup_heartbeat_timeout: 30) + refute worker2.send(:attempt_master_takeover), 'Should not takeover when heartbeat is fresh' + + # Original master should still be set + assert_equal '1', @redis.get('build:42:master-worker-id') + end + + def test_attempt_master_takeover_succeeds_when_heartbeat_stale + # Set up initial master with stale heartbeat + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Second worker tries to take over - should succeed + worker2 = create_worker(2, master_setup_heartbeat_timeout: 30) + assert worker2.send(:attempt_master_takeover), 'Should takeover when heartbeat is stale' + + # Worker 2 should now be master + assert_equal '2', @redis.get('build:42:master-worker-id') + assert worker2.master?, 'Worker 2 should be master after takeover' + end + + def test_attempt_master_takeover_fails_when_status_not_setup + # Set up master that completed setup + @redis.set('build:42:master-status', 'ready') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Second worker tries to take over - should fail because status is 'ready' + worker2 = create_worker(2, master_setup_heartbeat_timeout: 30) + refute worker2.send(:attempt_master_takeover), 'Should not takeover when status is not setup' + end + + def test_attempt_master_takeover_only_one_wins + # Set up initial master with stale heartbeat + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Multiple workers try to take over simultaneously + workers = (2..5).map { |i| create_worker(i, master_setup_heartbeat_timeout: 30) } + + results = workers.map { |w| w.send(:attempt_master_takeover) } + + # Exactly one should succeed + assert_equal 1, results.count(true), 'Exactly one worker should win takeover' + end + + # === with_master_setup_heartbeat Tests === + + def test_with_master_setup_heartbeat_sends_heartbeats + worker = create_worker(1, master_setup_heartbeat_interval: 0.1) + + heartbeat_times = [] + worker.send(:with_master_setup_heartbeat) do + 3.times do + sleep 0.15 + heartbeat = @redis.get('build:42:master-setup-heartbeat') + heartbeat_times << heartbeat.to_f if heartbeat + end + end + + # Should have received multiple heartbeats + assert heartbeat_times.size >= 2, "Should have at least 2 heartbeat readings, got #{heartbeat_times.size}" + # Heartbeats should be progressively newer + assert heartbeat_times == heartbeat_times.sort, 'Heartbeats should be progressively newer' + end + + def test_with_master_setup_heartbeat_stops_after_block + worker = create_worker(1, master_setup_heartbeat_interval: 0.1) + + worker.send(:with_master_setup_heartbeat) do + sleep 0.2 + end + + initial_heartbeat = @redis.get('build:42:master-setup-heartbeat').to_f + sleep 0.3 + final_heartbeat = @redis.get('build:42:master-setup-heartbeat').to_f + + # Heartbeat should not have updated after block completed + assert_equal initial_heartbeat, final_heartbeat, 'Heartbeat should stop updating after block' + end + + # === wait_for_master with Takeover Tests === + + def test_wait_for_master_triggers_takeover_when_master_dead + # Set up a dead master (status = setup, but stale heartbeat) + @redis.set('build:42:master-status', 'setup') + @redis.set('build:42:master-worker-id', '1') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:42:master-setup-heartbeat', old_time) + + # Worker 2 waits for master - should trigger takeover + worker2 = create_worker( + 2, + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Populate will trigger wait_for_master for non-master workers + # We need to simulate this by having populate fail to acquire master + # but then wait_for_master should detect dead master and takeover + + # First, create the index without populating + worker2.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + worker2.instance_variable_set(:@total, tests.size) + worker2.instance_variable_set(:@random, Random.new(0)) + worker2.instance_variable_set(:@tests, tests) + worker2.instance_variable_set(:@master, false) + + # Now call wait_for_master - should detect dead master and take over + result = worker2.wait_for_master(timeout: 5) + + assert result, 'wait_for_master should return true after takeover' + assert worker2.master?, 'Worker 2 should be master after takeover' + assert_equal '2', @redis.get('build:42:master-worker-id') + end + + def test_wait_for_master_does_not_takeover_when_master_alive + # Master worker sets up and sends heartbeats + worker1 = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Start master in background thread + master_thread = Thread.new do + worker1.populate(tests, random: Random.new(0)) + end + + # Worker 2 waits - should wait for master to finish, not take over + worker2 = create_worker( + 2, + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1, + populate: false + ) + + result = worker2.wait_for_master(timeout: 5) + master_thread.join + + assert result, 'wait_for_master should return true' + refute worker2.master?, 'Worker 2 should not be master' + assert worker1.master?, 'Worker 1 should be master' + end + + # === Push with WATCH/MULTI Tests === + + def test_push_aborts_when_master_id_changes + worker1 = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Set up worker1 as master + worker1.instance_variable_set(:@master, true) + worker1.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + @redis.set('build:42:master-worker-id', '1') + + # Simulate race: another worker changed master-worker-id between WATCH and MULTI + # We can't directly test the race, but we can verify the behavior + # when master-worker-id doesn't match + + # Change master-worker-id before push + @redis.set('build:42:master-worker-id', '2') + + worker1.send(:push, TEST_LIST) + + # Worker1 should have detected the race and aborted + refute worker1.master?, 'Worker1 should lose master role after race detection' + end + + def test_push_succeeds_when_master_id_matches + worker1 = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Actually populate to set up master correctly + worker1.populate(tests, random: Random.new(0)) + + assert worker1.master?, 'Worker1 should be master' + assert_equal 'ready', @redis.get('build:42:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:42:queue') + end + + # === execute_master_setup Tests === + + def test_execute_master_setup_runs_with_heartbeat + worker = create_worker(1, master_setup_heartbeat_interval: 0.1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Set up worker as master with required state + worker.instance_variable_set(:@master, true) + worker.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + worker.instance_variable_set(:@tests, tests) + worker.instance_variable_set(:@random, Random.new(0)) + @redis.set('build:42:master-worker-id', '1') + + worker.send(:execute_master_setup) + + # Queue should be populated + assert_equal 'ready', @redis.get('build:42:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:42:queue') + end + + def test_execute_master_setup_does_nothing_when_not_master + worker = create_worker(1) + tests = TEST_LIST.map { |id| MockTest.new(id) } + + worker.instance_variable_set(:@master, false) + worker.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + + worker.send(:execute_master_setup) + + # Queue should not be populated + assert_nil @redis.get('build:42:master-status') + end + + private + + class MockTest + attr_reader :id + + def initialize(id) + @id = id + end + + def <=>(other) + id <=> other.id + end + + def flaky? + false + end + + def tests + [self] + end + end + + def create_worker(id, **options) + skip_populate = options.delete(:populate) == false + config_options = { + build_id: '42', + worker_id: id.to_s, + timeout: 0.2, + timing_redis_url: @redis_url, + master_setup_heartbeat_interval: options.delete(:master_setup_heartbeat_interval) || 5, + master_setup_heartbeat_timeout: options.delete(:master_setup_heartbeat_timeout) || 30 + }.merge(options) + + CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new(**config_options) + ) + end + end + end + end +end diff --git a/ruby/test/integration/master_setup_heartbeat_integration_test.rb b/ruby/test/integration/master_setup_heartbeat_integration_test.rb new file mode 100644 index 0000000..9e4d902 --- /dev/null +++ b/ruby/test/integration/master_setup_heartbeat_integration_test.rb @@ -0,0 +1,325 @@ +# frozen_string_literal: true + +require 'test_helper' +require 'timeout' + +module Integration + class MasterSetupHeartbeatIntegrationTest < Minitest::Test + TEST_LIST = %w[ + ATest#test_foo + ATest#test_bar + BTest#test_foo + BTest#test_bar + CTest#test_foo + ].freeze + + def setup + @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') + @redis = ::Redis.new(url: @redis_url) + @redis.flushdb + @threads = [] + end + + def teardown + @threads.each(&:kill) + @threads.each { |t| t.join(1) } + @redis.flushdb + end + + # === Scenario: Master dies during setup, follower takes over === + + def test_follower_takes_over_when_master_dies_during_setup + # Simulate master that starts setup but dies before completing + # by setting up the 'setup' state without completing it + + # Create a "dead" master state + @redis.set('build:100:master-status', 'setup') + @redis.set('build:100:master-worker-id', 'dead_master') + # Set a stale heartbeat (old enough to trigger takeover) + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:100:master-setup-heartbeat', old_time) + + # Start a follower worker that should detect dead master and take over + tests = TEST_LIST.map { |id| MockTest.new(id) } + follower = create_worker( + 1, + build_id: '100', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.2 + ) + + # Populate will trigger wait_for_master which should detect dead master + follower.populate(tests, random: Random.new(0)) + + # Follower should have taken over + assert follower.master?, 'Follower should become master after takeover' + assert_equal '1', @redis.get('build:100:master-worker-id') + assert_equal 'ready', @redis.get('build:100:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:100:queue') + end + + def test_multiple_followers_competing_for_takeover + # Simulate dead master + @redis.set('build:101:master-status', 'setup') + @redis.set('build:101:master-worker-id', 'dead_master') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:101:master-setup-heartbeat', old_time) + + tests = TEST_LIST.map { |id| MockTest.new(id) } + workers = [] + results = [] + mutex = Mutex.new + + # Start multiple followers simultaneously + 5.times do |i| + @threads << Thread.new do + worker = create_worker( + i + 1, + build_id: '101', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + begin + worker.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + mutex.synchronize do + workers << worker + results << worker.master? + end + rescue StandardError => e + mutex.synchronize { results << false } + end + end + end + + # Wait for all threads to complete + @threads.each { |t| t.join(10) } + + # Exactly one worker should be master + master_count = results.count(true) + assert_equal 1, master_count, "Expected exactly 1 master, got #{master_count}" + + # Queue should be properly set up + assert_equal 'ready', @redis.get('build:101:master-status') + assert_equal TEST_LIST.size, @redis.llen('build:101:queue') + end + + def test_follower_waits_for_healthy_master_to_complete + tests = TEST_LIST.map { |id| MockTest.new(id) } + master_completed = false + follower_completed = false + + # Start master worker that takes time to complete setup + @threads << Thread.new do + master = create_worker( + 1, + build_id: '102', + master_setup_heartbeat_interval: 0.1 + ) + # Simulate slow setup by adding sleep in the middle + # The with_master_setup_heartbeat should keep heartbeat fresh + master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + master_completed = true + end + + # Give master time to start setup + sleep 0.2 + + # Start follower - should wait for master, not take over + @threads << Thread.new do + follower = create_worker( + 2, + build_id: '102', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + follower.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + follower_completed = true + refute follower.master?, 'Follower should not become master when master is healthy' + end + + # Wait for both to complete + @threads.each { |t| t.join(10) } + + assert master_completed, 'Master should have completed' + assert follower_completed, 'Follower should have completed' + + # Master should still be worker 1 + assert_equal '1', @redis.get('build:102:master-worker-id') + end + + def test_heartbeat_prevents_premature_takeover + # Test that a fresh heartbeat prevents takeover even with short timeout + + # First, a real master starts up and begins setup + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Create master and acquire role + master = create_worker( + 1, + build_id: '103', + master_setup_heartbeat_interval: 0.1 + ) + + # Manually acquire master role to control timing + master.instance_variable_set(:@index, tests.map { |t| [t.id, t] }.to_h) + master.instance_variable_set(:@total, tests.size) + master.instance_variable_set(:@tests, tests) + master.instance_variable_set(:@random, Random.new(0)) + + # Acquire master role - this sets up initial state + master.send(:acquire_master_role?) + assert master.master?, 'Worker 1 should be master' + + # Send a fresh heartbeat + master.send(:send_master_setup_heartbeat) + + # Now try to have another worker take over - should fail + worker2 = create_worker( + 2, + build_id: '103', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + + # Worker 2 should not be able to take over because heartbeat is fresh + refute worker2.send(:attempt_master_takeover), 'Should not takeover when heartbeat is fresh' + assert_equal '1', @redis.get('build:103:master-worker-id') + end + + def test_end_to_end_normal_operation_with_heartbeat + # Test that normal operation with heartbeat works correctly + tests = TEST_LIST.map { |id| MockTest.new(id) } + processed_tests = [] + mutex = Mutex.new + + # Start master + master = create_worker( + 1, + build_id: '104', + master_setup_heartbeat_interval: 0.2 + ) + master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Start follower + follower = create_worker( + 2, + build_id: '104', + master_setup_heartbeat_interval: 0.2 + ) + follower.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Process tests from both workers + [master, follower].each do |worker| + @threads << Thread.new do + worker.poll do |test| + mutex.synchronize { processed_tests << test.id } + worker.acknowledge(test) + end + end + end + + # Wait for processing to complete + @threads.each { |t| t.join(10) } + + # All tests should be processed + assert_equal TEST_LIST.sort, processed_tests.sort + assert master.exhausted? + assert follower.exhausted? + end + + def test_takeover_with_immediate_queue_initialization + # Test that after takeover, the new master properly initializes the queue + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Set up dead master state + @redis.set('build:105:master-status', 'setup') + @redis.set('build:105:master-worker-id', 'dead_master') + old_time = CI::Queue.time_now.to_f - 60 + @redis.set('build:105:master-setup-heartbeat', old_time) + + # New worker takes over + new_master = create_worker( + 1, + build_id: '105', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + new_master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Verify queue is fully functional + assert new_master.master? + assert new_master.queue_initialized? + refute new_master.exhausted? + assert_equal TEST_LIST.size, new_master.size + end + + def test_no_takeover_when_master_finishes_quickly + # Test that no takeover happens when master finishes before timeout + tests = TEST_LIST.map { |id| MockTest.new(id) } + + # Master completes setup quickly + master = create_worker( + 1, + build_id: '106', + master_setup_heartbeat_interval: 0.1 + ) + master.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Give it a moment + sleep 0.1 + + # Follower joins - should not attempt takeover + follower = create_worker( + 2, + build_id: '106', + master_setup_heartbeat_timeout: 30, + master_setup_heartbeat_interval: 0.1 + ) + follower.populate(tests.map { |t| MockTest.new(t.id) }, random: Random.new(0)) + + # Master should still be worker 1 + assert master.master? + refute follower.master? + assert_equal '1', @redis.get('build:106:master-worker-id') + end + + private + + class MockTest + attr_reader :id + + def initialize(id) + @id = id + end + + def <=>(other) + id <=> other.id + end + + def flaky? + false + end + + def tests + [self] + end + end + + def create_worker(id, **options) + build_id = options.delete(:build_id) || '42' + config_options = { + build_id: build_id, + worker_id: id.to_s, + timeout: 1, + timing_redis_url: @redis_url, + master_setup_heartbeat_interval: options.delete(:master_setup_heartbeat_interval) || 5, + master_setup_heartbeat_timeout: options.delete(:master_setup_heartbeat_timeout) || 30 + }.merge(options) + + CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new(**config_options) + ) + end + end +end