Skip to content

Commit cf8c77e

Browse files
committed
add JobCancel to Pilot, add Pilot test coverage
1 parent 6184b2e commit cf8c77e

4 files changed

Lines changed: 344 additions & 1 deletion

File tree

client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1428,7 +1428,7 @@ func (c *Client[TTx]) JobCancelTx(ctx context.Context, tx TTx, jobID int64) (*ri
14281428
}
14291429

14301430
func (c *Client[TTx]) jobCancel(ctx context.Context, exec riverdriver.Executor, jobID int64) (*rivertype.JobRow, error) {
1431-
return exec.JobCancel(ctx, &riverdriver.JobCancelParams{
1431+
return c.pilot.JobCancel(ctx, exec, &riverdriver.JobCancelParams{
14321432
ID: jobID,
14331433
CancelAttemptedAt: c.baseService.Time.NowUTC(),
14341434
ControlTopic: string(notifier.NotificationTopicControl),

client_pilot_test.go

Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
package river
2+
3+
import (
4+
"context"
5+
"sync"
6+
"sync/atomic"
7+
"testing"
8+
"time"
9+
10+
"github.com/jackc/pgx/v5"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/riverqueue/river/riverdbtest"
14+
"github.com/riverqueue/river/riverdriver"
15+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
16+
"github.com/riverqueue/river/rivershared/baseservice"
17+
"github.com/riverqueue/river/rivershared/riverpilot"
18+
"github.com/riverqueue/river/rivershared/riversharedtest"
19+
"github.com/riverqueue/river/rivershared/testsignal"
20+
"github.com/riverqueue/river/rivershared/util/testutil"
21+
"github.com/riverqueue/river/rivertype"
22+
)
23+
24+
type pilotSpy struct {
25+
riverpilot.StandardPilot
26+
27+
jobCancelCalls atomic.Int64
28+
jobCleanerQueuesExcludedCalls atomic.Int64
29+
jobInsertManyCalls atomic.Int64
30+
jobRetryCalls atomic.Int64
31+
pilotInitCalls atomic.Int64
32+
33+
testSignals pilotSpyTestSignals
34+
}
35+
36+
type pilotSpyTestSignals struct {
37+
JobGetAvailable testsignal.TestSignal[struct{}]
38+
JobSetStateIfRunningMany testsignal.TestSignal[struct{}]
39+
PeriodicJobGetAll testsignal.TestSignal[struct{}]
40+
PeriodicJobKeepAlive testsignal.TestSignal[struct{}]
41+
PeriodicJobUpsertMany testsignal.TestSignal[struct{}]
42+
PilotInit testsignal.TestSignal[struct{}]
43+
ProducerInit testsignal.TestSignal[struct{}]
44+
ProducerKeepAlive testsignal.TestSignal[struct{}]
45+
ProducerShutdown testsignal.TestSignal[struct{}]
46+
QueueMetadataChanged testsignal.TestSignal[struct{}]
47+
}
48+
49+
func (ts *pilotSpyTestSignals) Init(tb testutil.TestingTB) {
50+
ts.JobGetAvailable.Init(tb)
51+
ts.JobSetStateIfRunningMany.Init(tb)
52+
ts.PeriodicJobGetAll.Init(tb)
53+
ts.PeriodicJobKeepAlive.Init(tb)
54+
ts.PeriodicJobUpsertMany.Init(tb)
55+
ts.PilotInit.Init(tb)
56+
ts.ProducerInit.Init(tb)
57+
ts.ProducerKeepAlive.Init(tb)
58+
ts.ProducerShutdown.Init(tb)
59+
ts.QueueMetadataChanged.Init(tb)
60+
}
61+
62+
func (p *pilotSpy) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
63+
p.jobCancelCalls.Add(1)
64+
return p.StandardPilot.JobCancel(ctx, exec, params)
65+
}
66+
67+
func (p *pilotSpy) JobCleanerQueuesExcluded() []string {
68+
p.jobCleanerQueuesExcludedCalls.Add(1)
69+
return p.StandardPilot.JobCleanerQueuesExcluded()
70+
}
71+
72+
func (p *pilotSpy) JobGetAvailable(ctx context.Context, exec riverdriver.Executor, state riverpilot.ProducerState, params *riverdriver.JobGetAvailableParams) ([]*rivertype.JobRow, error) {
73+
p.testSignals.JobGetAvailable.Signal(struct{}{})
74+
return p.StandardPilot.JobGetAvailable(ctx, exec, state, params)
75+
}
76+
77+
func (p *pilotSpy) JobInsertMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobInsertFastManyParams) ([]*riverdriver.JobInsertFastResult, error) {
78+
p.jobInsertManyCalls.Add(1)
79+
return p.StandardPilot.JobInsertMany(ctx, exec, params)
80+
}
81+
82+
func (p *pilotSpy) JobRetry(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error) {
83+
p.jobRetryCalls.Add(1)
84+
return p.StandardPilot.JobRetry(ctx, exec, params)
85+
}
86+
87+
func (p *pilotSpy) JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
88+
p.testSignals.JobSetStateIfRunningMany.Signal(struct{}{})
89+
return p.StandardPilot.JobSetStateIfRunningMany(ctx, exec, params)
90+
}
91+
92+
func (p *pilotSpy) PeriodicJobGetAll(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobGetAllParams) ([]*riverpilot.PeriodicJob, error) {
93+
p.testSignals.PeriodicJobGetAll.Signal(struct{}{})
94+
return p.StandardPilot.PeriodicJobGetAll(ctx, exec, params)
95+
}
96+
97+
func (p *pilotSpy) PeriodicJobKeepAliveAndReap(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobKeepAliveAndReapParams) ([]*riverpilot.PeriodicJob, error) {
98+
p.testSignals.PeriodicJobKeepAlive.Signal(struct{}{})
99+
return p.StandardPilot.PeriodicJobKeepAliveAndReap(ctx, exec, params)
100+
}
101+
102+
func (p *pilotSpy) PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.Executor, params *riverpilot.PeriodicJobUpsertManyParams) ([]*riverpilot.PeriodicJob, error) {
103+
p.testSignals.PeriodicJobUpsertMany.Signal(struct{}{})
104+
return p.StandardPilot.PeriodicJobUpsertMany(ctx, exec, params)
105+
}
106+
107+
func (p *pilotSpy) PilotInit(archetype *baseservice.Archetype, params *riverpilot.PilotInitParams) {
108+
p.pilotInitCalls.Add(1)
109+
p.testSignals.PilotInit.Signal(struct{}{})
110+
p.StandardPilot.PilotInit(archetype, params)
111+
}
112+
113+
func (p *pilotSpy) ProducerInit(ctx context.Context, exec riverdriver.Executor, params *riverpilot.ProducerInitParams) (int64, riverpilot.ProducerState, error) {
114+
p.testSignals.ProducerInit.Signal(struct{}{})
115+
return p.StandardPilot.ProducerInit(ctx, exec, params)
116+
}
117+
118+
func (p *pilotSpy) ProducerKeepAlive(ctx context.Context, exec riverdriver.Executor, params *riverdriver.ProducerKeepAliveParams) error {
119+
p.testSignals.ProducerKeepAlive.Signal(struct{}{})
120+
return p.StandardPilot.ProducerKeepAlive(ctx, exec, params)
121+
}
122+
123+
func (p *pilotSpy) ProducerShutdown(ctx context.Context, exec riverdriver.Executor, params *riverpilot.ProducerShutdownParams) error {
124+
p.testSignals.ProducerShutdown.Signal(struct{}{})
125+
return p.StandardPilot.ProducerShutdown(ctx, exec, params)
126+
}
127+
128+
func (p *pilotSpy) QueueMetadataChanged(ctx context.Context, exec riverdriver.Executor, params *riverpilot.QueueMetadataChangedParams) error {
129+
p.testSignals.QueueMetadataChanged.Signal(struct{}{})
130+
return p.StandardPilot.QueueMetadataChanged(ctx, exec, params)
131+
}
132+
133+
func Test_Client_PilotUsage(t *testing.T) {
134+
t.Parallel()
135+
136+
ctx := context.Background()
137+
138+
setup := func(t *testing.T, configMutate func(*Config)) (*Client[pgx.Tx], *pilotSpy) {
139+
t.Helper()
140+
141+
var (
142+
dbPool = riversharedtest.DBPool(ctx, t)
143+
driver = riverpgxv5.New(dbPool)
144+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
145+
config = newTestConfig(t, schema)
146+
)
147+
148+
if configMutate != nil {
149+
configMutate(config)
150+
}
151+
152+
pilot := &pilotSpy{}
153+
pluginDriver := newDriverWithPlugin(t, dbPool)
154+
pluginDriver.pilot = pilot
155+
156+
client, err := NewClient(pluginDriver, config)
157+
require.NoError(t, err)
158+
159+
return client, pilot
160+
}
161+
162+
withClientTx := func(t *testing.T, client *Client[pgx.Tx], callback func(tx pgx.Tx)) {
163+
t.Helper()
164+
165+
exec := client.Driver().GetExecutor()
166+
execTx, err := exec.Begin(ctx)
167+
require.NoError(t, err)
168+
169+
committed := false
170+
t.Cleanup(func() {
171+
if !committed {
172+
_ = execTx.Rollback(ctx)
173+
}
174+
})
175+
176+
tx := client.Driver().UnwrapTx(execTx)
177+
callback(tx)
178+
179+
require.NoError(t, execTx.Commit(ctx))
180+
committed = true
181+
}
182+
183+
t.Run("InitUsesPilot", func(t *testing.T) {
184+
t.Parallel()
185+
186+
client, pilot := setup(t, nil)
187+
require.NotNil(t, client)
188+
require.Equal(t, int64(1), pilot.jobCleanerQueuesExcludedCalls.Load())
189+
require.Equal(t, int64(1), pilot.pilotInitCalls.Load())
190+
})
191+
192+
t.Run("JobCancelUsesPilot", func(t *testing.T) {
193+
t.Parallel()
194+
195+
client, pilot := setup(t, nil)
196+
197+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
198+
ScheduledAt: time.Now().Add(5 * time.Minute),
199+
})
200+
require.NoError(t, err)
201+
202+
_, err = client.JobCancel(ctx, insertRes.Job.ID)
203+
require.NoError(t, err)
204+
require.Equal(t, int64(1), pilot.jobCancelCalls.Load())
205+
})
206+
207+
t.Run("JobCancelTxUsesPilot", func(t *testing.T) {
208+
t.Parallel()
209+
210+
client, pilot := setup(t, nil)
211+
212+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
213+
ScheduledAt: time.Now().Add(5 * time.Minute),
214+
})
215+
require.NoError(t, err)
216+
217+
withClientTx(t, client, func(tx pgx.Tx) {
218+
_, err = client.JobCancelTx(ctx, tx, insertRes.Job.ID)
219+
require.NoError(t, err)
220+
require.Equal(t, int64(1), pilot.jobCancelCalls.Load())
221+
})
222+
})
223+
224+
t.Run("JobInsertManyUsesPilot", func(t *testing.T) {
225+
t.Parallel()
226+
227+
client, pilot := setup(t, nil)
228+
229+
_, err := client.Insert(ctx, noOpArgs{}, nil)
230+
require.NoError(t, err)
231+
require.Equal(t, int64(1), pilot.jobInsertManyCalls.Load())
232+
})
233+
234+
t.Run("JobRetryUsesPilot", func(t *testing.T) {
235+
t.Parallel()
236+
237+
client, pilot := setup(t, nil)
238+
239+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
240+
ScheduledAt: time.Now().Add(5 * time.Minute),
241+
})
242+
require.NoError(t, err)
243+
244+
_, err = client.JobRetry(ctx, insertRes.Job.ID)
245+
require.NoError(t, err)
246+
require.Equal(t, int64(1), pilot.jobRetryCalls.Load())
247+
})
248+
249+
t.Run("JobRetryTxUsesPilot", func(t *testing.T) {
250+
t.Parallel()
251+
252+
client, pilot := setup(t, nil)
253+
254+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
255+
ScheduledAt: time.Now().Add(5 * time.Minute),
256+
})
257+
require.NoError(t, err)
258+
259+
withClientTx(t, client, func(tx pgx.Tx) {
260+
_, err = client.JobRetryTx(ctx, tx, insertRes.Job.ID)
261+
require.NoError(t, err)
262+
require.Equal(t, int64(1), pilot.jobRetryCalls.Load())
263+
})
264+
})
265+
266+
t.Run("PeriodicJobsUsePilot", func(t *testing.T) {
267+
t.Parallel()
268+
269+
client, pilot := setup(t, func(config *Config) {
270+
config.PeriodicJobs = []*PeriodicJob{
271+
NewPeriodicJob(PeriodicInterval(time.Second), func() (JobArgs, *InsertOpts) {
272+
return noOpArgs{}, nil
273+
}, &PeriodicJobOpts{
274+
ID: "pilot_periodic_job",
275+
RunOnStart: true,
276+
}),
277+
}
278+
})
279+
280+
client.testSignals.Init(t)
281+
pilot.testSignals.Init(t)
282+
283+
startClient(ctx, t, client)
284+
client.testSignals.electedLeader.WaitOrTimeout()
285+
286+
pilot.testSignals.PeriodicJobGetAll.WaitOrTimeout()
287+
pilot.testSignals.PeriodicJobUpsertMany.WaitOrTimeout()
288+
pilot.testSignals.PeriodicJobKeepAlive.WaitOrTimeout()
289+
})
290+
291+
t.Run("ProducerAndCompleterUsePilot", func(t *testing.T) {
292+
t.Parallel()
293+
294+
client, pilot := setup(t, nil)
295+
296+
jobDone := make(chan struct{})
297+
298+
type JobArgs struct {
299+
testutil.JobArgsReflectKind[JobArgs]
300+
}
301+
302+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
303+
close(jobDone)
304+
return nil
305+
}))
306+
307+
pilot.testSignals.Init(t)
308+
309+
require.NoError(t, client.Start(ctx))
310+
311+
stopOnce := sync.Once{}
312+
stopClient := func() {
313+
stopOnce.Do(func() {
314+
stopCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
315+
defer cancel()
316+
require.NoError(t, client.Stop(stopCtx))
317+
})
318+
}
319+
t.Cleanup(stopClient)
320+
321+
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
322+
require.NoError(t, err)
323+
324+
riversharedtest.WaitOrTimeout(t, jobDone)
325+
require.NotZero(t, insertRes.Job.ID)
326+
327+
pilot.testSignals.JobGetAvailable.WaitOrTimeout()
328+
pilot.testSignals.JobSetStateIfRunningMany.WaitOrTimeout()
329+
pilot.testSignals.ProducerInit.WaitOrTimeout()
330+
pilot.testSignals.ProducerKeepAlive.WaitOrTimeout()
331+
pilot.testSignals.QueueMetadataChanged.WaitOrTimeout()
332+
333+
stopClient()
334+
335+
pilot.testSignals.ProducerShutdown.WaitOrTimeout()
336+
})
337+
}

rivershared/riverpilot/pilot.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
type Pilot interface {
2020
PilotPeriodicJob
2121

22+
JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error)
23+
2224
// JobCleanerQueuesExcluded returns queues that should be excluded from the
2325
// main River client's JobCleaner. If no queues should be omitted, this
2426
// function should return nil as opposed to an empty array. (Because the

rivershared/riverpilot/standard_pilot.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex
2222
return exec.JobGetAvailable(ctx, params)
2323
}
2424

25+
func (p *StandardPilot) JobCancel(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobCancelParams) (*rivertype.JobRow, error) {
26+
return exec.JobCancel(ctx, params)
27+
}
28+
2529
func (p *StandardPilot) JobInsertMany(
2630
ctx context.Context,
2731
exec riverdriver.Executor,

0 commit comments

Comments
 (0)