Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9347dbd
Feature: Dynamic scheduled tasks
cupatea Apr 24, 2025
3a23087
Extract polling_interval to scheduler configuration
cupatea Jun 19, 2025
3a13962
Fix abstraction for RecurringSchedule and Process
cupatea Jun 19, 2025
d1c0efc
Add create and destroy recurring task helpers
cupatea Jun 19, 2025
77ff523
Update README with Recurring tasks info
cupatea Jul 12, 2025
f1028a0
Use task key instead of id
cupatea Aug 21, 2025
7dd67c8
Fix mismatches in readme
cupatea Aug 21, 2025
eef747f
Use correct asserts in tests
cupatea Feb 12, 2026
8e1be8b
Add missing platform to Gemfile.lock
cupatea Feb 12, 2026
5ba7384
Move some Scheduler::RecurringSchedule methods to private
cupatea Feb 12, 2026
dd67d58
Wrap reload in app executor
cupatea Feb 12, 2026
b90cce5
Fix empty? method in Scheduler::RecurringSchedule
cupatea Feb 12, 2026
6b81c95
Prevent stale change by adding clear_changes in scheduler's loop
cupatea Feb 12, 2026
99ba202
Add test for enqueuing the job
cupatea Feb 12, 2026
79fca8d
Improve create_recurring_task - use RecurringTask .from_configuration
cupatea Feb 12, 2026
7ca2eb7
Fix names for public methods
cupatea Feb 12, 2026
890dabe
Prevent stale configured_tasks value
cupatea Feb 12, 2026
abbf54e
Use wrap_in_app_executor for refresh_registered_process
cupatea Feb 12, 2026
4ec36d6
Update tests and README
cupatea Feb 12, 2026
e1c1601
Fix style to match project conventions
rosa Feb 24, 2026
2f16558
Make dynamic recurring tasks opt-in and rename public API
rosa Feb 25, 2026
f7e39b5
Read static from options in from_configuration and rename to dynamic_…
rosa Feb 25, 2026
b012535
Refactor scheduler loop and add missing tests
rosa Feb 25, 2026
3d36ca4
Simplify recurring schedule with respect to dynamic task reloading
rosa Feb 25, 2026
b4f7c99
Clean a bit top-level SolidQueue methods to manage dynamic tasks
rosa Feb 25, 2026
7ded710
Avoid extra queries for dynamic tasks for process metadata and procline
rosa Feb 25, 2026
3b490ad
Merge right value of `static` correctly
rosa Feb 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
- [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler)
- [Fork vs. async mode](#fork-vs-async-mode)
- [Configuration](#configuration)
- [Optional scheduler configuration](#optional-scheduler-configuration)
- [Queue order and priorities](#queue-order-and-priorities)
- [Queues specification and performance](#queues-specification-and-performance)
- [Threads, processes, and signals](#threads-processes-and-signals)
Expand All @@ -31,6 +32,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
- [Puma plugin](#puma-plugin)
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
- [Recurring tasks](#recurring-tasks)
- [Scheduling and unscheduling recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically)
- [Inspiration](#inspiration)
- [License](#license)

Expand Down Expand Up @@ -209,7 +211,7 @@ By default, Solid Queue will try to find your configuration under `config/queue.
bin/jobs -c config/calendar.yml
```

You can also skip all recurring tasks by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`.
You can also skip the scheduler process by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. This is equivalent to using the `--skip-recurring` option with `bin/jobs`.

This is what this configuration looks like:

Expand All @@ -227,6 +229,10 @@ production:
threads: 5
polling_interval: 0.1
processes: 3
scheduler:
dynamic_tasks_enabled: true
polling_interval: 5

```

Everything is optional. If no configuration at all is provided, Solid Queue will run with one dispatcher and one worker with default settings. If you want to run only dispatchers or workers, you just need to include that section alone in the configuration. For example, with the following configuration:
Expand Down Expand Up @@ -271,6 +277,19 @@ It is recommended to set this value less than or equal to the queue database's c
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.


### Optional scheduler configuration

Optionally, you can configure the scheduler process under the `scheduler` section in your `config/queue.yml` if you'd like to [schedule recurring tasks dynamically](#scheduling-and-unscheduling-recurring-tasks-dynamically).

```yaml
scheduler:
dynamic_tasks_enabled: true
polling_interval: 5
```

- `dynamic_tasks_enabled`: whether the scheduler should poll for [dynamically scheduled recurring tasks](#scheduling-and-unscheduling-recurring-tasks-dynamically). This is `false` by default. When enabled, the scheduler will poll the database at the given `polling_interval` to pick up tasks scheduled via `SolidQueue.schedule_recurring_task`.
- `polling_interval`: how frequently (in seconds) the scheduler checks for dynamic task changes. Defaults to `5`.

### Queue order and priorities

As mentioned above, if you specify a list of queues for a worker, these will be polled in the order given, such as for the list `real_time,background`, no jobs will be taken from `background` unless there aren't any more jobs waiting in `real_time`.
Expand Down Expand Up @@ -732,6 +751,38 @@ my_periodic_resque_job:

and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.

### Scheduling and unscheduling recurring tasks dynamically

You can schedule and unschedule recurring tasks at runtime, without editing the configuration file. To enable this, you need to set `dynamic_tasks_enabled: true` in the `scheduler` section of your `config/queue.yml`, [as explained earlier](#optional-scheduler-configuration).

```yaml
scheduler:
dynamic_tasks_enabled: true
```

Then you can use the following methods to add recurring tasks dynamically:

```ruby
SolidQueue.schedule_recurring_task(
"my_dynamic_task",
class: "MyJob",
args: [1, 2],
schedule: "every 10 minutes"
)
```

This accepts the same options as the YAML configuration: `class`, `args`, `command`, `schedule`, `queue`, `priority`, and `description`.

To remove a dynamically scheduled task:

```ruby
SolidQueue.unschedule_recurring_task("my_dynamic_task")
```

Only dynamic tasks can be unscheduled at runtime. Attempting to unschedule a static task (defined in `config/recurring.yml`) will raise an `ActiveRecord::RecordNotFound` error.

Tasks scheduled like this persist between Solid Queue's restarts and won't stop running until you manually unschedule them.

## Inspiration

Solid Queue has been inspired by [resque](https://github.com/resque/resque) and [GoodJob](https://github.com/bensheldon/good_job). We recommend checking out these projects as they're great examples from which we've learnt a lot.
Expand Down
11 changes: 10 additions & 1 deletion app/models/solid_queue/recurring_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class RecurringTask < Record
validate :ensure_existing_job_class

scope :static, -> { where(static: true) }
scope :dynamic, -> { where(static: false) }

has_many :recurring_executions, foreign_key: :task_key, primary_key: :key

Expand All @@ -32,7 +33,15 @@ def from_configuration(key, **options)
queue_name: options[:queue].presence,
priority: options[:priority].presence,
description: options[:description],
static: true
static: options.fetch(:static, true)
end

def create_dynamic_task(key, **options)
from_configuration(key, **options.merge(static: false)).save!
end

def delete_dynamic_task(key)
RecurringTask.dynamic.find_by!(key: key).destroy
end

def create_or_update_all(tasks)
Expand Down
8 changes: 8 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ module SolidQueue

delegate :on_start, :on_stop, :on_exit, to: Supervisor

def schedule_recurring_task(key, **options)
RecurringTask.create_dynamic_task(key, **options)
end

def unschedule_recurring_task(key)
RecurringTask.delete_dynamic_task(key)
end

[ Dispatcher, Scheduler, Worker ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start(&block)
Expand Down
32 changes: 25 additions & 7 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ def instantiate
concurrency_maintenance_interval: 600
}

SCHEDULER_DEFAULTS = {
polling_interval: 5,
dynamic_tasks_enabled: false
}

DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"

Expand Down Expand Up @@ -137,8 +142,10 @@ def dispatchers
end

def schedulers
if !skip_recurring_tasks? && recurring_tasks.any?
[ Process.new(:scheduler, recurring_tasks: recurring_tasks) ]
return [] if skip_recurring_tasks?

if recurring_tasks.any? || dynamic_recurring_tasks_enabled?
[ Process.new(:scheduler, { recurring_tasks: recurring_tasks, **scheduler_options.with_defaults(SCHEDULER_DEFAULTS) }) ]
else
[]
end
Expand All @@ -154,17 +161,29 @@ def dispatchers_options
.map { |options| options.dup.symbolize_keys }
end

def scheduler_options
@scheduler_options ||= processes_config.fetch(:scheduler, {}).dup.symbolize_keys
end

def dynamic_recurring_tasks_enabled?
scheduler_options.fetch(:dynamic_tasks_enabled, SCHEDULER_DEFAULTS[:dynamic_tasks_enabled])
end

def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
RecurringTask.from_configuration(id, **options.merge(static: true)) if options&.has_key?(:schedule)
end.compact
end

def processes_config
@processes_config ||= config_from \
options.slice(:workers, :dispatchers).presence || options[:config_file],
keys: [ :workers, :dispatchers ],
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
options.slice(:workers, :dispatchers, :scheduler).presence || options[:config_file],
keys: [ :workers, :dispatchers, :scheduler ],
fallback: {
workers: [ WORKER_DEFAULTS ],
dispatchers: [ DISPATCHER_DEFAULTS ],
scheduler: SCHEDULER_DEFAULTS
}
end

def recurring_tasks_config
Expand All @@ -173,7 +192,6 @@ def recurring_tasks_config
end
end


def config_from(file_or_hash, keys: [], fallback: {}, env: Rails.env)
load_config_from(file_or_hash).then do |config|
config = config[env.to_sym] ? config[env.to_sym] : config
Expand Down
4 changes: 4 additions & 0 deletions lib/solid_queue/processes/registrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,9 @@ def heartbeat
self.process = nil
wake_up
end

def reload_metadata
wrap_in_app_executor { process&.update(metadata: metadata.compact) }
end
end
end
27 changes: 23 additions & 4 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class Scheduler < Processes::Base
include Processes::Runnable
include LifecycleHooks

attr_reader :recurring_schedule
attr_reader :recurring_schedule, :polling_interval

after_boot :run_start_hooks
after_boot :schedule_recurring_tasks
Expand All @@ -14,7 +14,10 @@ class Scheduler < Processes::Base
after_shutdown :run_exit_hooks

def initialize(recurring_tasks:, **options)
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)
@dynamic_tasks_enabled = options[:dynamic_tasks_enabled]
@polling_interval = options[:polling_interval]
@recurring_schedule = RecurringSchedule.new(recurring_tasks, dynamic_tasks_enabled: @dynamic_tasks_enabled)

super(**options)
end
Expand All @@ -24,13 +27,16 @@ def metadata
end

private
SLEEP_INTERVAL = 60 # Right now it doesn't matter, can be set to 1 in the future for dynamic tasks

STATIC_SLEEP_INTERVAL = 60

def run
loop do
break if shutting_down?

interruptible_sleep(SLEEP_INTERVAL)
reload_dynamic_schedule if dynamic_tasks_enabled?

interruptible_sleep(sleep_interval)
end
ensure
SolidQueue.instrument(:shutdown_process, process: self) do
Expand All @@ -46,10 +52,23 @@ def unschedule_recurring_tasks
recurring_schedule.unschedule_tasks
end

def reload_dynamic_schedule
recurring_schedule.reschedule_dynamic_tasks
reload_metadata
end

def dynamic_tasks_enabled?
@dynamic_tasks_enabled
end

def all_work_completed?
recurring_schedule.empty?
end

def sleep_interval
dynamic_tasks_enabled? ? polling_interval : STATIC_SLEEP_INTERVAL
end

def set_procline
procline "scheduling #{recurring_schedule.task_keys.join(",")}"
end
Expand Down
72 changes: 61 additions & 11 deletions lib/solid_queue/scheduler/recurring_schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@ module SolidQueue
class Scheduler::RecurringSchedule
include AppExecutor

attr_reader :configured_tasks, :scheduled_tasks
attr_reader :scheduled_tasks

def initialize(static_tasks, dynamic_tasks_enabled: false)
@static_tasks = Array(static_tasks).map { |task| RecurringTask.wrap(task) }.select(&:valid?)
@dynamic_tasks_enabled = dynamic_tasks_enabled

def initialize(tasks)
@configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?)
@scheduled_tasks = Concurrent::Hash.new
end

def configured_tasks
static_tasks + dynamic_tasks
end

def empty?
configured_tasks.empty?
scheduled_tasks.empty? && dynamic_tasks.empty?
end

def schedule_tasks
wrap_in_app_executor do
persist_tasks
reload_tasks
persist_static_tasks
reload_static_tasks
reload_dynamic_tasks
end

configured_tasks.each do |task|
Expand All @@ -39,14 +46,57 @@ def task_keys
configured_tasks.map(&:key)
end

def reschedule_dynamic_tasks
wrap_in_app_executor do
reload_dynamic_tasks
schedule_created_dynamic_tasks
unschedule_deleted_dynamic_tasks
end
end

private
def persist_tasks
SolidQueue::RecurringTask.static.where.not(key: task_keys).delete_all
SolidQueue::RecurringTask.create_or_update_all configured_tasks
attr_reader :static_tasks

def static_task_keys
static_tasks.map(&:key)
end

def dynamic_tasks
@dynamic_tasks ||= load_dynamic_tasks
end

def dynamic_tasks_enabled?
@dynamic_tasks_enabled
end

def schedule_created_dynamic_tasks
RecurringTask.dynamic.where.not(key: scheduled_tasks.keys).each do |task|
schedule_task(task)
end
end

def unschedule_deleted_dynamic_tasks
(scheduled_tasks.keys - RecurringTask.pluck(:key)).each do |key|
scheduled_tasks[key].cancel
scheduled_tasks.delete(key)
end
end

def persist_static_tasks
RecurringTask.static.where.not(key: static_task_keys).delete_all
RecurringTask.create_or_update_all static_tasks
end

def reload_static_tasks
@static_tasks = RecurringTask.static.where(key: static_task_keys).to_a
end

def reload_dynamic_tasks
@dynamic_tasks = load_dynamic_tasks
end

def reload_tasks
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys).to_a
def load_dynamic_tasks
dynamic_tasks_enabled? ? RecurringTask.dynamic.to_a : []
end

def schedule(task)
Expand Down
Loading