Skip to content

Commit 488164e

Browse files
committed
[executor] Forward DeviceEvents from OCC to core
1 parent dec1341 commit 488164e

3 files changed

Lines changed: 706 additions & 75 deletions

File tree

executor/executor.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"time"
4545

4646
"github.com/AliceO2Group/Control/common"
47+
"github.com/AliceO2Group/Control/common/event"
4748
"github.com/AliceO2Group/Control/common/logger"
4849
"github.com/AliceO2Group/Control/executor/executorcmd"
4950
"github.com/AliceO2Group/Control/executor/protos"
@@ -491,10 +492,10 @@ func launch(state *internalState, task mesos.TaskInfo) {
491492
} else {
492493
log.WithField("state", response.GetState()).WithField("task", task.Name).Debug("task status queried")
493494
}
495+
// NOTE: we acquire the transitioner-dependent STANDBY equivalent state
494496
reachedState := state.rpcClients[task.TaskID].FromDeviceState(response.GetState())
495497
state.mu.RUnlock()
496498

497-
// FIXME: we should compare the reached state against transitioner-dependent IDLE instead of plain IDLE
498499
if reachedState == "STANDBY" && err == nil {
499500
log.WithField("id", task.TaskID.Value).
500501
WithField("task", task.Name).
@@ -519,6 +520,23 @@ func launch(state *internalState, task mesos.TaskInfo) {
519520
}
520521
}
521522

523+
// Set up event stream from task
524+
state.mu.RLock()
525+
esc, err := state.rpcClients[task.TaskID].EventStream(context.TODO(), &pb.EventStreamRequest{}, grpc.EmptyCallOption{})
526+
state.mu.RUnlock()
527+
if err != nil {
528+
log.WithField("task", task.Name).WithError(err).Error("cannot set up event stream from task")
529+
status.State = mesos.TASK_FAILED.Enum()
530+
status.Message = protoString(err.Error())
531+
532+
state.mu.Lock()
533+
state.failedTasks[task.TaskID] = status
534+
state.rpcClients[task.TaskID].Close()
535+
delete(state.rpcClients, task.TaskID)
536+
state.mu.Unlock()
537+
return
538+
}
539+
522540
log.WithField("task", task.Name).Debug("notifying of task running state")
523541
status.State = mesos.TASK_RUNNING.Enum()
524542

@@ -561,6 +579,44 @@ func launch(state *internalState, task mesos.TaskInfo) {
561579
}
562580
state.mu.Unlock()
563581

582+
// Process events from task
583+
go func() {
584+
deo := event.DeviceEventOrigin{
585+
AgentId: task.AgentID,
586+
ExecutorId: task.GetExecutor().ExecutorID,
587+
TaskId: task.TaskID,
588+
}
589+
for {
590+
esr, err := esc.Recv()
591+
if err == io.EOF {
592+
log.WithError(err).Warning("event stream EOF")
593+
break
594+
}
595+
if err != nil {
596+
log.WithError(err).Warning("error receiving event from task")
597+
continue
598+
}
599+
ev := esr.GetEvent()
600+
601+
deviceEvent := event.NewDeviceEvent(deo, ev.GetType())
602+
603+
jsonEvent, err := json.Marshal(deviceEvent)
604+
if err != nil {
605+
log.WithError(err).Warning("error marshaling event from task")
606+
continue
607+
}
608+
609+
state.mu.RLock()
610+
state.cli.Send(context.TODO(), calls.NonStreaming(calls.Message(jsonEvent)))
611+
log.WithFields(logrus.Fields{
612+
"task": task.TaskID.Value,
613+
"event": string(jsonEvent),
614+
}).
615+
Debug("event sent")
616+
state.mu.RUnlock()
617+
}
618+
}()
619+
564620
err = taskCmd.Wait()
565621

566622
state.mu.Lock()

executor/executorcmd/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (r *RpcClient) doTransition(ei transitioner.EventInfo) (newState string, er
9898
log.WithField("event", ei.Evt).Debug("executor<->occplugin interface requesting transition")
9999
var response *pb.TransitionReply
100100
response, err = r.Transition(context.TODO(), &pb.TransitionRequest{
101-
Event: ei.Evt,
101+
TransitionEvent: ei.Evt,
102102
Arguments: func() (cfg []*pb.ConfigEntry) {
103103
cfg = make([]*pb.ConfigEntry, 0)
104104
if len(ei.Args) == 0 {
@@ -140,7 +140,7 @@ func (r *RpcClient) doTransition(ei transitioner.EventInfo) (newState string, er
140140
if response != nil &&
141141
response.GetOk() &&
142142
response.GetTrigger() == pb.StateChangeTrigger_EXECUTOR &&
143-
response.GetEvent() == ei.Evt &&
143+
response.GetTransitionEvent() == ei.Evt &&
144144
response.GetState() == ei.Dst {
145145
newState = response.GetState()
146146
err = nil
@@ -149,7 +149,7 @@ func (r *RpcClient) doTransition(ei transitioner.EventInfo) (newState string, er
149149
err = errors.New(fmt.Sprintf("transition unsuccessful: ok: %s, trigger: %s, event: %s, state: %s",
150150
strconv.FormatBool(response.GetOk()),
151151
response.GetTrigger().String(),
152-
response.GetEvent(),
152+
response.GetTransitionEvent(),
153153
response.GetState()))
154154
} else {
155155
newState = ""

0 commit comments

Comments
 (0)