Skip to content

Commit c6db54f

Browse files
committed
add the upstream containerd/pkg/shim to internal/shim
The existing runhcs shim management logic is tightly coupled, making it difficult to reuse for new shim implementations. Additionally, aligning with upstream's move toward BootstrapParams requires additional effort in existing logic. To maintain compatibility and reduce technical debt, this commit vendors the containerd/pkg/shim implementation. Source: https://github.com/containerd/containerd/tree/64ed272067a24c2d917064eea25a78e1479d632f/pkg/shim (based on v2.1.2) Signed-off-by: Harsh Rawat <harshrawat@microsoft.com>
1 parent b6f40e1 commit c6db54f

8 files changed

Lines changed: 1250 additions & 2 deletions

File tree

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ require (
3636
github.com/containerd/errdefs v1.0.0
3737
github.com/containerd/errdefs/pkg v0.3.0
3838
github.com/containerd/go-runc v1.1.0
39+
github.com/containerd/log v0.1.0
3940
github.com/containerd/platforms v1.0.0-rc.1
41+
github.com/containerd/plugin v1.0.0
4042
github.com/containerd/ttrpc v1.2.7
4143
github.com/containerd/typeurl/v2 v2.2.3
4244
github.com/google/go-cmp v0.7.0
@@ -76,8 +78,6 @@ require (
7678
github.com/checkpoint-restore/go-criu/v6 v6.3.0 // indirect
7779
github.com/containerd/continuity v0.4.5 // indirect
7880
github.com/containerd/fifo v1.1.0 // indirect
79-
github.com/containerd/log v0.1.0 // indirect
80-
github.com/containerd/plugin v1.0.0 // indirect
8181
github.com/containerd/protobuild v0.3.0 // indirect
8282
github.com/containerd/stargz-snapshotter/estargz v0.15.1 // indirect
8383
github.com/coreos/go-systemd/v22 v22.5.0 // indirect

internal/shim/publisher.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package shim
18+
19+
import (
20+
"context"
21+
"sync"
22+
"time"
23+
24+
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
25+
"github.com/containerd/containerd/api/types"
26+
"github.com/containerd/containerd/v2/core/events"
27+
"github.com/containerd/containerd/v2/pkg/namespaces"
28+
"github.com/containerd/containerd/v2/pkg/protobuf"
29+
"github.com/containerd/containerd/v2/pkg/ttrpcutil"
30+
"github.com/containerd/log"
31+
"github.com/containerd/ttrpc"
32+
"github.com/containerd/typeurl/v2"
33+
)
34+
35+
const (
36+
queueSize = 2048
37+
maxRequeue = 5
38+
)
39+
40+
type item struct {
41+
ev *types.Envelope
42+
ctx context.Context
43+
count int
44+
}
45+
46+
type publisherConfig struct {
47+
ttrpcOpts []ttrpc.ClientOpts
48+
}
49+
50+
type PublisherOpts func(*publisherConfig)
51+
52+
func WithPublishTTRPCOpts(opts ...ttrpc.ClientOpts) PublisherOpts {
53+
return func(cfg *publisherConfig) {
54+
cfg.ttrpcOpts = append(cfg.ttrpcOpts, opts...)
55+
}
56+
}
57+
58+
// NewPublisher creates a new remote events publisher
59+
func NewPublisher(address string, opts ...PublisherOpts) (*RemoteEventsPublisher, error) {
60+
client, err := ttrpcutil.NewClient(address)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
l := &RemoteEventsPublisher{
66+
client: client,
67+
closed: make(chan struct{}),
68+
requeue: make(chan *item, queueSize),
69+
}
70+
71+
go l.processQueue()
72+
return l, nil
73+
}
74+
75+
// RemoteEventsPublisher forwards events to a ttrpc server
76+
type RemoteEventsPublisher struct {
77+
client *ttrpcutil.Client
78+
closed chan struct{}
79+
closer sync.Once
80+
requeue chan *item
81+
}
82+
83+
// Done returns a channel which closes when done
84+
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
85+
return l.closed
86+
}
87+
88+
// Close closes the remote connection and closes the done channel
89+
func (l *RemoteEventsPublisher) Close() (err error) {
90+
err = l.client.Close()
91+
l.closer.Do(func() {
92+
close(l.closed)
93+
})
94+
return err
95+
}
96+
97+
func (l *RemoteEventsPublisher) processQueue() {
98+
for i := range l.requeue {
99+
if i.count > maxRequeue {
100+
log.L.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
101+
// drop the event
102+
continue
103+
}
104+
105+
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
106+
log.L.WithError(err).Error("forward event")
107+
l.queue(i)
108+
}
109+
}
110+
}
111+
112+
func (l *RemoteEventsPublisher) queue(i *item) {
113+
go func() {
114+
i.count++
115+
// re-queue after a short delay
116+
time.Sleep(time.Duration(1*i.count) * time.Second)
117+
l.requeue <- i
118+
}()
119+
}
120+
121+
// Publish publishes the event by forwarding it to the configured ttrpc server
122+
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
123+
ns, err := namespaces.NamespaceRequired(ctx)
124+
if err != nil {
125+
return err
126+
}
127+
evt, err := typeurl.MarshalAnyToProto(event)
128+
if err != nil {
129+
return err
130+
}
131+
i := &item{
132+
ev: &types.Envelope{
133+
Timestamp: protobuf.ToTimestamp(time.Now()),
134+
Namespace: ns,
135+
Topic: topic,
136+
Event: evt,
137+
},
138+
ctx: ctx,
139+
}
140+
141+
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
142+
l.queue(i)
143+
return err
144+
}
145+
146+
return nil
147+
}
148+
149+
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
150+
service, err := l.client.EventsService()
151+
if err == nil {
152+
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
153+
_, err = service.Forward(fCtx, req)
154+
cancel()
155+
if err == nil {
156+
return nil
157+
}
158+
}
159+
160+
if err != ttrpc.ErrClosed {
161+
return err
162+
}
163+
164+
// Reconnect and retry request
165+
if err = l.client.Reconnect(); err != nil {
166+
return err
167+
}
168+
169+
service, err = l.client.EventsService()
170+
if err != nil {
171+
return err
172+
}
173+
174+
// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
175+
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
176+
_, err = service.Forward(fCtx, req)
177+
cancel()
178+
if err != nil {
179+
return err
180+
}
181+
182+
return nil
183+
}

0 commit comments

Comments
 (0)