Skip to content

Commit 8ff70f6

Browse files
author
Santhosh Manohar
committed
Merge pull request #1149 from mrjana/agent
Add libnetwork agent mode support
2 parents 4033ff5 + 42a48a2 commit 8ff70f6

17 files changed

Lines changed: 719 additions & 53 deletions

File tree

agent.go

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
package libnetwork
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"os"
7+
"strings"
8+
9+
"github.com/Sirupsen/logrus"
10+
"github.com/docker/go-events"
11+
"github.com/docker/libnetwork/datastore"
12+
"github.com/docker/libnetwork/discoverapi"
13+
"github.com/docker/libnetwork/driverapi"
14+
"github.com/docker/libnetwork/networkdb"
15+
)
16+
17+
type agent struct {
18+
networkDB *networkdb.NetworkDB
19+
bindAddr string
20+
epTblCancel func()
21+
driverCancelFuncs map[string][]func()
22+
}
23+
24+
func getBindAddr(ifaceName string) (string, error) {
25+
iface, err := net.InterfaceByName(ifaceName)
26+
if err != nil {
27+
return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
28+
}
29+
30+
addrs, err := iface.Addrs()
31+
if err != nil {
32+
return "", fmt.Errorf("failed to get interface addresses: %v", err)
33+
}
34+
35+
for _, a := range addrs {
36+
addr, ok := a.(*net.IPNet)
37+
if !ok {
38+
continue
39+
}
40+
addrIP := addr.IP
41+
42+
if addrIP.IsLinkLocalUnicast() {
43+
continue
44+
}
45+
46+
return addrIP.String(), nil
47+
}
48+
49+
return "", fmt.Errorf("failed to get bind address")
50+
}
51+
52+
func resolveAddr(addrOrInterface string) (string, error) {
53+
// Try and see if this is a valid IP address
54+
if net.ParseIP(addrOrInterface) != nil {
55+
return addrOrInterface, nil
56+
}
57+
58+
// If not a valid IP address, it should be a valid interface
59+
return getBindAddr(addrOrInterface)
60+
}
61+
62+
func (c *controller) agentInit(bindAddrOrInterface string) error {
63+
if !c.cfg.Daemon.IsAgent {
64+
return nil
65+
}
66+
67+
bindAddr, err := resolveAddr(bindAddrOrInterface)
68+
if err != nil {
69+
return err
70+
}
71+
72+
hostname, _ := os.Hostname()
73+
nDB, err := networkdb.New(&networkdb.Config{
74+
BindAddr: bindAddr,
75+
NodeName: hostname,
76+
})
77+
78+
if err != nil {
79+
return err
80+
}
81+
82+
ch, cancel := nDB.Watch("endpoint_table", "", "")
83+
84+
c.agent = &agent{
85+
networkDB: nDB,
86+
bindAddr: bindAddr,
87+
epTblCancel: cancel,
88+
driverCancelFuncs: make(map[string][]func()),
89+
}
90+
91+
go c.handleTableEvents(ch, c.handleEpTableEvent)
92+
return nil
93+
}
94+
95+
func (c *controller) agentJoin(remotes []string) error {
96+
if c.agent == nil {
97+
return nil
98+
}
99+
100+
return c.agent.networkDB.Join(remotes)
101+
}
102+
103+
func (c *controller) agentDriverNotify(d driverapi.Driver) {
104+
if c.agent == nil {
105+
return
106+
}
107+
108+
d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
109+
Address: c.agent.bindAddr,
110+
Self: true,
111+
})
112+
}
113+
114+
func (c *controller) agentClose() {
115+
if c.agent == nil {
116+
return
117+
}
118+
119+
for _, cancelFuncs := range c.agent.driverCancelFuncs {
120+
for _, cancel := range cancelFuncs {
121+
cancel()
122+
}
123+
}
124+
c.agent.epTblCancel()
125+
126+
c.agent.networkDB.Close()
127+
}
128+
129+
func (n *network) isClusterEligible() bool {
130+
if n.driverScope() != datastore.GlobalScope {
131+
return false
132+
}
133+
134+
c := n.getController()
135+
if c.agent == nil {
136+
return false
137+
}
138+
139+
return true
140+
}
141+
142+
func (n *network) joinCluster() error {
143+
if !n.isClusterEligible() {
144+
return nil
145+
}
146+
147+
c := n.getController()
148+
return c.agent.networkDB.JoinNetwork(n.ID())
149+
}
150+
151+
func (n *network) leaveCluster() error {
152+
if !n.isClusterEligible() {
153+
return nil
154+
}
155+
156+
c := n.getController()
157+
return c.agent.networkDB.LeaveNetwork(n.ID())
158+
}
159+
160+
func (ep *endpoint) addToCluster() error {
161+
n := ep.getNetwork()
162+
if !n.isClusterEligible() {
163+
return nil
164+
}
165+
166+
c := n.getController()
167+
if !ep.isAnonymous() && ep.Iface().Address() != nil {
168+
if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s=%s", ep.Name(), ep.Iface().Address().IP))); err != nil {
169+
return err
170+
}
171+
}
172+
173+
for _, te := range ep.joinInfo.driverTableEntries {
174+
if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
175+
return err
176+
}
177+
}
178+
179+
return nil
180+
}
181+
182+
func (ep *endpoint) deleteFromCluster() error {
183+
n := ep.getNetwork()
184+
if !n.isClusterEligible() {
185+
return nil
186+
}
187+
188+
c := n.getController()
189+
if !ep.isAnonymous() {
190+
if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
191+
return err
192+
}
193+
}
194+
195+
if ep.joinInfo == nil {
196+
return nil
197+
}
198+
199+
for _, te := range ep.joinInfo.driverTableEntries {
200+
if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
201+
return err
202+
}
203+
}
204+
205+
return nil
206+
}
207+
208+
func (n *network) addDriverWatches() {
209+
if !n.isClusterEligible() {
210+
return
211+
}
212+
213+
c := n.getController()
214+
for _, tableName := range n.driverTables {
215+
ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
216+
c.Lock()
217+
c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
218+
c.Unlock()
219+
220+
go c.handleTableEvents(ch, n.handleDriverTableEvent)
221+
d, err := n.driver(false)
222+
if err != nil {
223+
logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
224+
return
225+
}
226+
227+
c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
228+
d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
229+
return false
230+
})
231+
}
232+
}
233+
234+
func (n *network) cancelDriverWatches() {
235+
if !n.isClusterEligible() {
236+
return
237+
}
238+
239+
c := n.getController()
240+
c.Lock()
241+
cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
242+
delete(c.agent.driverCancelFuncs, n.ID())
243+
c.Unlock()
244+
245+
for _, cancel := range cancelFuncs {
246+
cancel()
247+
}
248+
}
249+
250+
func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
251+
for {
252+
select {
253+
case ev, ok := <-ch:
254+
if !ok {
255+
return
256+
}
257+
258+
fn(ev)
259+
}
260+
}
261+
}
262+
263+
func (n *network) handleDriverTableEvent(ev events.Event) {
264+
d, err := n.driver(false)
265+
if err != nil {
266+
logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
267+
return
268+
}
269+
270+
var (
271+
etype driverapi.EventType
272+
tname string
273+
key string
274+
value []byte
275+
)
276+
277+
switch event := ev.(type) {
278+
case networkdb.CreateEvent:
279+
tname = event.Table
280+
key = event.Key
281+
value = event.Value
282+
etype = driverapi.Create
283+
case networkdb.DeleteEvent:
284+
tname = event.Table
285+
key = event.Key
286+
value = event.Value
287+
etype = driverapi.Delete
288+
case networkdb.UpdateEvent:
289+
tname = event.Table
290+
key = event.Key
291+
value = event.Value
292+
etype = driverapi.Delete
293+
}
294+
295+
d.EventNotify(etype, n.ID(), tname, key, value)
296+
}
297+
298+
func (c *controller) handleEpTableEvent(ev events.Event) {
299+
var (
300+
id string
301+
value string
302+
isAdd bool
303+
)
304+
305+
switch event := ev.(type) {
306+
case networkdb.CreateEvent:
307+
id = event.NetworkID
308+
value = string(event.Value)
309+
isAdd = true
310+
case networkdb.DeleteEvent:
311+
id = event.NetworkID
312+
value = string(event.Value)
313+
case networkdb.UpdateEvent:
314+
logrus.Errorf("Unexpected update service table event = %#v", event)
315+
}
316+
317+
nw, err := c.NetworkByID(id)
318+
if err != nil {
319+
logrus.Errorf("Could not find network %s while handling service table event: %v", id, err)
320+
return
321+
}
322+
n := nw.(*network)
323+
324+
pair := strings.Split(value, "=")
325+
if len(pair) < 2 {
326+
logrus.Errorf("Incorrect service table value = %s", value)
327+
return
328+
}
329+
330+
name := pair[0]
331+
ip := net.ParseIP(pair[1])
332+
333+
if name == "" || ip == nil {
334+
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
335+
return
336+
}
337+
338+
if isAdd {
339+
n.addSvcRecords(name, ip, nil, true)
340+
} else {
341+
n.deleteSvcRecords(name, ip, nil, true)
342+
}
343+
}

