Skip to content

Commit 6bf15f9

Browse files
committed
[core] Handle incoming DeviceEvents
1 parent 488164e commit 6bf15f9

1 file changed

Lines changed: 87 additions & 26 deletions

File tree

core/scheduler.go

Lines changed: 87 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@ import (
3737
"strings"
3838
"time"
3939

40+
"github.com/AliceO2Group/Control/common/event"
4041
"github.com/AliceO2Group/Control/core/controlcommands"
42+
"github.com/AliceO2Group/Control/core/environment"
4143
"github.com/AliceO2Group/Control/core/task"
4244
"github.com/AliceO2Group/Control/core/task/constraint"
45+
"github.com/AliceO2Group/Control/executor/protos"
4346
"github.com/mesos/mesos-go/api/v1/lib"
4447
"github.com/mesos/mesos-go/api/v1/lib/backoff"
4548
xmetrics "github.com/mesos/mesos-go/api/v1/lib/extras/metrics"
@@ -237,45 +240,103 @@ func incomingMessageHandler(state *internalState, fidStore store.Singleton) even
237240

238241
data := mesosMessage.GetData()
239242

240-
var incoming struct {
241-
CommandName string `json:"name"`
243+
var incomingType struct {
244+
MessageType string `json:"_messageType"`
242245
}
243-
err = json.Unmarshal(data, &incoming)
246+
err = json.Unmarshal(data, &incomingType)
244247
if err != nil {
245248
return
246249
}
247250

248-
log.WithPrefix("scheduler").WithField("commandName", incoming.CommandName).Debug("processing incoming MESSAGE")
249-
switch incoming.CommandName {
250-
case "MesosCommand_Transition":
251-
var res controlcommands.MesosCommandResponse_Transition
252-
err = json.Unmarshal(data, &res)
251+
switch incomingType.MessageType {
252+
case "DeviceEvent":
253+
var incomingEvent struct {
254+
Type pb.DeviceEventType `json:"type"`
255+
Origin event.DeviceEventOrigin `json:"origin"`
256+
}
257+
err = json.Unmarshal(data, &incomingEvent)
253258
if err != nil {
254-
log.WithPrefix("scheduler").WithFields(logrus.Fields{
255-
"commandName": incoming.CommandName,
256-
"agentId": agentId.GetValue(),
257-
"executorId": executorId.GetValue(),
258-
"message": string(data[:]),
259-
"error": err.Error(),
259+
return
260+
}
261+
ev := event.NewDeviceEvent(incomingEvent.Origin, incomingEvent.Type)
262+
if ev != nil {
263+
handleDeviceEvent(state, ev)
264+
} else {
265+
log.WithFields(logrus.Fields{
266+
"type": incomingEvent.Type.String(),
267+
"originTask": incomingEvent.Origin.TaskId.Value,
260268
}).
261-
Error("cannot unmarshal incoming MESSAGE")
269+
Error("cannot handle incoming device event")
270+
}
271+
272+
case "MesosCommandResponse":
273+
var incomingCommand struct {
274+
CommandName string `json:"name"`
275+
}
276+
err = json.Unmarshal(data, &incomingCommand)
277+
if err != nil {
262278
return
263279
}
264-
sender := controlcommands.MesosCommandTarget{
265-
AgentId: agentId,
266-
ExecutorId: executorId,
267-
TaskId: mesos.TaskID{Value: res.TaskId},
280+
281+
log.WithPrefix("scheduler").WithField("commandName", incomingCommand.CommandName).Debug("processing incoming MESSAGE")
282+
switch incomingCommand.CommandName {
283+
case "MesosCommand_Transition":
284+
var res controlcommands.MesosCommandResponse_Transition
285+
err = json.Unmarshal(data, &res)
286+
if err != nil {
287+
log.WithPrefix("scheduler").WithFields(logrus.Fields{
288+
"commandName": incomingCommand.CommandName,
289+
"agentId": agentId.GetValue(),
290+
"executorId": executorId.GetValue(),
291+
"message": string(data[:]),
292+
"error": err.Error(),
293+
}).
294+
Error("cannot unmarshal incoming MESSAGE")
295+
return
296+
}
297+
sender := controlcommands.MesosCommandTarget{
298+
AgentId: agentId,
299+
ExecutorId: executorId,
300+
TaskId: mesos.TaskID{Value: res.TaskId},
301+
}
302+
303+
go func() {
304+
state.taskman.UpdateTaskState(res.TaskId, res.CurrentState)
305+
state.servent.ProcessResponse(&res, sender)
306+
}()
307+
return
308+
default:
309+
return errors.New(fmt.Sprintf("unrecognized response for controlcommand %s", incomingCommand.CommandName))
268310
}
311+
}
312+
return
313+
}
314+
}
315+
316+
func handleDeviceEvent(state *internalState, evt event.DeviceEvent) {
317+
if evt == nil {
318+
log.WithPrefix("scheduler").Error("cannot handle null DeviceEvent")
319+
return
320+
}
269321

270-
go func() {
271-
state.taskman.UpdateTaskState(res.TaskId, res.CurrentState)
272-
state.servent.ProcessResponse(&res, sender)
273-
}()
322+
switch evt.GetType() {
323+
case pb.DeviceEventType_END_OF_DATA:
324+
taskId := evt.GetOrigin().TaskId
325+
t := state.taskman.GetTask(taskId.Value)
326+
if t == nil {
327+
log.WithPrefix("scheduler").Error("cannot find task for DeviceEvent")
274328
return
275-
default:
276-
return errors.New(fmt.Sprintf("unrecognized response for controlcommand %s", incoming.CommandName))
277329
}
278-
return
330+
env, err := state.environments.Environment(t.GetEnvironmentId().UUID())
331+
if err != nil {
332+
log.WithPrefix("scheduler").WithError(err).Error("cannot find environment for DeviceEvent")
333+
}
334+
if env.CurrentState() == "RUNNING" {
335+
err = env.TryTransition(environment.NewStopActivityTransition(state.taskman))
336+
if err != nil {
337+
log.WithPrefix("scheduler").WithError(err).Error("cannot stop run after END_OF_DATA event")
338+
}
339+
}
279340
}
280341
}
281342

0 commit comments

Comments
 (0)