Skip to content

Commit 632017b

Browse files
committed
Add libnetwork agent mode support
libnetwork agent mode is a mode where libnetwork can act as a local agent for network and discovery plumbing alone while the state management is done elsewhere. This completes the support for making libnetwork and its associated drivers to be completely independent of a k/v store(if needed) and work purely based on the state information passed along by some some external controller or manager. This does not mean that libnetwork support for decentralized state management via a k/v store is removed. Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
1 parent 26e40eb commit 632017b

9 files changed

Lines changed: 495 additions & 17 deletions

File tree

agent.go

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
package libnetwork
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"os"
7+
"strings"
8+
9+
"github.com/Sirupsen/logrus"
10+
"github.com/docker/go-events"
11+
"github.com/docker/libnetwork/datastore"
12+
"github.com/docker/libnetwork/discoverapi"
13+
"github.com/docker/libnetwork/driverapi"
14+
"github.com/docker/libnetwork/networkdb"
15+
)
16+
17+
type agent struct {
18+
networkDB *networkdb.NetworkDB
19+
bindAddr string
20+
epTblCancel func()
21+
driverCancelFuncs map[string][]func()
22+
}
23+
24+
func getBindAddr(ifaceName string) (string, error) {
25+
iface, err := net.InterfaceByName(ifaceName)
26+
if err != nil {
27+
return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
28+
}
29+
30+
addrs, err := iface.Addrs()
31+
if err != nil {
32+
return "", fmt.Errorf("failed to get interface addresses: %v", err)
33+
}
34+
35+
for _, a := range addrs {
36+
addr, ok := a.(*net.IPNet)
37+
if !ok {
38+
continue
39+
}
40+
addrIP := addr.IP
41+
42+
if addrIP.IsLinkLocalUnicast() {
43+
continue
44+
}
45+
46+
return addrIP.String(), nil
47+
}
48+
49+
return "", fmt.Errorf("failed to get bind address")
50+
}
51+
52+
func resolveAddr(addrOrInterface string) (string, error) {
53+
// Try and see if this is a valid IP address
54+
if net.ParseIP(addrOrInterface) != nil {
55+
return addrOrInterface, nil
56+
}
57+
58+
// If not a valid IP address, it should be a valid interface
59+
return getBindAddr(addrOrInterface)
60+
}
61+
62+
func (c *controller) agentInit(bindAddrOrInterface string) error {
63+
if !c.cfg.Daemon.IsAgent {
64+
return nil
65+
}
66+
67+
bindAddr, err := resolveAddr(bindAddrOrInterface)
68+
if err != nil {
69+
return err
70+
}
71+
72+
hostname, _ := os.Hostname()
73+
nDB, err := networkdb.New(&networkdb.Config{
74+
BindAddr: bindAddr,
75+
NodeName: hostname,
76+
})
77+
78+
if err != nil {
79+
return err
80+
}
81+
82+
ch, cancel := nDB.Watch("endpoint_table", "", "")
83+
84+
c.agent = &agent{
85+
networkDB: nDB,
86+
bindAddr: bindAddr,
87+
epTblCancel: cancel,
88+
driverCancelFuncs: make(map[string][]func()),
89+
}
90+
91+
go c.handleTableEvents(ch, c.handleEpTableEvent)
92+
return nil
93+
}
94+
95+
func (c *controller) agentJoin(remotes []string) error {
96+
if c.agent == nil {
97+
return nil
98+
}
99+
100+
return c.agent.networkDB.Join(remotes)
101+
}
102+
103+
func (c *controller) agentDriverNotify(d driverapi.Driver) {
104+
if c.agent == nil {
105+
return
106+
}
107+
108+
d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
109+
Address: c.agent.bindAddr,
110+
Self: true,
111+
})
112+
}
113+
114+
func (c *controller) agentClose() {
115+
if c.agent == nil {
116+
return
117+
}
118+
119+
for _, cancelFuncs := range c.agent.driverCancelFuncs {
120+
for _, cancel := range cancelFuncs {
121+
cancel()
122+
}
123+
}
124+
c.agent.epTblCancel()
125+
126+
c.agent.networkDB.Close()
127+
}
128+
129+
func (n *network) isClusterEligible() bool {
130+
if n.driverScope() != datastore.GlobalScope {
131+
return false
132+
}
133+
134+
c := n.getController()
135+
if c.agent == nil {
136+
return false
137+
}
138+
139+
return true
140+
}
141+
142+
func (n *network) joinCluster() error {
143+
if !n.isClusterEligible() {
144+
return nil
145+
}
146+
147+
c := n.getController()
148+
return c.agent.networkDB.JoinNetwork(n.ID())
149+
}
150+
151+
func (n *network) leaveCluster() error {
152+
if !n.isClusterEligible() {
153+
return nil
154+
}
155+
156+
c := n.getController()
157+
return c.agent.networkDB.LeaveNetwork(n.ID())
158+
}
159+
160+
func (ep *endpoint) addToCluster() error {
161+
n := ep.getNetwork()
162+
if !n.isClusterEligible() {
163+
return nil
164+
}
165+
166+
c := n.getController()
167+
if !ep.isAnonymous() && ep.Iface().Address() != nil {
168+
if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s=%s", ep.Name(), ep.Iface().Address().IP))); err != nil {
169+
return err
170+
}
171+
}
172+
173+
for _, te := range ep.joinInfo.driverTableEntries {
174+
if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
175+
return err
176+
}
177+
}
178+
179+
return nil
180+
}
181+
182+
func (ep *endpoint) deleteFromCluster() error {
183+
n := ep.getNetwork()
184+
if !n.isClusterEligible() {
185+
return nil
186+
}
187+
188+
c := n.getController()
189+
if !ep.isAnonymous() {
190+
if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
191+
return err
192+
}
193+
}
194+
195+
if ep.joinInfo == nil {
196+
return nil
197+
}
198+
199+
for _, te := range ep.joinInfo.driverTableEntries {
200+
if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
201+
return err
202+
}
203+
}
204+
205+
return nil
206+
}
207+
208+
func (n *network) addDriverWatches() {
209+
if !n.isClusterEligible() {
210+
return
211+
}
212+
213+
c := n.getController()
214+
for _, tableName := range n.driverTables {
215+
ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
216+
c.Lock()
217+
c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
218+
c.Unlock()
219+
220+
go c.handleTableEvents(ch, n.handleDriverTableEvent)
221+
d, err := n.driver(false)
222+
if err != nil {
223+
logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
224+
return
225+
}
226+
227+
c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
228+
d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
229+
return false
230+
})
231+
}
232+
}
233+
234+
func (n *network) cancelDriverWatches() {
235+
if !n.isClusterEligible() {
236+
return
237+
}
238+
239+
c := n.getController()
240+
c.Lock()
241+
cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
242+
delete(c.agent.driverCancelFuncs, n.ID())
243+
c.Unlock()
244+
245+
for _, cancel := range cancelFuncs {
246+
cancel()
247+
}
248+
}
249+
250+
func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
251+
for {
252+
select {
253+
case ev, ok := <-ch:
254+
if !ok {
255+
return
256+
}
257+
258+
fn(ev)
259+
}
260+
}
261+
}
262+
263+
func (n *network) handleDriverTableEvent(ev events.Event) {
264+
d, err := n.driver(false)
265+
if err != nil {
266+
logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
267+
return
268+
}
269+
270+
var (
271+
etype driverapi.EventType
272+
tname string
273+
key string
274+
value []byte
275+
)
276+
277+
switch event := ev.(type) {
278+
case networkdb.CreateEvent:
279+
tname = event.Table
280+
key = event.Key
281+
value = event.Value
282+
etype = driverapi.Create
283+
case networkdb.DeleteEvent:
284+
tname = event.Table
285+
key = event.Key
286+
value = event.Value
287+
etype = driverapi.Delete
288+
case networkdb.UpdateEvent:
289+
tname = event.Table
290+
key = event.Key
291+
value = event.Value
292+
etype = driverapi.Delete
293+
}
294+
295+
d.EventNotify(etype, n.ID(), tname, key, value)
296+
}
297+
298+
func (c *controller) handleEpTableEvent(ev events.Event) {
299+
var (
300+
id string
301+
value string
302+
isAdd bool
303+
)
304+
305+
switch event := ev.(type) {
306+
case networkdb.CreateEvent:
307+
id = event.NetworkID
308+
value = string(event.Value)
309+
isAdd = true
310+
case networkdb.DeleteEvent:
311+
id = event.NetworkID
312+
value = string(event.Value)
313+
case networkdb.UpdateEvent:
314+
logrus.Errorf("Unexpected update service table event = %#v", event)
315+
}
316+
317+
nw, err := c.NetworkByID(id)
318+
if err != nil {
319+
logrus.Errorf("Could not find network %s while handling service table event: %v", id, err)
320+
return
321+
}
322+
n := nw.(*network)
323+
324+
pair := strings.Split(value, "=")
325+
if len(pair) < 2 {
326+
logrus.Errorf("Incorrect service table value = %s", value)
327+
return
328+
}
329+
330+
name := pair[0]
331+
ip := net.ParseIP(pair[1])
332+
333+
if name == "" || ip == nil {
334+
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
335+
return
336+
}
337+
338+
if isAdd {
339+
n.addSvcRecords(name, ip, nil, true)
340+
} else {
341+
n.deleteSvcRecords(name, ip, nil, true)
342+
}
343+
}

