Skip to content

Commit da88f7e

Browse files
committed
[core] Use confsys.Service and/or configuration.ROSource
1 parent 4716fff commit da88f7e

5 files changed

Lines changed: 23 additions & 21 deletions

File tree

core/environment/manager.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,27 @@ package environment
2727
import (
2828
"errors"
2929
"fmt"
30+
"strings"
3031
"sync"
31-
"github.com/pborman/uuid"
32+
33+
"github.com/AliceO2Group/Control/core/confsys"
3234
"github.com/AliceO2Group/Control/core/task"
3335
"github.com/AliceO2Group/Control/core/workflow"
34-
"github.com/AliceO2Group/Control/configuration"
35-
"strings"
36+
"github.com/pborman/uuid"
3637
)
3738

3839
type Manager struct {
3940
mu sync.RWMutex
4041
m map[uuid.Array]*Environment
4142
taskman *task.Manager
42-
cfg configuration.Source
43+
confSvc *confsys.Service
4344
}
4445

45-
func NewEnvManager(tm *task.Manager, cfg configuration.Source) *Manager {
46+
func NewEnvManager(tm *task.Manager, confSvc *confsys.Service) *Manager {
4647
return &Manager{
4748
m: make(map[uuid.Array]*Environment),
4849
taskman: tm,
49-
cfg: cfg,
50+
confSvc: confSvc,
5051
}
5152
}
5253

@@ -137,5 +138,5 @@ func (envs *Manager) loadWorkflow(workflowPath string, parent workflow.Updatable
137138
if strings.Contains(workflowPath, "://") {
138139
return nil, errors.New("workflow loading from file not implemented yet")
139140
}
140-
return workflow.Load(envs.cfg, workflowPath, parent)
141+
return workflow.Load(envs.confSvc.GetROSource(), workflowPath, parent)
141142
}

core/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ func (m *RpcServer) GetWorkflowTemplates(cxt context.Context, req *pb.GetWorkflo
462462
m.state.RLock()
463463
defer m.state.RUnlock()
464464

465-
wfTree, err := m.state.cfgman.GetRecursive("o2/control/workflows")
465+
wfTree, err := m.state.confSvc.GetROSource().GetRecursive("o2/control/workflows")
466466
if err != nil {
467467
return nil, status.New(codes.FailedPrecondition, "cannot query available workflows").Err()
468468
}

core/state.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ import (
3232
"time"
3333
"sync"
3434

35+
"github.com/AliceO2Group/Control/core/confsys"
3536
"github.com/mesos/mesos-go/api/v1/lib"
3637
"github.com/mesos/mesos-go/api/v1/lib/backoff"
3738
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
3839
"github.com/looplab/fsm"
3940
"github.com/AliceO2Group/Control/core/environment"
40-
"github.com/AliceO2Group/Control/configuration"
4141
"encoding/json"
4242
"github.com/AliceO2Group/Control/core/controlcommands"
4343
"context"
@@ -61,9 +61,12 @@ func newInternalState(cfg Config, shutdown func()) (*internalState, error) {
6161
return nil, err
6262
}
6363

64-
cfgman, err := configuration.NewSource(cfg.configurationUri)
64+
confSvc, err := confsys.NewService(cfg.configurationUri)
65+
if err != nil {
66+
return nil, err
67+
}
6568
if cfg.veryVerbose {
66-
cfgDump, err := cfgman.GetRecursive("o2/control")
69+
cfgDump, err := confSvc.GetROSource().GetRecursive("o2/control")
6770
if err != nil {
6871
log.WithError(err).Fatal("cannot retrieve configuration")
6972
return nil, err
@@ -75,9 +78,6 @@ func newInternalState(cfg Config, shutdown func()) (*internalState, error) {
7578
}
7679
log.WithField("data", string(cfgBytes)).Debug("configuration dump")
7780
}
78-
if err != nil {
79-
return nil, err
80-
}
8181

8282
resourceOffersDone := make(chan task.DeploymentMap)
8383
tasksToDeploy := make(chan task.Descriptors)
@@ -96,7 +96,7 @@ func newInternalState(cfg Config, shutdown func()) (*internalState, error) {
9696
random: rand.New(rand.NewSource(time.Now().Unix())),
9797
shutdown: shutdown,
9898
environments: nil,
99-
cfgman: cfgman,
99+
confSvc: confSvc,
100100
}
101101

102102
state.servent = controlcommands.NewServent(
@@ -106,14 +106,14 @@ func newInternalState(cfg Config, shutdown func()) (*internalState, error) {
106106
)
107107
state.commandqueue = controlcommands.NewCommandQueue(state.servent)
108108

109-
taskman := task.NewManager(cfgman, resourceOffersDone,
109+
taskman := task.NewManager(confSvc.GetROSource(), resourceOffersDone,
110110
tasksToDeploy, reviveOffersTrg, state.commandqueue)
111111
err = taskman.RefreshClasses()
112112
if err != nil {
113113
log.WithField("error", err).Warning("bad configuration, some task templates were not refreshed")
114114
}
115115
state.taskman = taskman
116-
state.environments = environment.NewEnvManager(state.taskman, state.cfgman)
116+
state.environments = environment.NewEnvManager(state.taskman, confSvc)
117117
state.commandqueue.Start() // FIXME: there should be 1 cq per env
118118

119119
return state, nil
@@ -149,8 +149,9 @@ type internalState struct {
149149
sm *fsm.FSM
150150
environments *environment.Manager
151151
taskman *task.Manager
152-
cfgman configuration.Source
153152
commandqueue *controlcommands.CommandQueue
154153
servent *controlcommands.Servent
154+
155+
confSvc *confsys.Service
155156
}
156157

core/task/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ type Manager struct {
4646
classes map[string]*TaskClass
4747
roster Tasks
4848

49-
cfgman configuration.Source
49+
cfgman configuration.ROSource
5050
resourceOffersDone <-chan DeploymentMap
5151
tasksToDeploy chan<- Descriptors
5252
reviveOffersTrg chan struct{}
5353
cq *controlcommands.CommandQueue
5454
}
5555

56-
func NewManager(cfgman configuration.Source,
56+
func NewManager(cfgman configuration.ROSource,
5757
resourceOffersDone <-chan DeploymentMap,
5858
tasksToDeploy chan<- Descriptors,
5959
reviveOffersTrg chan struct{},

core/workflow/load.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
)
3333

3434
// FIXME: workflowPath should be of type configuration.Path, not string
35-
func Load(cfg configuration.Source, workflowPath string, parent Updatable) (workflow Role, err error) {
35+
func Load(cfg configuration.ROSource, workflowPath string, parent Updatable) (workflow Role, err error) {
3636
completePath := fmt.Sprintf("%s/%s", ConfigBasePath, strings.Trim(workflowPath, "/"))
3737
var yamlDoc []byte
3838
yamlDoc, err = cfg.GetRecursiveYaml(completePath)

0 commit comments

Comments
 (0)