Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `Config.ReindexerIndexNames` and `ReindexerIndexNamesDefault()` so the reindexer's target indexes can be customized from the public API. [PR #1194](https://github.com/riverqueue/river/pull/1194).

### Fixed

- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184).
Expand Down
51 changes: 42 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ const (
QueueNumWorkersMax = 10_000
)

var postgresSchemaNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
var (
// postgresSchemaNameRE matches a non-empty name made up of ASCII letters,
// digits, and underscores whose first character is not a digit.
postgresSchemaNameRE = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)

// reindexerIndexNamesDefault is the built-in list of index names used when
// no custom list is configured. It is package-level state, so
// ReindexerIndexNamesDefault returns a copy of it rather than the slice
// itself.
reindexerIndexNamesDefault = []string{ //nolint:gochecknoglobals
"river_job_args_index",
"river_job_kind",
"river_job_metadata_index",
"river_job_pkey",
"river_job_prioritized_fetching_index",
"river_job_state_and_finalized_at_index",
"river_job_unique_idx",
}
)

// TestConfig contains configuration specific to test environments.
type TestConfig struct {
Expand Down Expand Up @@ -272,6 +284,11 @@ type Config struct {
// reindexer will run at midnight UTC every day.
ReindexerSchedule PeriodicSchedule

// ReindexerIndexNames customizes which indexes River periodically reindexes.
// If nil, River uses [ReindexerIndexNamesDefault]. If non-nil, the provided
// slice is used as the exact list.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

ReindexerIndexNames []string

// ReindexerTimeout is the amount of time to wait for the reindexer to run a
// single reindex operation before cancelling it via context. Set to -1 to
// disable the timeout.
Expand Down Expand Up @@ -374,12 +391,26 @@ type Config struct {
schedulerInterval time.Duration
}

// ReindexerIndexNamesDefault returns the default set of indexes reindexed by
// River. Every call builds a fresh slice, so a caller may modify the result
// without affecting what later calls return.
func ReindexerIndexNamesDefault() []string {
	defaults := make([]string, 0, len(reindexerIndexNamesDefault))
	return append(defaults, reindexerIndexNamesDefault...)
}

// WithDefaults returns a copy of the Config with all default values applied.
func (c *Config) WithDefaults() *Config {
if c == nil {
c = &Config{}
}

reindexerIndexNames := ReindexerIndexNamesDefault()
if c.ReindexerIndexNames != nil {
reindexerIndexNames = make([]string, len(c.ReindexerIndexNames))
copy(reindexerIndexNames, c.ReindexerIndexNames)
}

// Use the existing logger if set, otherwise create a default one.
logger := c.Logger
if logger == nil {
Expand Down Expand Up @@ -420,6 +451,7 @@ func (c *Config) WithDefaults() *Config {
PeriodicJobs: c.PeriodicJobs,
PollOnly: c.PollOnly,
Queues: c.Queues,
ReindexerIndexNames: reindexerIndexNames,
ReindexerSchedule: c.ReindexerSchedule,
ReindexerTimeout: cmp.Or(c.ReindexerTimeout, maintenance.ReindexerTimeoutDefault),
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
Expand Down Expand Up @@ -603,14 +635,14 @@ type Client[TTx any] struct {
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
pilot riverpilot.Pilot
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queueMaintainerLeader *maintenance.QueueMaintainerLeader
queues *QueueBundle
services []startstop.Service
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queueMaintainerLeader *maintenance.QueueMaintainerLeader
queues *QueueBundle
services []startstop.Service
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
Expand Down Expand Up @@ -936,6 +968,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
}

reindexer := maintenance.NewReindexer(archetype, &maintenance.ReindexerConfig{
IndexNames: config.ReindexerIndexNames,
ScheduleFunc: scheduleFunc,
Schema: config.Schema,
Timeout: config.ReindexerTimeout,
Expand Down
96 changes: 89 additions & 7 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7185,6 +7185,39 @@ func Test_Client_Start_Error(t *testing.T) {
})
}

// Test_Config_WithDefaults exercises how WithDefaults populates
// Config.ReindexerIndexNames for an empty list, a nil list, and a custom list.
func Test_Config_WithDefaults(t *testing.T) {
	t.Parallel()

	t.Run("ReindexerIndexNamesEmptyStaysNonNil", func(t *testing.T) {
		t.Parallel()

		// An explicitly empty (non-nil) list must come back empty and non-nil.
		withDefaults := (&Config{ReindexerIndexNames: []string{}}).WithDefaults()

		require.NotNil(t, withDefaults.ReindexerIndexNames)
		require.Empty(t, withDefaults.ReindexerIndexNames)
	})

	t.Run("ReindexerIndexNamesNilGetsDefaults", func(t *testing.T) {
		t.Parallel()

		withDefaults := new(Config).WithDefaults()

		require.Equal(t, ReindexerIndexNamesDefault(), withDefaults.ReindexerIndexNames)
	})

	t.Run("ReindexerIndexNamesSliceIsCopied", func(t *testing.T) {
		t.Parallel()

		custom := []string{"custom_index", "other_index"}
		withDefaults := (&Config{ReindexerIndexNames: custom}).WithDefaults()

		require.Equal(t, custom, withDefaults.ReindexerIndexNames)

		// Changing the caller's slice afterwards must not show up in the
		// config returned by WithDefaults.
		custom[0] = "mutated"
		require.Equal(t, []string{"custom_index", "other_index"}, withDefaults.ReindexerIndexNames)
	})
}

func Test_NewClient_BaseServiceName(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -7279,13 +7312,8 @@ func Test_NewClient_Defaults(t *testing.T) {
require.False(t, enqueuer.StaggerStartupIsDisabled())

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
require.Contains(t, reindexer.Config.IndexNames, "river_job_args_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_kind")
require.Contains(t, reindexer.Config.IndexNames, "river_job_metadata_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_pkey")
require.Contains(t, reindexer.Config.IndexNames, "river_job_prioritized_fetching_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_state_and_finalized_at_index")
require.Contains(t, reindexer.Config.IndexNames, "river_job_unique_idx")
require.Equal(t, ReindexerIndexNamesDefault(), client.config.ReindexerIndexNames)
require.Equal(t, ReindexerIndexNamesDefault(), reindexer.Config.IndexNames)
now := time.Now().UTC()
nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now))
Expand Down Expand Up @@ -7349,6 +7377,7 @@ func Test_NewClient_Overrides(t *testing.T) {
Logger: logger,
MaxAttempts: 5,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
ReindexerIndexNames: []string{"custom_index", "other_index"},
ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour},
ReindexerTimeout: 125 * time.Millisecond,
RetryPolicy: retryPolicy,
Expand All @@ -7373,6 +7402,8 @@ func Test_NewClient_Overrides(t *testing.T) {
require.True(t, enqueuer.StaggerStartupIsDisabled())

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
// Assert the exact list so the custom index names are known to reach the reindexer unchanged.
require.Equal(t, []string{"custom_index", "other_index"}, reindexer.Config.IndexNames)
now := time.Now().UTC()
require.Equal(t, now.Add(time.Hour), reindexer.Config.ScheduleFunc(now))

Expand All @@ -7391,6 +7422,37 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Len(t, client.config.WorkerMiddleware, 1)
}

// Test_NewClient_ReindexerIndexNamesExplicitEmptyOverride checks that an
// explicitly empty, non-nil ReindexerIndexNames passed to NewClient stays
// empty and non-nil on both the client's config and the reindexer's config.
func Test_NewClient_ReindexerIndexNamesExplicitEmptyOverride(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	dbPool := riversharedtest.DBPool(ctx, t)
	driver := riverpgxv5.New(dbPool)
	schema := riverdbtest.TestSchema(ctx, t, driver, nil)

	workers := NewWorkers()
	AddWorker(workers, &noOpWorker{})

	config := &Config{
		Queues:              map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
		ReindexerIndexNames: []string{},
		Schema:              schema,
		TestOnly:            true,
		Workers:             workers,
	}

	client, err := NewClient(driver, config)
	require.NoError(t, err)

	// The client's own config keeps the empty list.
	clientIndexNames := client.config.ReindexerIndexNames
	require.NotNil(t, clientIndexNames)
	require.Empty(t, clientIndexNames)

	// The reindexer service receives the same empty list.
	reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
	reindexerIndexNames := reindexer.Config.IndexNames
	require.NotNil(t, reindexerIndexNames)
	require.Empty(t, reindexerIndexNames)
}

func Test_NewClient_MissingParameters(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -7817,6 +7879,26 @@ func Test_NewClient_Validations(t *testing.T) {
}
}

// TestReindexerIndexNamesDefault pins the exact default index list and checks
// that mutating a returned slice does not affect later calls.
func TestReindexerIndexNamesDefault(t *testing.T) {
	t.Parallel()

	// Assert the exact list so index list changes require explicit test updates.
	expected := []string{
		"river_job_args_index",
		"river_job_kind",
		"river_job_metadata_index",
		"river_job_pkey",
		"river_job_prioritized_fetching_index",
		"river_job_state_and_finalized_at_index",
		"river_job_unique_idx",
	}

	defaults := ReindexerIndexNamesDefault()
	require.Equal(t, expected, defaults)

	// Overwrite an element of the returned slice; a fresh call must still
	// report the original first entry.
	defaults[0] = "mutated"
	require.Equal(t, "river_job_args_index", ReindexerIndexNamesDefault()[0])
}

// timeoutTestArgs holds a single duration, TimeoutValue, which is encoded in
// JSON under the "timeout_value" key.
type timeoutTestArgs struct {
TimeoutValue time.Duration `json:"timeout_value"`
}
Expand Down
2 changes: 1 addition & 1 deletion insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
// what to do about the job that can't be scheduled. We can't send feedback to
// the caller at this point, so probably the best we could do is leave it in
// this untransitionable state until the `running` job finished, which isn't
// particularly satsifactory.
// particularly satisfactory.
var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals
rivertype.JobStateAvailable,
rivertype.JobStatePending,
Expand Down
78 changes: 57 additions & 21 deletions internal/maintenance/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@ const (
ReindexerTimeoutDefault = 1 * time.Minute
)

// defaultIndexNames is the package-level list of index names that the
// `indexNames := defaultIndexNames` assignment in NewReindexer starts from
// before a non-nil ReindexerConfig.IndexNames overrides it.
var defaultIndexNames = []string{ //nolint:gochecknoglobals
"river_job_args_index",
"river_job_kind",
"river_job_metadata_index",
"river_job_pkey",
"river_job_prioritized_fetching_index",
"river_job_state_and_finalized_at_index",
"river_job_unique_idx",
}

// ReindexerTestSignals are internal signals used exclusively in tests.
type ReindexerTestSignals struct {
Reindexed testsignal.TestSignal[struct{}] // notifies when a run finishes executing reindexes for all indexes
Expand All @@ -49,7 +39,8 @@ func (ts *ReindexerTestSignals) Init(tb testutil.TestingTB) {
}

type ReindexerConfig struct {
// IndexNames is a list of indexes to reindex on each run.
// IndexNames is the exact list of indexes to reindex on each run. It must
// be non-nil. An empty slice disables reindex work.
IndexNames []string

// ScheduleFunc returns the next scheduled run time for the reindexer given the
Expand All @@ -66,6 +57,9 @@ type ReindexerConfig struct {
}

func (c *ReindexerConfig) mustValidate() *ReindexerConfig {
if c.IndexNames == nil {
panic("ReindexerConfig.IndexNames must be set")
}
if c.ScheduleFunc == nil {
panic("ReindexerConfig.ScheduleFunc must be set")
}
Expand All @@ -91,11 +85,13 @@ type Reindexer struct {
}

func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exec riverdriver.Executor) *Reindexer {
indexNames := defaultIndexNames
if config.IndexNames != nil {
indexNames = config.IndexNames
if config.IndexNames == nil {
panic("ReindexerConfig.IndexNames must be set")
}

indexNames := make([]string, len(config.IndexNames))
copy(indexNames, config.IndexNames)

scheduleFunc := config.ScheduleFunc
if scheduleFunc == nil {
scheduleFunc = (&DefaultReindexerSchedule{}).Next
Expand Down Expand Up @@ -133,11 +129,27 @@ func (s *Reindexer) Start(ctx context.Context) error {
s.Logger.DebugContext(ctx, s.Name+": Scheduling first run", slog.Time("next_run_at", nextRunAt))

timerUntilNextRun := time.NewTimer(time.Until(nextRunAt))
scheduleNextRun := func() {
// Advance from the previous scheduled time, not "now", so retries
// stay aligned with the configured cadence and don't immediately
// refire after a timer that has already elapsed.
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
timerUntilNextRun.Reset(time.Until(nextRunAt))
}

for {
select {
case <-timerUntilNextRun.C:
for _, indexName := range s.Config.IndexNames {
reindexableIndexNames, err := s.reindexableIndexNames(ctx)
if err != nil {
if !errors.Is(err, context.Canceled) {
s.Logger.ErrorContext(ctx, s.Name+": Error listing reindexable indexes", slog.String("error", err.Error()))
}
scheduleNextRun()
continue
}

for _, indexName := range reindexableIndexNames {
if _, err := s.reindexOne(ctx, indexName); err != nil {
if !errors.Is(err, context.Canceled) {
s.Logger.ErrorContext(ctx, s.Name+": Error reindexing", slog.String("error", err.Error()), slog.String("index_name", indexName))
Expand All @@ -151,15 +163,11 @@ func (s *Reindexer) Start(ctx context.Context) error {
// On each run, we calculate the new schedule based on the
// previous run's start time. This ensures that we don't
// accidentally skip a run as time elapses during the run.
nextRunAt = s.Config.ScheduleFunc(nextRunAt)
scheduleNextRun()

// TODO: maybe we should log differently if some of these fail?
s.Logger.DebugContext(ctx, s.Name+riversharedmaintenance.LogPrefixRanSuccessfully,
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(s.Config.IndexNames)))

// Reset the timer after the insert loop has finished so it's
// paused during work. Makes its firing more deterministic.
timerUntilNextRun.Reset(time.Until(nextRunAt))
slog.Time("next_run_at", nextRunAt), slog.Int("num_reindexes_initiated", len(reindexableIndexNames)))

case <-ctx.Done():
// Clean up timer resources. We know it has _not_ received from
Expand All @@ -176,6 +184,34 @@ func (s *Reindexer) Start(ctx context.Context) error {
return nil
}

// reindexableIndexNames asks the executor which of the configured index names
// exist in the configured schema and returns those that do, in their
// configured order. Names not reported as existing are left out of the result
// and reported together in a single warning log entry. If the existence check
// fails, its error is returned unchanged along with a nil slice.
func (s *Reindexer) reindexableIndexNames(ctx context.Context) ([]string, error) {
	configured := s.Config.IndexNames

	existsByName, err := s.exec.IndexesExist(ctx, &riverdriver.IndexesExistParams{
		IndexNames: configured,
		Schema:     s.Config.Schema,
	})
	if err != nil {
		return nil, err
	}

	// Split the configured names into those present and those missing while
	// keeping the configured order within each group.
	var (
		present = make([]string, 0, len(configured))
		missing = make([]string, 0)
	)
	for _, name := range configured {
		if existsByName[name] {
			present = append(present, name)
		} else {
			missing = append(missing, name)
		}
	}

	if len(missing) != 0 {
		s.Logger.WarnContext(ctx, s.Name+": Configured reindex indexes do not exist; run migrations or update ReindexerIndexNames",
			slog.Any("index_names", missing))
	}

	return present, nil
}

func (s *Reindexer) reindexOne(ctx context.Context, indexName string) (bool, error) {
var cancel func()
if s.Config.Timeout > -1 {
Expand Down
Loading
Loading