Skip to content

Commit 955ac46

Browse files
committed
[occ] Emit END_OF_DATA event from OccServer
1 parent 934211c commit 955ac46

4 files changed

Lines changed: 83 additions & 11 deletions

File tree

occ/occlib/OccServer.cxx

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,42 @@ OccServer::~OccServer()
5151
m_checkerThread.join();
5252
}
5353

54+
grpc::Status OccServer::EventStream(grpc::ServerContext* context,
55+
const occ_pb::EventStreamRequest* request,
56+
grpc::ServerWriter<occ_pb::EventStreamReply>* writer)
57+
{
58+
std::cout << "[request StateStream] handler BEGIN" << std::endl;
59+
60+
boost::uuids::basic_random_generator<boost::mt19937> gen;
61+
std::string id = boost::uuids::to_string(gen());
62+
63+
boost::lockfree::queue<pb::DeviceEvent*> eventQueue;
64+
m_eventQueues[id] = &eventQueue;
65+
DEFER({
66+
m_eventQueues.erase(id);
67+
});
68+
69+
bool isStreamOpen = true;
70+
while (!m_destroying && isStreamOpen) {
71+
pb::DeviceEvent *newEvent;
72+
bool ok = eventQueue.pop(newEvent);
73+
if (!ok) { // queue empty, sleep and retry
74+
std::this_thread::sleep_for(2ms);
75+
continue;
76+
}
77+
78+
pb::EventStreamReply response;
79+
if (newEvent) {
80+
response.mutable_event()->CopyFrom(*newEvent);
81+
isStreamOpen = writer->Write(response);
82+
delete newEvent;
83+
}
84+
}
85+
86+
std::cout << "[request StateStream] handler END" << std::endl;
87+
return ::grpc::Status::OK;
88+
}
89+
5490
grpc::Status OccServer::StateStream(grpc::ServerContext* context,
5591
const pb::StateStreamRequest* request,
5692
grpc::ServerWriter<pb::StateStreamReply>* writer)
@@ -132,9 +168,9 @@ grpc::Status OccServer::Transition(grpc::ServerContext* context,
132168
}
133169

134170
std::string srcStateStr = request->srcstate();
135-
std::string event = request->event();
171+
std::string event = request->transitionevent();
136172
auto arguments = request->arguments();
137-
const std::string finalState = EXPECTED_FINAL_STATE.at(request->event());
173+
const std::string finalState = EXPECTED_FINAL_STATE.at(request->transitionevent());
138174

139175
t_State currentState = m_rco->getState();
140176
std::string currentStateStr = getStringFromState(currentState);
@@ -161,7 +197,7 @@ grpc::Status OccServer::Transition(grpc::ServerContext* context,
161197
std::string newStateStr = getStringFromState(newState);
162198

163199
response->set_state(newStateStr);
164-
response->set_event(request->event());
200+
response->set_transitionevent(request->transitionevent());
165201
response->set_ok(newStateStr == finalState);
166202
if (newState == error) { // ERROR state
167203
response->set_trigger(pb::DEVICE_ERROR);
@@ -329,6 +365,16 @@ void OccServer::publishState(t_State s)
329365
}
330366
}
331367

368+
void OccServer::pushEvent(pb::DeviceEvent* event)
369+
{
370+
for (auto item : m_eventQueues) {
371+
item.second->push(event);
372+
}
373+
printf("Object: %s - pushing event = %s\n",
374+
m_rco->getName().c_str(),
375+
pb::DeviceEventType_Name(event->type()).c_str());
376+
}
377+
332378
bool OccServer::checkMachineDone()
333379
{
334380
std::lock_guard<std::mutex> lock(m_mu);
@@ -350,7 +396,12 @@ void OccServer::runChecker()
350396
// execute periodic actions, as defined for t_State::running
351397
if (currentState == t_State::running) {
352398
int err = m_rco->iterateRunning();
353-
if (err) {
399+
if (err == 1) { // signal EndOfData event
400+
auto eodEvent = new pb::DeviceEvent;
401+
eodEvent->set_type(pb::END_OF_DATA);
402+
pushEvent(eodEvent);
403+
}
404+
else if (err) {
354405
updateState(t_State::error);
355406
}
356407
}
@@ -367,4 +418,4 @@ void OccServer::runChecker()
367418
std::this_thread::sleep_for(1ms);
368419
}
369420
}
370-
}
421+
}

occ/occlib/OccServer.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ class OccServer final : public pb::Occ::Service
7070
*/
7171
virtual ~OccServer();
7272

