diff --git a/README.md b/README.md index b49f9c60..f4f28ba7 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite, - [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler) - [Fork vs. async mode](#fork-vs-async-mode) - [Configuration](#configuration) + - [Optional scheduler configuration](#optional-scheduler-configuration) - [Queue order and priorities](#queue-order-and-priorities) - [Queues specification and performance](#queues-specification-and-performance) - [Threads, processes, and signals](#threads-processes-and-signals) @@ -31,6 +32,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite, - [Puma plugin](#puma-plugin) - [Jobs and transactional integrity](#jobs-and-transactional-integrity) - [Recurring tasks](#recurring-tasks) + - [Scheduling and unscheduling recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically) - [Inspiration](#inspiration) - [License](#license) @@ -209,7 +211,7 @@ By default, Solid Queue will try to find your configuration under `config/queue. bin/jobs -c config/calendar.yml ``` -You can also skip all recurring tasks by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`. +You can also skip the scheduler process by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`. This is what this configuration looks like: @@ -227,6 +229,10 @@ production: threads: 5 polling_interval: 0.1 processes: 3 + scheduler: + dynamic_tasks_enabled: true + polling_interval: 5 + ``` Everything is optional. If no configuration at all is provided, Solid Queue will run with one dispatcher and one worker with default settings. If you want to run only dispatchers or workers, you just need to include that section alone in the configuration. For example, with the following configuration: @@ -271,6 +277,19 @@ It is recommended to set this value less than or equal to the queue database's c - `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else. +### Optional scheduler configuration + +Optionally, you can configure the scheduler process under the `scheduler` section in your `config/queue.yml` if you'd like to [schedule recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically). + +```yaml +scheduler: + dynamic_tasks_enabled: true + polling_interval: 5 +``` + +- `dynamic_tasks_enabled`: whether the scheduler should poll for [dynamically scheduled recurring tasks](#scheduling-and-unscheduling-recurring-tasks-dynamically). This is `false` by default. When enabled, the scheduler will poll the database at the given `polling_interval` to pick up tasks scheduled via `SolidQueue.schedule_recurring_task`. +- `polling_interval`: how frequently (in seconds) the scheduler checks for dynamic task changes. Defaults to `5`. + ### Queue order and priorities As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`. @@ -732,6 +751,38 @@ my_periodic_resque_job: and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time. +### Scheduling and unscheduling recurring tasks dynamically + +You can schedule and unschedule recurring tasks at runtime, without editing the configuration file. To enable this, you need to set `dynamic_tasks_enabled: true` in the `scheduler` section of your `config/queue.yml`, [as explained earlier](#optional-scheduler-configuration). + +```yaml +scheduler: + dynamic_tasks_enabled: true +``` + +Then you can use the following methods to add recurring tasks dynamically: + +```ruby +SolidQueue.schedule_recurring_task( + "my_dynamic_task", + class: "MyJob", + args: [1, 2], + schedule: "every 10 minutes" +) +``` + +This accepts the same options as the YAML configuration: `class`, `args`, `command`, `schedule`, `queue`, `priority`, and `description`. + +To remove a dynamically scheduled task: + +```ruby +SolidQueue.unschedule_recurring_task("my_dynamic_task") +``` + +Only dynamic tasks can be unscheduled at runtime. Attempting to unschedule a static task (defined in `config/recurring.yml`) will raise an `ActiveRecord::RecordNotFound` error. + +Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them. + ## Inspiration Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot. diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index d248f992..9bb634e6 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -11,6 +11,7 @@ class RecurringTask < Record validate :ensure_existing_job_class scope :static, -> { where(static: true) } + scope :dynamic, -> { where(static: false) } has_many :recurring_executions, foreign_key: :task_key, primary_key: :key @@ -32,7 +33,15 @@ def from_configuration(key, **options) queue_name: options[:queue].presence, priority: options[:priority].presence, description: options[:description], - static: true + static: options.fetch(:static, true) + end + + def create_dynamic_task(key, **options) + from_configuration(key, **options.merge(static: false)).save! + end + + def delete_dynamic_task(key) + RecurringTask.dynamic.find_by!(key: key).destroy end def create_or_update_all(tasks) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..bd2269e5 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -43,6 +43,14 @@ module SolidQueue delegate :on_start, :on_stop, :on_exit, to: Supervisor + def schedule_recurring_task(key, **options) + RecurringTask.create_dynamic_task(key, **options) + end + + def unschedule_recurring_task(key) + RecurringTask.delete_dynamic_task(key) + end + [ Dispatcher, Scheduler, Worker ].each do |process| define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| process.on_start(&block) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 94169ca7..e63a000c 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -28,6 +28,11 @@ def instantiate concurrency_maintenance_interval: 600 } + SCHEDULER_DEFAULTS = { + polling_interval: 5, + dynamic_tasks_enabled: false + } + DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" @@ -137,8 +142,10 @@ def dispatchers end def schedulers - if !skip_recurring_tasks? && recurring_tasks.any? - [ Process.new(:scheduler, recurring_tasks: recurring_tasks) ] + return [] if skip_recurring_tasks? + + if recurring_tasks.any? || dynamic_recurring_tasks_enabled? + [ Process.new(:scheduler, { recurring_tasks: recurring_tasks, **scheduler_options.with_defaults(SCHEDULER_DEFAULTS) }) ] else [] end @@ -154,17 +161,29 @@ def dispatchers_options .map { |options| options.dup.symbolize_keys } end + def scheduler_options + @scheduler_options ||= processes_config.fetch(:scheduler, {}).dup.symbolize_keys + end + + def dynamic_recurring_tasks_enabled? + scheduler_options.fetch(:dynamic_tasks_enabled, SCHEDULER_DEFAULTS[:dynamic_tasks_enabled]) + end + def recurring_tasks @recurring_tasks ||= recurring_tasks_config.map do |id, options| - RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule) + RecurringTask.from_configuration(id, **options.merge(static: true)) if options&.has_key?(:schedule) end.compact end def processes_config @processes_config ||= config_from \ - options.slice(:workers, :dispatchers).presence || options[:config_file], - keys: [ :workers, :dispatchers ], - fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] } + options.slice(:workers, :dispatchers, :scheduler).presence || options[:config_file], + keys: [ :workers, :dispatchers, :scheduler ], + fallback: { + workers: [ WORKER_DEFAULTS ], + dispatchers: [ DISPATCHER_DEFAULTS ], + scheduler: SCHEDULER_DEFAULTS + } end def recurring_tasks_config @@ -173,7 +192,6 @@ def recurring_tasks_config end end - def config_from(file_or_hash, keys: [], fallback: {}, env: Rails.env) load_config_from(file_or_hash).then do |config| config = config[env.to_sym] ? config[env.to_sym] : config diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index cd7769da..35b4e01b 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -59,5 +59,9 @@ def heartbeat self.process = nil wake_up end + + def reload_metadata + wrap_in_app_executor { process&.update(metadata: metadata.compact) } + end end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 3cec90fa..6022b338 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -5,7 +5,7 @@ class Scheduler < Processes::Base include Processes::Runnable include LifecycleHooks - attr_reader :recurring_schedule + attr_reader :recurring_schedule, :polling_interval after_boot :run_start_hooks after_boot :schedule_recurring_tasks @@ -14,7 +14,10 @@ class Scheduler < Processes::Base after_shutdown :run_exit_hooks def initialize(recurring_tasks:, **options) - @recurring_schedule = RecurringSchedule.new(recurring_tasks) + options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + @dynamic_tasks_enabled = options[:dynamic_tasks_enabled] + @polling_interval = options[:polling_interval] + @recurring_schedule = RecurringSchedule.new(recurring_tasks, dynamic_tasks_enabled: @dynamic_tasks_enabled) super(**options) end @@ -24,13 +27,16 @@ def metadata end private - SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks + + STATIC_SLEEP_INTERVAL = 60 def run loop do break if shutting_down? - interruptible_sleep(SLEEP_INTERVAL) + reload_dynamic_schedule if dynamic_tasks_enabled? + + interruptible_sleep(sleep_interval) end ensure SolidQueue.instrument(:shutdown_process, process: self) do @@ -46,10 +52,23 @@ def unschedule_recurring_tasks recurring_schedule.unschedule_tasks end + def reload_dynamic_schedule + recurring_schedule.reschedule_dynamic_tasks + reload_metadata + end + + def dynamic_tasks_enabled? + @dynamic_tasks_enabled + end + def all_work_completed? recurring_schedule.empty? end + def sleep_interval + dynamic_tasks_enabled? ? polling_interval : STATIC_SLEEP_INTERVAL + end + def set_procline procline "scheduling #{recurring_schedule.task_keys.join(",")}" end diff --git a/lib/solid_queue/scheduler/recurring_schedule.rb b/lib/solid_queue/scheduler/recurring_schedule.rb index b765edf1..a1e2409e 100644 --- a/lib/solid_queue/scheduler/recurring_schedule.rb +++ b/lib/solid_queue/scheduler/recurring_schedule.rb @@ -4,21 +4,28 @@ module SolidQueue class Scheduler::RecurringSchedule include AppExecutor - attr_reader :configured_tasks, :scheduled_tasks + attr_reader :scheduled_tasks + + def initialize(static_tasks, dynamic_tasks_enabled: false) + @static_tasks = Array(static_tasks).map { |task| RecurringTask.wrap(task) }.select(&:valid?) + @dynamic_tasks_enabled = dynamic_tasks_enabled - def initialize(tasks) - @configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?) @scheduled_tasks = Concurrent::Hash.new end + def configured_tasks + static_tasks + dynamic_tasks + end + def empty? - configured_tasks.empty? + scheduled_tasks.empty? && dynamic_tasks.empty? end def schedule_tasks wrap_in_app_executor do - persist_tasks - reload_tasks + persist_static_tasks + reload_static_tasks + reload_dynamic_tasks end configured_tasks.each do |task| @@ -39,14 +46,57 @@ def task_keys configured_tasks.map(&:key) end + def reschedule_dynamic_tasks + wrap_in_app_executor do + reload_dynamic_tasks + schedule_created_dynamic_tasks + unschedule_deleted_dynamic_tasks + end + end + private - def persist_tasks - SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all - SolidQueue::RecurringTask.create_or_update_all configured_tasks + attr_reader :static_tasks + + def static_task_keys + static_tasks.map(&:key) + end + + def dynamic_tasks + @dynamic_tasks ||= load_dynamic_tasks + end + + def dynamic_tasks_enabled? + @dynamic_tasks_enabled + end + + def schedule_created_dynamic_tasks + RecurringTask.dynamic.where.not(key: scheduled_tasks.keys).each do |task| + schedule_task(task) + end + end + + def unschedule_deleted_dynamic_tasks + (scheduled_tasks.keys - RecurringTask.pluck(:key)).each do |key| + scheduled_tasks[key].cancel + scheduled_tasks.delete(key) + end + end + + def persist_static_tasks + RecurringTask.static.where.not(key: static_task_keys).delete_all + RecurringTask.create_or_update_all static_tasks + end + + def reload_static_tasks + @static_tasks = RecurringTask.static.where(key: static_task_keys).to_a + end + + def reload_dynamic_tasks + @dynamic_tasks = load_dynamic_tasks end - def reload_tasks - @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys).to_a + def load_dynamic_tasks + dynamic_tasks_enabled? ? RecurringTask.dynamic.to_a : [] end def schedule(task) diff --git a/test/models/solid_queue/recurring_task_test.rb b/test/models/solid_queue/recurring_task_test.rb index 52312f12..dba9d6b9 100644 --- a/test/models/solid_queue/recurring_task_test.rb +++ b/test/models/solid_queue/recurring_task_test.rb @@ -210,6 +210,56 @@ def perform assert_equal "SolidQueue::RecurringJob", SolidQueue::Job.last.class_name end + test "schedule recurring tasks dynamically" do + SolidQueue::RecurringTask.create_dynamic_task("test 1", command: "puts 1", schedule: "every hour") + SolidQueue::RecurringTask.create_dynamic_task("test 2", command: "puts 2", schedule: "every minute", static: true) + + assert SolidQueue::RecurringTask.exists?(key: "test 1", command: "puts 1", schedule: "every hour", static: false) + assert SolidQueue::RecurringTask.exists?(key: "test 2", command: "puts 2", schedule: "every minute", static: false) + end + + test "schedule recurring tasks dynamically with class and args (same keys as YAML config)" do + SolidQueue::RecurringTask.create_dynamic_task("test 3", class: "AddToBufferJob", args: [ 42 ], schedule: "every hour") + + task = SolidQueue::RecurringTask.find_by!(key: "test 3") + assert_equal "AddToBufferJob", task.class_name + assert_equal [ 42 ], task.arguments + assert_not task.static + end + + test "unschedule recurring tasks dynamically" do + dynamic_task = SolidQueue::RecurringTask.create!( + key: "dynamic", command: "puts 'd'", schedule: "every day", static: false + ) + + static_task = SolidQueue::RecurringTask.create!( + key: "static", command: "puts 's'", schedule: "every week", static: true + ) + + SolidQueue::RecurringTask.delete_dynamic_task(dynamic_task.key) + + assert_raises(ActiveRecord::RecordNotFound) do + SolidQueue::RecurringTask.delete_dynamic_task(static_task.key) + end + + assert_not SolidQueue::RecurringTask.exists?(key: "dynamic", static: false) + assert SolidQueue::RecurringTask.exists?(key: "static", static: true) + end + + test "scheduling dynamic recurring task with duplicate key raises error" do + SolidQueue::RecurringTask.create_dynamic_task("duplicate_test", command: "puts 1", schedule: "every hour") + + assert_raises(ActiveRecord::RecordNotUnique) do + SolidQueue::RecurringTask.create_dynamic_task("duplicate_test", command: "puts 2", schedule: "every minute") + end + end + + test "unscheduling dynamic recurring task with nonexistent key raises RecordNotFound" do + assert_raises(ActiveRecord::RecordNotFound) do + SolidQueue::RecurringTask.delete_dynamic_task("nonexistent_key") + end + end + private def enqueue_and_assert_performed_with_result(task, result) assert_difference [ -> { SolidQueue::Job.count }, -> { SolidQueue::ReadyExecution.count } ], +1 do diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 11c2a5ff..34f69658 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -87,6 +87,24 @@ class ConfigurationTest < ActiveSupport::TestCase assert_has_recurring_task scheduler, key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every second" end + test "scheduler starts with dynamic_tasks_enabled even without static tasks" do + configuration = SolidQueue::Configuration.new( + recurring_schedule_file: config_file_path(:empty_configuration), + scheduler: { dynamic_tasks_enabled: true } + ) + + assert_processes configuration, :scheduler, 1, dynamic_tasks_enabled: true + end + + test "no scheduler without static tasks or dynamic_tasks_enabled" do + configuration = SolidQueue::Configuration.new( + recurring_schedule_file: config_file_path(:empty_configuration), + scheduler: { dynamic_tasks_enabled: false } + ) + + assert_processes configuration, :scheduler, 0 + end + test "no recurring tasks configuration when explicitly excluded" do configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ], skip_recurring: true) assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb index 3e838c50..e914a23c 100644 --- a/test/unit/scheduler_test.rb +++ b/test/unit/scheduler_test.rb @@ -3,7 +3,7 @@ class SchedulerTest < ActiveSupport::TestCase self.use_transactional_tests = false - test "recurring schedule" do + test "recurring schedule (only static)" do recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start) @@ -17,6 +17,57 @@ class SchedulerTest < ActiveSupport::TestCase scheduler.stop end + test "recurring schedule (only dynamic)" do + SolidQueue::RecurringTask.create( + key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Scheduler", process.kind + + assert_metadata process, recurring_schedule: [ "dynamic_task" ] + ensure + scheduler.stop + end + + test "recurring schedule (static + dynamic)" do + SolidQueue::RecurringTask.create( + key: "dynamic_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + + recurring_tasks = { static_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks, dynamic_tasks_enabled: true).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_equal "Scheduler", process.kind + + assert_metadata process, recurring_schedule: [ "static_task", "dynamic_task" ] + ensure + scheduler.stop + end + + test "dynamic tasks in DB are ignored when dynamic_tasks_enabled is false" do + SolidQueue::RecurringTask.create!( + key: "ignored_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + + recurring_tasks = { static_task: { class: "AddToBufferJob", schedule: "every hour", args: 42 } } + scheduler = SolidQueue::Scheduler.new(recurring_tasks: recurring_tasks).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + process = SolidQueue::Process.first + assert_metadata process, recurring_schedule: [ "static_task" ] + ensure + scheduler.stop + end + test "run more than one instance of the scheduler with recurring tasks" do recurring_tasks = { example_task: { class: "AddToBufferJob", schedule: "every second", args: 42 } } schedulers = 2.times.collect do @@ -37,4 +88,72 @@ class SchedulerTest < ActiveSupport::TestCase end end end + + test "dynamic task actually enqueues jobs" do + SolidQueue::RecurringTask.create!( + key: "dynamic_enqueue_task", static: false, class_name: "AddToBufferJob", schedule: "every second", arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + wait_while_with_timeout(3.seconds) { SolidQueue::Job.count < 1 } + + skip_active_record_query_cache do + assert SolidQueue::Job.count >= 1, "Expected at least one job to be enqueued by the dynamic task" + assert_equal SolidQueue::Job.count, SolidQueue::RecurringExecution.count + end + ensure + scheduler&.stop + end + + test "updates metadata after adding dynamic task post-start" do + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + skip_active_record_query_cache do + process = SolidQueue::Process.first + assert_empty process.metadata + + SolidQueue::RecurringTask.create!( + key: "new_dynamic_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every second", + arguments: [ 42 ] + ) + + wait_while_with_timeout(3.seconds) { process.reload.metadata.empty? } + assert_metadata process, recurring_schedule: [ "new_dynamic_task" ] + end + ensure + scheduler&.stop + end + + test "updates metadata after removing dynamic task post-start" do + old_dynamic_task = SolidQueue::RecurringTask.create!( + key: "old_dynamic_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every second", + arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + skip_active_record_query_cache do + process = SolidQueue::Process.first + assert_metadata process, recurring_schedule: [ "old_dynamic_task" ] + + old_dynamic_task.destroy + + wait_while_with_timeout(3.seconds) { process.reload.metadata.present? } + assert_empty process.metadata + end + ensure + scheduler&.stop + end end