api/api.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,17 @@ func procCreateNetwork(c libnetwork.NetworkController, vars map[string]string, b
307307
if len(create.DriverOpts) > 0 {
308308
options = append(options, libnetwork.NetworkOptionDriverOpts(create.DriverOpts))
309309
}
310-
nw, err := c.NewNetwork(create.NetworkType, create.Name, "", options...)
310+
311+
if len(create.IPv4Conf) > 0 {
312+
ipamV4Conf := &libnetwork.IpamConf{
313+
PreferredPool: create.IPv4Conf[0].PreferredPool,
314+
SubPool: create.IPv4Conf[0].SubPool,
315+
}
316+
317+
options = append(options, libnetwork.NetworkOptionIpam("default", "", []*libnetwork.IpamConf{ipamV4Conf}, nil, nil))
318+
}
319+
320+
nw, err := c.NewNetwork(create.NetworkType, create.Name, create.ID, options...)
311321
if err != nil {
312322
return nil, convertNetworkError(err)
313323
}
@@ -697,6 +707,7 @@ func procAttachBackend(c libnetwork.NetworkController, vars map[string]string, b
697707
if err != nil {
698708
return nil, convertNetworkError(err)
699709
}
710+
700711
return sb.Key(), &successResponse
701712
}
702713

api/types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,19 @@ type sandboxResource struct {
3232
Body types
3333
************/
3434

35+
type ipamConf struct {
36+
PreferredPool string
37+
SubPool string
38+
Gateway string
39+
AuxAddresses map[string]string
40+
}
41+
3542
// networkCreate is the expected body of the "create network" http request message
3643
type networkCreate struct {
3744
Name string `json:"name"`
45+
ID string `json:"id"`
3846
NetworkType string `json:"network_type"`
47+
IPv4Conf []ipamConf `json:"ipv4_configuration"`
3948
DriverOpts map[string]string `json:"driver_opts"`
4049
NetworkOpts map[string]string `json:"network_opts"`
4150
}

0 commit comments

Comments
 (0)