Skip to content

Commit 382b52a

Browse files
committed
controller-runtime Source that emits a constant Event periodically
1 parent 9e3ad6f commit 382b52a

2 files changed

Lines changed: 188 additions & 0 deletions

File tree

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2021 - 2022 Crunchy Data Solutions, Inc.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package runtime
17+
18+
import (
19+
"context"
20+
"time"
21+
22+
"k8s.io/client-go/util/workqueue"
23+
"sigs.k8s.io/controller-runtime/pkg/event"
24+
"sigs.k8s.io/controller-runtime/pkg/handler"
25+
"sigs.k8s.io/controller-runtime/pkg/predicate"
26+
"sigs.k8s.io/controller-runtime/pkg/source"
27+
)
28+
29+
type ticker struct {
30+
time.Duration
31+
event.GenericEvent
32+
Immediate bool
33+
}
34+
35+
// NewTicker returns a Source that emits e every d.
36+
func NewTicker(d time.Duration, e event.GenericEvent) source.Source {
37+
return &ticker{Duration: d, GenericEvent: e}
38+
}
39+
40+
// NewTickerImmediate returns a Source that emits e at start and every d.
41+
func NewTickerImmediate(d time.Duration, e event.GenericEvent) source.Source {
42+
return &ticker{Duration: d, GenericEvent: e, Immediate: true}
43+
}
44+
45+
func (t ticker) String() string { return "every " + t.Duration.String() }
46+
47+
// Start is called by controller-runtime Controller and returns quickly.
48+
// It cleans up when ctx is cancelled.
49+
func (t ticker) Start(
50+
ctx context.Context, h handler.EventHandler,
51+
q workqueue.RateLimitingInterface, p ...predicate.Predicate,
52+
) error {
53+
ticker := time.NewTicker(t.Duration)
54+
55+
// Pass t.GenericEvent to h when it is not filtered out by p.
56+
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/source/internal#EventHandler
57+
emit := func() {
58+
for _, pp := range p {
59+
if !pp.Generic(t.GenericEvent) {
60+
return
61+
}
62+
}
63+
h.Generic(t.GenericEvent, q)
64+
}
65+
66+
if t.Immediate {
67+
emit()
68+
}
69+
70+
// Repeat until ctx is cancelled.
71+
go func() {
72+
defer ticker.Stop()
73+
74+
for {
75+
select {
76+
case <-ticker.C:
77+
emit()
78+
case <-ctx.Done():
79+
return
80+
}
81+
}
82+
}()
83+
84+
return nil
85+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
Copyright 2021 - 2022 Crunchy Data Solutions, Inc.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package runtime
17+
18+
import (
19+
"context"
20+
"testing"
21+
"time"
22+
23+
"gotest.tools/v3/assert"
24+
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/client-go/util/workqueue"
26+
"sigs.k8s.io/controller-runtime/pkg/event"
27+
"sigs.k8s.io/controller-runtime/pkg/handler"
28+
"sigs.k8s.io/controller-runtime/pkg/predicate"
29+
)
30+
31+
func TestTickerString(t *testing.T) {
32+
assert.Equal(t, ticker{Duration: time.Millisecond}.String(), "every 1ms")
33+
assert.Equal(t, ticker{Duration: 10 * time.Second}.String(), "every 10s")
34+
assert.Equal(t, ticker{Duration: time.Hour}.String(), "every 1h0m0s")
35+
}
36+
37+
func TestTicker(t *testing.T) {
38+
t.Parallel()
39+
40+
var called []event.GenericEvent
41+
expected := event.GenericEvent{Object: new(corev1.ConfigMap)}
42+
43+
tq := workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter())
44+
th := handler.Funcs{GenericFunc: func(e event.GenericEvent, q workqueue.RateLimitingInterface) {
45+
called = append(called, e)
46+
47+
assert.Equal(t, q, tq, "should be called with the queue passed in Start")
48+
}}
49+
50+
t.Run("WithoutPredicates", func(t *testing.T) {
51+
called = nil
52+
53+
ticker := NewTicker(100*time.Millisecond, expected)
54+
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
55+
t.Cleanup(cancel)
56+
57+
// Start the ticker and wait for the deadline to pass.
58+
assert.NilError(t, ticker.Start(ctx, th, tq))
59+
<-ctx.Done()
60+
61+
assert.Equal(t, len(called), 2)
62+
assert.Equal(t, called[0], expected, "expected at 100ms")
63+
assert.Equal(t, called[1], expected, "expected at 200ms")
64+
})
65+
66+
t.Run("WithPredicates", func(t *testing.T) {
67+
called = nil
68+
69+
// Predicates that exclude events after a fixed number have passed.
70+
pLength := predicate.Funcs{GenericFunc: func(event.GenericEvent) bool { return len(called) < 3 }}
71+
pTrue := predicate.Funcs{GenericFunc: func(event.GenericEvent) bool { return true }}
72+
73+
ticker := NewTicker(50*time.Millisecond, expected)
74+
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
75+
t.Cleanup(cancel)
76+
77+
// Start the ticker and wait for the deadline to pass.
78+
assert.NilError(t, ticker.Start(ctx, th, tq, pTrue, pLength))
79+
<-ctx.Done()
80+
81+
assert.Equal(t, len(called), 3)
82+
assert.Equal(t, called[0], expected)
83+
assert.Equal(t, called[1], expected)
84+
assert.Equal(t, called[2], expected)
85+
})
86+
87+
t.Run("Immediate", func(t *testing.T) {
88+
called = nil
89+
90+
ticker := NewTickerImmediate(100*time.Millisecond, expected)
91+
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
92+
t.Cleanup(cancel)
93+
94+
// Start the ticker and wait for the deadline to pass.
95+
assert.NilError(t, ticker.Start(ctx, th, tq))
96+
<-ctx.Done()
97+
98+
assert.Equal(t, len(called), 3)
99+
assert.Equal(t, called[0], expected, "expected at 0ms")
100+
assert.Equal(t, called[1], expected, "expected at 100ms")
101+
assert.Equal(t, called[2], expected, "expected at 200ms")
102+
})
103+
}

0 commit comments

Comments
 (0)