Skip to content

Commit f7943ec

Browse files
committed
Add UnknownConfigure function for durable periodic jobs
Here, add an `UnknownConfigure` function to the pilot for use with rescuing and reconfiguring unknown durable periodic jobs. Full explanation in the counterpart pull request.
1 parent 92cdd61 commit f7943ec

16 files changed

Lines changed: 361 additions & 290 deletions

client.go

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"log/slog"
1010
"os"
1111
"regexp"
12+
"slices"
1213
"strings"
1314
"sync"
1415
"time"
@@ -900,11 +901,29 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
900901
}
901902

902903
{
904+
905+
unknownConfigureFunc := client.pilot.PeriodicJobUnknownConfigure()
906+
903907
periodicJobEnqueuer, err := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
904908
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
905909
Insert: client.insertMany,
906910
Pilot: client.pilot,
907911
Schema: config.Schema,
912+
UnknownConfigure: func(job *riverpilot.PeriodicJob) *maintenance.UnknownConfigureResult {
913+
if unknownConfigureFunc == nil {
914+
return nil
915+
}
916+
917+
unknownConfigureRes := unknownConfigureFunc(job)
918+
919+
return &maintenance.UnknownConfigureResult{
920+
JobConstructor: func() (*rivertype.JobInsertParams, error) {
921+
jobArgs, insertOpts := unknownConfigureRes.JobConstructor()
922+
return insertParamsFromConfigArgsAndOptions(archetype, config, jobArgs, insertOpts)
923+
},
924+
Schedule: unknownConfigureRes.Schedule.Next,
925+
}
926+
},
908927
}, driver.GetExecutor())
909928
if err != nil {
910929
return nil, err
@@ -1510,6 +1529,14 @@ func (c *Client[TTx]) ID() string {
15101529
return c.config.ID
15111530
}
15121531

1532+
// Regular expression to which the format of tags must comply. Mainly, no
1533+
// special characters, and with hyphens in the middle.
1534+
//
1535+
// A key property here (in case this is relaxed in the future) is that commas
1536+
// must never be allowed because they're used as a delimiter during batch job
1537+
// insertion for the `riverdatabasesql` driver.
1538+
var tagRE = regexp.MustCompile(`\A[\w][\w\-]+[\w]\z`)
1539+
15131540
func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, config *Config, args JobArgs, insertOpts *InsertOpts) (*rivertype.JobInsertParams, error) {
15141541
encodedArgs, err := json.Marshal(args)
15151542
if err != nil {
@@ -1562,11 +1589,11 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
15621589
var uniqueOpts UniqueOpts
15631590
if !config.Test.DisableUniqueEnforcement {
15641591
uniqueOpts = insertOpts.UniqueOpts
1565-
if uniqueOpts.isEmpty() {
1592+
if uniqueOptsIsEmpty(&uniqueOpts) {
15661593
uniqueOpts = jobInsertOpts.UniqueOpts
15671594
}
15681595
}
1569-
if err := uniqueOpts.validate(); err != nil {
1596+
if err := uniqueOptsValidate(&uniqueOpts); err != nil {
15701597
return nil, err
15711598
}
15721599

@@ -1587,7 +1614,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
15871614
State: rivertype.JobStateAvailable,
15881615
Tags: tags,
15891616
}
1590-
if !uniqueOpts.isEmpty() {
1617+
if !uniqueOptsIsEmpty(&uniqueOpts) {
15911618
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
15921619
insertParams.UniqueKey, err = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
15931620
if err != nil {
@@ -2709,3 +2736,70 @@ func defaultClientIDWithHost(startedAt time.Time, host string) string {
27092736

27102737
return host + "_" + strings.Replace(startedAt.Format(rfc3339Compact), ".", "_", 1)
27112738
}
2739+
2740+
// uniqueOptsIsEmpty returns true for an empty, uninitialized options struct.
2741+
//
2742+
// This is required because we can't check against `UniqueOpts{}` because slices
2743+
// aren't comparable. Unfortunately it makes things a little more brittle
2744+
// comparatively because any new options must also be considered here for things
2745+
// to work.
2746+
//
2747+
// This is a function instance of an instance function so that it doesn't have
2748+
// to be exported from `rivertype` and doesn't become part of the public API.
2749+
func uniqueOptsIsEmpty(opts *rivertype.UniqueOpts) bool {
2750+
return !opts.ByArgs &&
2751+
opts.ByPeriod == time.Duration(0) &&
2752+
!opts.ByQueue &&
2753+
opts.ByState == nil
2754+
}
2755+
2756+
var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals
2757+
2758+
var requiredV3states = []rivertype.JobState{ //nolint:gochecknoglobals
2759+
rivertype.JobStateAvailable,
2760+
rivertype.JobStatePending,
2761+
rivertype.JobStateRunning,
2762+
rivertype.JobStateScheduled,
2763+
}
2764+
2765+
// uniqueOptsValidate validates the given rivertype.UniqueOpts.
2766+
//
2767+
// This is a function instance of an instance function so that it doesn't have
2768+
// to be exported from `rivertype` and doesn't become part of the public API.
2769+
func uniqueOptsValidate(opts *rivertype.UniqueOpts) error {
2770+
if uniqueOptsIsEmpty(opts) {
2771+
return nil
2772+
}
2773+
2774+
if opts.ByPeriod != time.Duration(0) && opts.ByPeriod < 1*time.Second {
2775+
return errors.New("UniqueOpts.ByPeriod should not be less than 1 second")
2776+
}
2777+
2778+
// Job states are typed, but since the underlying type is a string, users
2779+
// can put anything they want in there.
2780+
for _, state := range opts.ByState {
2781+
// This could be turned to a map lookup, but last I checked the speed
2782+
// difference for tiny slice sizes is negligible, and map lookup might
2783+
// even be slower.
2784+
if !slices.Contains(jobStateAll, state) {
2785+
return fmt.Errorf("UniqueOpts.ByState contains invalid state %q", state)
2786+
}
2787+
}
2788+
2789+
// Skip required states validation if no custom states were provided.
2790+
if len(opts.ByState) == 0 {
2791+
return nil
2792+
}
2793+
2794+
var missingStates []string
2795+
for _, state := range requiredV3states {
2796+
if !slices.Contains(opts.ByState, state) {
2797+
missingStates = append(missingStates, string(state))
2798+
}
2799+
}
2800+
if len(missingStates) > 0 {
2801+
return fmt.Errorf("UniqueOpts.ByState must contain all required states, missing: %s", strings.Join(missingStates, ", "))
2802+
}
2803+
2804+
return nil
2805+
}

client_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8082,3 +8082,81 @@ func (f JobArgsWithHooksFunc) Hooks() []rivertype.Hook {
80828082
func (JobArgsWithHooksFunc) MarshalJSON() ([]byte, error) { return []byte("{}"), nil }
80838083

80848084
func (JobArgsWithHooksFunc) UnmarshalJSON([]byte) error { return nil }
8085+
8086+
func TestTagRE(t *testing.T) {
8087+
t.Parallel()
8088+
8089+
require.Regexp(t, tagRE, "aaa")
8090+
require.Regexp(t, tagRE, "_aaa")
8091+
require.Regexp(t, tagRE, "aaa_")
8092+
require.Regexp(t, tagRE, "777")
8093+
require.Regexp(t, tagRE, "my-tag")
8094+
require.Regexp(t, tagRE, "my_tag")
8095+
require.Regexp(t, tagRE, "my-longer-tag")
8096+
require.Regexp(t, tagRE, "my_longer_tag")
8097+
require.Regexp(t, tagRE, "My_Capitalized_Tag")
8098+
require.Regexp(t, tagRE, "ALL_CAPS")
8099+
require.Regexp(t, tagRE, "1_2_3")
8100+
8101+
require.NotRegexp(t, tagRE, "a")
8102+
require.NotRegexp(t, tagRE, "aa")
8103+
require.NotRegexp(t, tagRE, "-aaa")
8104+
require.NotRegexp(t, tagRE, "aaa-")
8105+
require.NotRegexp(t, tagRE, "special@characters$banned")
8106+
require.NotRegexp(t, tagRE, "commas,never,allowed")
8107+
}
8108+
8109+
func TestUniqueOptsIsEmpty(t *testing.T) {
8110+
t.Parallel()
8111+
8112+
require.True(t, uniqueOptsIsEmpty(&UniqueOpts{}))
8113+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByArgs: true}))
8114+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByPeriod: 1 * time.Nanosecond}))
8115+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByQueue: true}))
8116+
require.False(t, uniqueOptsIsEmpty(&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobStateAvailable}}))
8117+
}
8118+
8119+
func TestUniqueOptsValidate(t *testing.T) {
8120+
t.Parallel()
8121+
8122+
require.NoError(t, uniqueOptsValidate(&UniqueOpts{}))
8123+
require.NoError(t, uniqueOptsValidate(&UniqueOpts{
8124+
ByArgs: true,
8125+
ByPeriod: 1 * time.Second,
8126+
ByQueue: true,
8127+
}))
8128+
8129+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByPeriod: 1 * time.Millisecond}), "UniqueOpts.ByPeriod should not be less than 1 second")
8130+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByState: []rivertype.JobState{rivertype.JobState("invalid")}}), `UniqueOpts.ByState contains invalid state "invalid"`)
8131+
8132+
requiredStates := []rivertype.JobState{
8133+
rivertype.JobStateAvailable,
8134+
rivertype.JobStatePending,
8135+
rivertype.JobStateRunning,
8136+
rivertype.JobStateScheduled,
8137+
}
8138+
8139+
for _, state := range requiredStates {
8140+
// Test with each state individually removed from requiredStates to ensure
8141+
// it's validated.
8142+
8143+
// Create a copy of requiredStates without the current state
8144+
var testStates []rivertype.JobState
8145+
for _, s := range requiredStates {
8146+
if s != state {
8147+
testStates = append(testStates, s)
8148+
}
8149+
}
8150+
8151+
// Test validation
8152+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByState: testStates}), "UniqueOpts.ByState must contain all required states, missing: "+string(state))
8153+
}
8154+
8155+
// test with more than one required state missing:
8156+
require.EqualError(t, uniqueOptsValidate(&UniqueOpts{ByState: []rivertype.JobState{
8157+
rivertype.JobStateAvailable,
8158+
rivertype.JobStateScheduled,
8159+
}}), "UniqueOpts.ByState must contain all required states, missing: pending, running")
8160+
8161+
require.NoError(t, uniqueOptsValidate(&UniqueOpts{ByState: rivertype.JobStates()}))
8162+
}

insert_opts_test.go

Lines changed: 0 additions & 78 deletions
This file was deleted.

0 commit comments

Comments
 (0)