Skip to content

Commit 78ac922

Browse files
committed
[occ] Provide basic EventStream in OCCPlugin
1 parent 94d4c35 commit 78ac922

2 files changed

Lines changed: 61 additions & 0 deletions

File tree

occ/plugin/OccPluginServer.cxx

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,63 @@ OccPluginServer::OccPluginServer(fair::mq::PluginServices* pluginServices)
4646

4747
}
4848

49+
grpc::Status
50+
OccPluginServer::EventStream(grpc::ServerContext* context,
51+
const occ_pb::EventStreamRequest* request,
52+
grpc::ServerWriter<occ_pb::EventStreamReply>* writer)
53+
{
54+
std::cout << "[request EventStream] handler BEGIN" << std::endl;
55+
(void) context;
56+
(void) request;
57+
58+
std::mutex writer_mu;
59+
std::condition_variable finished;
60+
std::mutex finished_mu;
61+
62+
auto onDeviceStateChange = [&](fair::mq::PluginServices::DeviceState reachedState) {
63+
OLOG(DEBUG) << "[request EventStream] onDeviceStateChange BEGIN";
64+
std::lock_guard<std::mutex> lock(writer_mu);
65+
auto state = fair::mq::PluginServices::ToStr(reachedState);
66+
67+
OLOG(DEBUG) << "[request EventStream] onDeviceStateChange RESPONSE state: " << state;
68+
69+
if (state == "EXITING") {
70+
std::unique_lock<std::mutex> finished_lk(finished_mu);
71+
72+
auto nilEvent = new pb::DeviceEvent();
73+
nilEvent->set_type(pb::NULL_DEVICE_EVENT);
74+
pb::EventStreamReply response;
75+
response.mutable_event()->CopyFrom(*nilEvent);
76+
77+
writer->WriteLast(response, grpc::WriteOptions());
78+
delete nilEvent;
79+
finished.notify_one();
80+
OLOG(DEBUG) << "[request EventStream] onDeviceStateChange NOTIFY FINISHED";
81+
}
82+
OLOG(DEBUG) << "[request EventStream] onDeviceStateChange END";
83+
};
84+
85+
auto id = generateSubscriptionId("EventStream");
86+
87+
OLOG(DEBUG) << "[request EventStream] subscribe, id: " << id;
88+
m_pluginServices->SubscribeToDeviceStateChange(id, onDeviceStateChange);
89+
DEFER({
90+
m_pluginServices->UnsubscribeFromDeviceStateChange(id);
91+
OLOG(DEBUG) << "[request EventStream] unsubscribe, id: " << id;
92+
});
93+
94+
{
95+
OLOG(DEBUG) << "[request EventStream] blocking until END transition";
96+
std::unique_lock<std::mutex> lk(finished_mu);
97+
finished.wait(lk);
98+
OLOG(DEBUG) << "[request EventStream] transitioned to END, closing stream";
99+
}
100+
101+
std::cout << "[request StateStream] handler END" << std::endl;
102+
return ::grpc::Status::OK;
103+
104+
}
105+
49106
grpc::Status
50107
OccPluginServer::StateStream(grpc::ServerContext* context,
51108
const pb::StateStreamRequest* request,

occ/plugin/OccPluginServer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ class OccPluginServer final : public pb::Occ::Service
6767
virtual ~OccPluginServer()
6868
{}
6969

70+
grpc::Status EventStream(grpc::ServerContext* context,
71+
const pb::EventStreamRequest* request,
72+
grpc::ServerWriter<pb::EventStreamReply>* writer) override;
73+
7074
grpc::Status StateStream(grpc::ServerContext* context,
7175
const pb::StateStreamRequest* request,
7276
grpc::ServerWriter<pb::StateStreamReply>* writer) override;

0 commit comments

Comments
 (0)