Skip to content

Commit bd4314e

Browse files
committed
[core] Implementation of vastly expanded control API
1 parent 58491c0 commit bd4314e

2 files changed

Lines changed: 252 additions & 21 deletions

File tree

core/server.go

Lines changed: 96 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"runtime"
3131
"time"
3232

33+
"github.com/AliceO2Group/Control/core/task/channel"
3334
"golang.org/x/net/context"
3435
"google.golang.org/grpc"
3536

@@ -90,11 +91,13 @@ func (m *RpcServer) GetFrameworkInfo(context.Context, *pb.GetFrameworkInfoReques
9091
m.logMethod()
9192
m.state.RLock()
9293
defer m.state.RUnlock()
94+
9395
r := &pb.GetFrameworkInfoReply{
9496
FrameworkId: store.GetIgnoreErrors(m.fidStore)(),
9597
EnvironmentsCount: int32(len(m.state.environments.Ids())),
9698
TasksCount: int32(m.state.taskman.TaskCount()),
9799
State: m.state.sm.Current(),
100+
HostsCount: int32(m.state.taskman.AgentCache.Count()),
98101
}
99102
return r, nil
100103
}
@@ -124,16 +127,14 @@ func (m *RpcServer) GetEnvironments(context.Context, *pb.GetEnvironmentsRequest)
124127
continue
125128
}
126129
tasks := env.Workflow().GetTasks()
127-
taskNames := make([]string, len(tasks))
128-
for i, task := range tasks {
129-
taskNames[i] = task.GetName()
130-
}
131130
e := &pb.EnvironmentInfo{
132131
Id: env.Id().String(),
133132
CreatedWhen: env.CreatedWhen().Format(time.RFC3339),
134133
State: env.CurrentState(),
135-
Tasks: taskNames,
134+
Tasks: tasksToShortTaskInfos(tasks),
135+
RootRole: env.Workflow().GetName(),
136136
}
137+
137138
r.Environments = append(r.Environments, e)
138139
}
139140
return r, nil
@@ -164,7 +165,7 @@ func (m *RpcServer) NewEnvironment(cxt context.Context, request *pb.NewEnvironme
164165
}
165166

166167
// Create new Environment instance with some roles, we get back a UUID
167-
id, err := m.state.environments.CreateEnvironment(request.GetWorkflow())
168+
id, err := m.state.environments.CreateEnvironment(request.GetWorkflowTemplate())
168169
if err != nil {
169170
return nil, status.Newf(codes.Internal, "cannot create new environment: %s", err.Error()).Err()
170171
}
@@ -174,9 +175,15 @@ func (m *RpcServer) NewEnvironment(cxt context.Context, request *pb.NewEnvironme
174175
return nil, status.Newf(codes.Internal, "cannot get newly created environment: %s", err.Error()).Err()
175176
}
176177

178+
tasks := newEnv.Workflow().GetTasks()
177179
r := &pb.NewEnvironmentReply{
178-
Id: id.String(),
179-
State: newEnv.Sm.Current(),
180+
Environment: &pb.EnvironmentInfo{
181+
Id: newEnv.Id().String(),
182+
CreatedWhen: newEnv.CreatedWhen().Format(time.RFC3339),
183+
State: newEnv.CurrentState(),
184+
Tasks: tasksToShortTaskInfos(tasks),
185+
RootRole: newEnv.Workflow().GetName(),
186+
},
180187
}
181188

182189
return r, nil
@@ -197,17 +204,15 @@ func (m *RpcServer) GetEnvironment(cxt context.Context, req *pb.GetEnvironmentRe
197204
}
198205

