Skip to content

Commit fb0bc6b

Browse files
committed
[core] Initial plumbing for task cleanup support
1 parent 14ae6b8 commit fb0bc6b

6 files changed

Lines changed: 622 additions & 332 deletions

File tree

coconut/protos/o2control.pb.go

Lines changed: 267 additions & 151 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/protos/o2control.pb.go

Lines changed: 267 additions & 151 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/protos/o2control.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,10 @@ message TaskInfo {
227227
message CleanupTasksRequest {
228228
repeated string taskIds = 1;
229229
}
230-
message CleanupTasksReply {}
230+
message CleanupTasksReply {
231+
repeated ShortTaskInfo killedTasks = 1;
232+
repeated ShortTaskInfo runningTasks = 2;
233+
}
231234

232235
////////////////////////////////////////
233236
// Roles

core/server.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"runtime"
3131
"time"
3232

33+
"github.com/AliceO2Group/Control/core/task"
3334
"github.com/AliceO2Group/Control/core/task/channel"
3435
"golang.org/x/net/context"
3536
"google.golang.org/grpc"
@@ -348,12 +349,29 @@ func (m *RpcServer) GetTask(cxt context.Context, req *pb.GetTaskRequest) (*pb.Ge
348349
return rep, nil
349350
}
350351

351-
func (m *RpcServer) CleanupTasks(context.Context, *pb.CleanupTasksRequest) (*pb.CleanupTasksReply, error) {
352-
log.WithPrefix("rpcserver").
353-
WithField("method", "CleanupTasks").
354-
Debug("implement me")
352+
func (m *RpcServer) CleanupTasks(cxt context.Context, req *pb.CleanupTasksRequest) (*pb.CleanupTasksReply, error) {
353+
m.logMethod()
354+
m.state.Lock()
355+
defer m.state.Unlock()
356+
idsToKill := req.GetTaskIds()
357+
var(
358+
killedTasks, runningTasks task.Tasks
359+
err error
360+
)
361+
if len(idsToKill) == 0 { // by default we try to kill all, best effort
362+
killedTasks, runningTasks, err = m.state.taskman.Cleanup()
363+
} else {
364+
killedTasks, runningTasks, err = m.state.taskman.KillTasks(idsToKill)
365+
}
366+
367+
if err != nil {
368+
log.WithError(err).Error("task cleanup error")
369+
}
370+
killed := tasksToShortTaskInfos(killedTasks)
371+
running := tasksToShortTaskInfos(runningTasks)
355372

356-
return &pb.CleanupTasksReply{}, status.New(codes.Unimplemented, "not implemented").Err()
373+
// FIXME: implement doKillTasks in task.Manager, then remove codes.Unimplemented
374+
return &pb.CleanupTasksReply{KilledTasks: killed, RunningTasks: running}, status.New(codes.Unimplemented, "not implemented").Err()
357375
}
358376

359377

core/task/errors.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,65 +32,65 @@ import (
3232

3333
type TaskError interface {
3434
error
35-
GetTaskName() string
35+
GetTaskId() string
3636
}
3737

3838
type TasksError interface {
3939
error
40-
GetTaskNames() []string
40+
GetTaskIds() []string
4141
}
4242

4343
type taskErrorBase struct {
44-
taskName string
44+
taskId string
4545
}
46-
func (r taskErrorBase) GetTaskName() string {
47-
return r.taskName
46+
func (r taskErrorBase) GetTaskId() string {
47+
return r.taskId
4848
}
4949

5050
type tasksErrorBase struct {
51-
taskNames []string
51+
taskIds []string
5252
}
53-
func (r tasksErrorBase) GetTaskNames() []string {
54-
return r.taskNames
53+
func (r tasksErrorBase) GetTaskIds() []string {
54+
return r.taskIds
5555
}
5656

5757
type GenericTaskError struct {
5858
taskErrorBase
5959
message string
6060
}
6161
func (r GenericTaskError) Error() string {
62-
return fmt.Sprintf("task %s error: %s", r.taskName, r.message)
62+
return fmt.Sprintf("task %s error: %s", r.taskId, r.message)
6363
}
6464

6565
type GenericTasksError struct {
6666
tasksErrorBase
6767
message string
6868
}
6969
func (r GenericTasksError) Error() string {
70-
return fmt.Sprintf("tasks [%s] error: %s", strings.Join(r.taskNames, ", "), r.message)
70+
return fmt.Sprintf("tasks [%s] error: %s", strings.Join(r.taskIds, ", "), r.message)
7171
}
7272

7373
type TasksDeploymentError tasksErrorBase
7474
func (r TasksDeploymentError) Error() string {
75-
return fmt.Sprintf("deployment failed for tasks [%s]", r.taskNames)
75+
return fmt.Sprintf("deployment failed for tasks [%s]", r.taskIds)
7676
}
7777

7878
type TaskAlreadyReleasedError taskErrorBase
7979
func (r TaskAlreadyReleasedError) Error() string {
80-
return fmt.Sprintf("task %s already released", r.taskName)
80+
return fmt.Sprintf("task %s already released", r.taskId)
8181
}
8282

8383
type TaskNotFoundError taskErrorBase
8484
func (r TaskNotFoundError) Error() string {
85-
return fmt.Sprintf("task %s not found", r.taskName)
85+
return fmt.Sprintf("task %s not found", r.taskId)
8686
}
8787

8888
type TaskLockedError struct {
8989
taskErrorBase
9090
envId uuid.Array
9191
}
9292
func (r TaskLockedError) Error() string {
93-
return fmt.Sprintf("task %s is locked by environment %s", r.taskName, r.envId)
93+
return fmt.Sprintf("task %s is locked by environment %s", r.taskId, r.envId)
9494
}
9595
func (r TaskLockedError) EnvironmentId() uuid.Array {
9696
return r.envId

core/task/manager.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ func (m *Manager) AcquireTasks(envId uuid.Array, taskDescriptors Descriptors) (e
265265
taskPtr.parent = nil
266266
deployedTaskNames = append(deployedTaskNames, taskPtr.name)
267267
}
268-
err = TasksDeploymentError{taskNames: deployedTaskNames}
268+
err = TasksDeploymentError{taskIds: deployedTaskNames}
269269
}
270270

271271
// Finally, we write to the roster. Point of no return!
@@ -283,14 +283,6 @@ func (m *Manager) AcquireTasks(envId uuid.Array, taskDescriptors Descriptors) (e
283283
return
284284
}
285285

286-
func (m *Manager) TeardownTasks(roleNames []string) error {
287-
m.mu.Lock()
288-
defer m.mu.Unlock()
289-
//FIXME: implement
290-
291-
return nil
292-
}
293-
294286
func (m *Manager) ReleaseTasks(envId uuid.Array, tasks Tasks) error {
295287
m.mu.Lock()
296288
defer m.mu.Unlock()
@@ -315,7 +307,7 @@ func (m *Manager) releaseTask(envId uuid.Array, task *Task) error {
315307
return TaskNotFoundError{}
316308
}
317309
if task.IsLocked() && task.GetEnvironmentId() != envId {
318-
return TaskLockedError{taskErrorBase: taskErrorBase{taskName: task.name}, envId: envId}
310+
return TaskLockedError{taskErrorBase: taskErrorBase{taskId: task.name}, envId: envId}
319311
}
320312

321313
task.parent = nil
@@ -477,4 +469,49 @@ func (m *Manager) UpdateTaskStatus(status *mesos.TaskStatus) {
477469
}
478470
}
479471

472+
func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
473+
m.mu.Lock()
474+
defer m.mu.Unlock()
475+
476+
toKill := m.roster.Filtered(func(t *Task) bool {
477+
return !t.IsLocked()
478+
})
479+
480+
killed, running, err = m.doKillTasks(toKill)
481+
return
482+
}
483+
484+
func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
485+
m.mu.Lock()
486+
defer m.mu.Unlock()
487+
488+
toKill := m.roster.Filtered(func(t *Task) bool {
489+
if t.IsLocked() {
490+
return false
491+
}
492+
for _, id := range taskIds {
493+
if t.taskId == id {
494+
return true
495+
}
496+
}
497+
return false
498+
})
499+
500+
if len(toKill) < len(taskIds) {
501+
err = TaskNotFoundError{}
502+
return
503+
}
504+
505+
killed, running, err = m.doKillTasks(toKill)
506+
return
507+
}
508+
509+
func (m *Manager) doKillTasks(tasks Tasks) (killed Tasks, running Tasks, err error) {
510+
// FIXME: implement
511+
// transition to done
512+
// if it fails, kill them all
513+
log.Error("no tasks killed: not implemented yet")
514+
return make(Tasks, 0), m.roster, nil
515+
}
516+
480517

0 commit comments

Comments
 (0)