Skip to content

Commit 0ed4a0a

Browse files
committed
[core] task.Manager finer-grained locking and task.State propagation
1 parent 9015006 commit 0ed4a0a

1 file changed

Lines changed: 34 additions & 5 deletions

File tree

core/task/manager.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,10 @@ func (m*Manager) NewTaskForMesosOffer(offer *mesos.Offer, descriptor *Descriptor
8787
offerId: offer.ID.Value,
8888
taskId: newId,
8989
executorId: executorId.Value,
90-
getTaskClass: nil,
90+
GetTaskClass: nil,
9191
bindPorts: nil,
9292
}
93-
t.getTaskClass = func() *TaskClass {
93+
t.GetTaskClass = func() *TaskClass {
9494
return m.GetTaskClass(t.className)
9595
}
9696
t.bindPorts = make(map[string]uint64)
@@ -286,7 +286,7 @@ func (m *Manager) AcquireTasks(envId uuid.Array, taskDescriptors Descriptors) (e
286286
func (m *Manager) TeardownTasks(roleNames []string) error {
287287
m.mu.Lock()
288288
defer m.mu.Unlock()
289-
//TODO: implement
289+
//FIXME: implement
290290

291291
return nil
292292
}
@@ -325,11 +325,11 @@ func (m *Manager) releaseTask(envId uuid.Array, task *Task) error {
325325

326326
func (m *Manager) ConfigureTasks(envId uuid.Array, tasks Tasks) error {
327327
m.mu.Lock()
328-
defer m.mu.Unlock()
329328

330329
notify := make(chan controlcommands.MesosCommandResponse)
331330
receivers, err := tasks.GetMesosCommandTargets()
332331
if err != nil {
332+
m.mu.Unlock()
333333
return err
334334
}
335335

@@ -350,9 +350,11 @@ func (m *Manager) ConfigureTasks(envId uuid.Array, tasks Tasks) error {
350350
args := make(controlcommands.PropertyMapsMap)
351351
args, err = tasks.BuildPropertyMaps(bindMap)
352352
if err != nil {
353+
m.mu.Unlock()
353354
return err
354355
}
355356
log.WithField("map", pp.Sprint(args)).Debug("pushing configuration to tasks")
357+
m.mu.Unlock()
356358

357359
cmd := controlcommands.NewMesosCommand_Transition(receivers, src, event, dest, args)
358360
m.cq.Enqueue(cmd, notify)
@@ -424,7 +426,34 @@ func (m *Manager) GetTasks() Tasks {
424426
return m.roster
425427
}
426428

427-
func (m *Manager) UpdateTask(status *mesos.TaskStatus) {
429+
func (m *Manager) GetTask(id string) *Task {
430+
if m == nil {
431+
return nil
432+
}
433+
for _, t := range m.roster {
434+
if t.taskId == id {
435+
return t
436+
}
437+
}
438+
return nil
439+
}
440+
441+
func (m *Manager) UpdateTaskState(taskId string, state string) {
442+
m.mu.Lock()
443+
defer m.mu.Unlock()
444+
445+
taskPtr := m.roster.GetByTaskId(taskId)
446+
if taskPtr == nil {
447+
log.WithField("taskId", taskId).
448+
Warn("attempted state update of task not in roster")
449+
return
450+
}
451+
452+
st := StateFromString(state)
453+
taskPtr.parent.UpdateState(st)
454+
}
455+
456+
func (m *Manager) UpdateTaskStatus(status *mesos.TaskStatus) {
428457
m.mu.Lock()
429458
defer m.mu.Unlock()
430459

0 commit comments

Comments
 (0)