73+
grpc::Status EventStream(grpc::ServerContext* context,
74+
const pb::EventStreamRequest* request,
75+
grpc::ServerWriter<pb::EventStreamReply>* writer) override;
76+
7377
grpc::Status StateStream(grpc::ServerContext* context,
7478
const pb::StateStreamRequest* request,
7579
grpc::ServerWriter<pb::StateStreamReply>* writer) override;
@@ -89,6 +93,7 @@ class OccServer final : public pb::Occ::Service
8993
void updateState(t_State s);
9094

9195
void publishState(t_State s);
96+
void pushEvent(pb::DeviceEvent* event);
9297

9398
void runChecker();
9499

@@ -100,6 +105,7 @@ class OccServer final : public pb::Occ::Service
100105
bool m_machineDone;
101106

102107
std::unordered_map<std::string, boost::lockfree::queue<t_State>* > m_stateQueues;
108+
std::unordered_map<std::string, boost::lockfree::queue<pb::DeviceEvent*>* > m_eventQueues;
103109
};
104110

105111

occ/plugin/OccPluginServer.cxx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ OccPluginServer::Transition(grpc::ServerContext* context,
177177
}
178178

179179
std::string srcState = request->srcstate();
180-
std::string event = request->event();
180+
std::string event = request->transitionevent();
181181
auto arguments = request->arguments();
182182

183183
std::string currentState = fair::mq::PluginServices::ToStr(m_pluginServices->GetCurrentDeviceState());
@@ -311,8 +311,8 @@ OccPluginServer::Transition(grpc::ServerContext* context,
311311
return grpc::Status(grpc::INTERNAL, "cannot request transition, OCC plugin has no device control");
312312
}
313313
catch (std::out_of_range& e) {
314-
OLOG(ERROR) << "[request Transition] invalid transition name: " << request->event();
315-
return grpc::Status(grpc::INVALID_ARGUMENT, "argument " + request->event() + " is not a valid transition name");
314+
OLOG(ERROR) << "[request Transition] invalid transition name: " << request->transitionevent();
315+
return grpc::Status(grpc::INVALID_ARGUMENT, "argument " + request->transitionevent() + " is not a valid transition name");
316316
}
317317

318318
{
@@ -362,7 +362,7 @@ OccPluginServer::Transition(grpc::ServerContext* context,
362362
}
363363

364364
response->set_state(newStates.back());
365-
response->set_event(request->event());
365+
response->set_transitionevent(request->transitionevent());
366366
response->set_ok(newStates.back() == finalState);
367367
if (newStates.back() == "ERROR") { // ERROR state
368368
response->set_trigger(pb::DEVICE_ERROR);

occ/protos/occ.proto

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ option go_package = "pb";
3333
service Occ {
3434
// We have to have a notification stream because the FairMQDevice might transition
3535
// on its own for whatever reason.
36+
rpc EventStream (EventStreamRequest) returns (stream EventStreamReply) {}
3637
rpc StateStream (StateStreamRequest) returns (stream StateStreamReply) {}
3738
rpc GetState (GetStateRequest) returns (GetStateReply) {}
3839
rpc Transition (TransitionRequest) returns (TransitionReply) {}
@@ -49,12 +50,26 @@ enum StateType {
4950
STATE_INTERMEDIATE = 1;
5051
}
5152

53+
enum DeviceEventType {
54+
NULL_DEVICE_EVENT = 0;
55+
END_OF_DATA = 1;
56+
}
57+
5258
message StateStreamRequest {}
59+
5360
message StateStreamReply {
5461
StateType type = 1;
5562
string state = 2;
5663
}
5764

65+
message EventStreamRequest {}
66+
message DeviceEvent {
67+
DeviceEventType type = 1;
68+
}
69+
message EventStreamReply {
70+
DeviceEvent event = 1;
71+
}
72+
5873
message GetStateRequest {}
5974
message GetStateReply {
6075
string state = 1;
@@ -66,12 +81,12 @@ message ConfigEntry {
6681
}
6782
message TransitionRequest {
6883
string srcState = 1;
69-
string event = 2;
84+
string transitionEvent = 2;
7085
repeated ConfigEntry arguments = 3;
7186
}
7287
message TransitionReply {
7388
StateChangeTrigger trigger = 1;
7489
string state = 2;
75-
string event = 3;
90+
string transitionEvent = 3;
7691
bool ok = 4;
7792
}

0 commit comments

Comments
 (0)