199206
tasks := env.Workflow().GetTasks()
200-
taskNames := make([]string, len(tasks))
201-
for i, task := range tasks {
202-
taskNames[i] = task.GetName()
203-
}
204207
r := &pb.GetEnvironmentReply{
205208
Environment: &pb.EnvironmentInfo{
206209
Id: env.Id().String(),
207210
CreatedWhen: env.CreatedWhen().Format(time.RFC3339),
208211
State: env.CurrentState(),
209-
Tasks: taskNames,
212+
Tasks: tasksToShortTaskInfos(tasks),
213+
RootRole: env.Workflow().GetName(),
210214
},
215+
Workflow: workflowToRoleTree(env.Workflow()),
211216
}
212217
return r, nil
213218
}
@@ -291,17 +296,87 @@ func (m *RpcServer) GetTasks(context.Context, *pb.GetTasksRequest) (*pb.GetTasks
291296
m.state.RLock()
292297
defer m.state.RUnlock()
293298

299+
tasks := m.state.taskman.GetTasks()
294300
r := &pb.GetTasksReply{
295-
Tasks: make([]*pb.TaskInfo, 0, 0),
301+
Tasks: tasksToShortTaskInfos(tasks),
296302
}
297303

298-
for _, task := range m.state.taskman.GetTasks() {
299-
ri := &pb.TaskInfo{
300-
Locked: task.IsLocked(),
301-
Hostname: task.GetHostname(),
302-
Name: task.GetName(),
304+
return r, nil
305+
}
306+
307+
func (m *RpcServer) GetTask(cxt context.Context, req *pb.GetTaskRequest) (*pb.GetTaskReply, error) {
308+
m.logMethod()
309+
m.state.RLock()
310+
defer m.state.RUnlock()
311+
312+
task := m.state.taskman.GetTask(req.TaskId)
313+
if task == nil {
314+
return &pb.GetTaskReply{}, status.New(codes.NotFound, "task not found").Err()
315+
}
316+
taskClass := task.GetTaskClass()
317+
commandInfo := task.BuildTaskCommand()
318+
var outbound []channel.Outbound
319+
taskPath := ""
320+
// TODO: probably not the nicest way to do this... the outbound assignments should be cached
321+
// in the Task
322+
if task.IsLocked() {
323+
type parentRole interface {
324+
CollectOutboundChannels() []channel.Outbound
325+
GetPath() string
326+
}
327+
parent, ok := task.GetParentRole().(parentRole)
328+
if ok {
329+
outbound = parent.CollectOutboundChannels()
330+
taskPath = parent.GetPath()
303331
}
304-
r.Tasks = append(r.Tasks, ri)
305332
}
306-
return r, nil
333+
334+
rep := &pb.GetTaskReply{
335+
Task: &pb.TaskInfo{
336+
ShortInfo: taskToShortTaskInfo(task),
337+
ClassInfo: &pb.TaskClassInfo{
338+
Name: task.GetClassName(),
339+
ControlMode: taskClass.Control.Mode.String(),
340+
},
341+
InboundChannels: inboundChannelsToPbChannels(taskClass.Bind),
342+
OutboundChannels: outboundChannelsToPbChannels(outbound),
343+
CommandInfo: commandInfoToPbCommandInfo(commandInfo),
344+
TaskPath: taskPath,
345+
EnvId: task.GetEnvironmentId().String(),
346+
},
347+
}
348+
return rep, nil
307349
}
350+
351+
func (m *RpcServer) CleanupTasks(context.Context, *pb.CleanupTasksRequest) (*pb.CleanupTasksReply, error) {
352+
log.WithPrefix("rpcserver").
353+
WithField("method", "CleanupTasks").
354+
Debug("implement me")
355+
356+
return &pb.CleanupTasksReply{}, status.New(codes.Unimplemented, "not implemented").Err()
357+
}
358+
359+
360+
func (m *RpcServer) GetRoles(cxt context.Context, req *pb.GetRolesRequest) (*pb.GetRolesReply, error) {
361+
m.logMethod()
362+
m.state.RLock()
363+
defer m.state.RUnlock()
364+
365+
if req == nil || len(req.EnvId) == 0 {
366+
return nil, status.New(codes.InvalidArgument, "received nil request").Err()
367+
}
368+
369+
env, err := m.state.environments.Environment(uuid.Parse(req.EnvId))
370+
if err != nil {
371+
return nil, status.Newf(codes.NotFound, "environment not found: %s", err.Error()).Err()
372+
}
373+
374+
resultRoles := env.QueryRoles(req.PathSpec)
375+
376+
roleInfos := make([]*pb.RoleInfo, len(resultRoles))
377+
for i, rr := range resultRoles {
378+
roleInfos[i] = workflowToRoleTree(rr)
379+
}
380+
return &pb.GetRolesReply{Roles: roleInfos}, nil
381+
}
382+

core/serverutil.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2018 CERN and copyright holders of ALICE O².
5+
* Author: Teo Mrnjavac <teo.mrnjavac@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
25+
package core
26+
27+
import (
28+
"github.com/AliceO2Group/Control/common"
29+
"github.com/AliceO2Group/Control/core/protos"
30+
"github.com/AliceO2Group/Control/core/task/channel"
31+
32+
"github.com/AliceO2Group/Control/core/task"
33+
"github.com/AliceO2Group/Control/core/workflow"
34+
)
35+
36+
func commandInfoToPbCommandInfo(c *common.TaskCommandInfo) (pci *pb.CommandInfo) {
37+
if c == nil {
38+
return
39+
}
40+
pci = &pb.CommandInfo{
41+
Env: c.GetEnv(),
42+
Shell: c.GetShell(),
43+
Value: c.GetValue(),
44+
Arguments: c.GetArguments(),
45+
User: c.GetUser(),
46+
}
47+
return
48+
}
49+
50+
func inboundChannelsToPbChannels(chs []channel.Inbound) (pchs []*pb.ChannelInfo) {
51+
if chs == nil {
52+
return
53+
}
54+
pchs = make([]*pb.ChannelInfo, len(chs))
55+
for i, c := range chs {
56+
pchs[i] = inboundChannelToPbChannel(c)
57+
}
58+
return
59+
}
60+
61+
func inboundChannelToPbChannel(ch channel.Inbound) (pch *pb.ChannelInfo) {
62+
pch = &pb.ChannelInfo{
63+
Name: ch.Name,
64+
Type: ch.Type.String(),
65+
}
66+
return
67+
}
68+
69+
func outboundChannelsToPbChannels(chs []channel.Outbound) (pchs []*pb.ChannelInfo) {
70+
if chs == nil {
71+
return
72+
}
73+
pchs = make([]*pb.ChannelInfo, len(chs))
74+
for i, c := range chs {
75+
pchs[i] = outboundChannelToPbChannel(c)
76+
}
77+
return
78+
}
79+
80+
func outboundChannelToPbChannel(ch channel.Outbound) (pch *pb.ChannelInfo) {
81+
pch = &pb.ChannelInfo{
82+
Name: ch.Name,
83+
Type: ch.Type.String(),
84+
Target: ch.Target,
85+
}
86+
return
87+
}
88+
89+
func taskToShortTaskInfo(t *task.Task) (sti *pb.ShortTaskInfo) {
90+
if t == nil {
91+
return
92+
}
93+
sti = &pb.ShortTaskInfo{
94+
Name: t.GetName(),
95+
Locked: t.IsLocked(),
96+
TaskId: t.GetTaskId(),
97+
Status: "UNKNOWN",
98+
State: "UNKNOWN",
99+
ClassName: t.GetClassName(),
100+
DeploymentInfo: &pb.TaskDeploymentInfo{
101+
Hostname: t.GetHostname(),
102+
AgentId: t.GetAgentId(),
103+
OfferId: t.GetOfferId(),
104+
ExecutorId: t.GetExecutorId(),
105+
},
106+
}
107+
parentRole, ok := t.GetParentRole().(workflow.Role)
108+
if ok && parentRole != nil {
109+
sti.Status = parentRole.GetStatus().String()
110+
sti.State = parentRole.GetState().String()
111+
}
112+
return
113+
}
114+
115+
func tasksToShortTaskInfos(tasks []*task.Task) (stis []*pb.ShortTaskInfo) {
116+
if tasks == nil {
117+
return
118+
}
119+
stis = make([]*pb.ShortTaskInfo, len(tasks))
120+
for i, t := range tasks {
121+
shortTaskInfo := taskToShortTaskInfo(t)
122+
stis[i] = shortTaskInfo
123+
}
124+
return
125+
}
126+
127+
func tasksToTaskIds(tasks []*task.Task) (ss []string) {
128+
if tasks == nil {
129+
return
130+
}
131+
ss = make([]string, len(tasks))
132+
for i, t := range tasks {
133+
ss[i] = t.GetTaskId()
134+
}
135+
return
136+
}
137+
138+
func workflowToRoleTree(root workflow.Role) (ri *pb.RoleInfo) {
139+
if root == nil {
140+
return
141+
}
142+
childRoles := root.GetRoles()
143+
childRoleInfos := make([]*pb.RoleInfo, len(childRoles))
144+
for i, cr := range childRoles {
145+
childRoleInfos[i] = workflowToRoleTree(cr)
146+
}
147+
ri = &pb.RoleInfo{
148+
Name: root.GetName(),
149+
Status: root.GetStatus().String(),
150+
State: root.GetState().String(),
151+
FullPath: root.GetPath(),
152+
TaskIds: tasksToTaskIds(root.GetTasks()),
153+
Roles: childRoleInfos,
154+
}
155+
return
156+
}

0 commit comments

Comments
 (0)