config/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@ type Config struct {
2222
// DaemonCfg represents libnetwork core configuration
2323
type DaemonCfg struct {
2424
Debug bool
25+
IsAgent bool
2526
DataDir string
2627
DefaultNetwork string
2728
DefaultDriver string
29+
Bind string
30+
Neighbors []string
2831
Labels []string
2932
DriverCfg map[string]interface{}
3033
}
@@ -81,6 +84,27 @@ func ParseConfigOptions(cfgOptions ...Option) *Config {
8184
// to the controller
8285
type Option func(c *Config)
8386

87+
// OptionBind function returns an option setter for setting a bind interface or address
88+
func OptionBind(bind string) Option {
89+
return func(c *Config) {
90+
c.Daemon.Bind = bind
91+
}
92+
}
93+
94+
// OptionAgent function returns an option setter for setting agent mode
95+
func OptionAgent() Option {
96+
return func(c *Config) {
97+
c.Daemon.IsAgent = true
98+
}
99+
}
100+
101+
// OptionNeighbors function returns an option setter for setting a list of neighbors to join.
102+
func OptionNeighbors(neighbors []string) Option {
103+
return func(c *Config) {
104+
c.Daemon.Neighbors = neighbors
105+
}
106+
}
107+
84108
// OptionDefaultNetwork function returns an option setter for a default network
85109
func OptionDefaultNetwork(dn string) Option {
86110
return func(c *Config) {

0 commit comments

Comments
 (0)