Skip to content

Commit bb8d9c2

Browse files
committed
To ADNetClient interface added function prototype for async_send, defaults to blocking send
ADThreadNetClient is now a proper derived class of the ADNetClient interface
1 parent a81237c commit bb8d9c2

2 files changed

Lines changed: 41 additions & 39 deletions

File tree

include/chimbuko/ad/ADNetClient.hpp

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ namespace chimbuko{
6262
* @param msg The message
6363
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
6464
*/
65-
virtual std::string send_and_receive(const Message &msg) const = 0;
65+
virtual std::string send_and_receive(const Message &msg) = 0;
6666

6767
/**
6868
* @brief Send a message to the parameter server and receive the response both as Message objects
@@ -73,6 +73,18 @@ namespace chimbuko{
7373
*/
7474
void send_and_receive(Message &recv, const Message &send);
7575

76+
/**
77+
* @brief Perform a non-blocking send operation
78+
*
79+
* Not all net clients support non-blocking sends, in which case it will default to a blocking send
80+
*/
81+
virtual void async_send(const Message &send);
82+
83+
/**
84+
* @brief Check if a net client supports non-blocking sends
85+
*/
86+
virtual bool async_send_supported() const{ return false; }
87+
7688
/**
7789
* @brief If linked timing and packet size information will be gathered
7890
*/
@@ -124,7 +136,7 @@ namespace chimbuko{
124136
* @param msg The message
125137
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
126138
*/
127-
std::string send_and_receive(const Message &msg) const override;
139+
std::string send_and_receive(const Message &msg) override;
128140

129141

130142
/**
@@ -145,7 +157,7 @@ namespace chimbuko{
145157
/**
146158
* @brief Issue a stop command to the server. The server will then stop once all clients have disconnected and all messages processed
147159
*/
148-
void stopServer() const;
160+
void stopServer();
149161

150162
private:
151163
void* m_context; /**< ZeroMQ context */
@@ -184,7 +196,7 @@ namespace chimbuko{
184196
* @param msg The message
185197
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
186198
*/
187-
std::string send_and_receive(const Message &msg) const override;
199+
std::string send_and_receive(const Message &msg) override;
188200

189201

190202
private:
@@ -222,14 +234,14 @@ namespace chimbuko{
222234
* @param msg The message
223235
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
224236
*/
225-
std::string send_and_receive(const Message &msg) const override;
237+
std::string send_and_receive(const Message &msg) override;
226238
};
227239

228240

229241
/**
230242
* @brief ADNetClient inside a worker thread with blocking send/receive and non-blocking send
231243
*/
232-
class ADThreadNetClient{
244+
class ADThreadNetClient: public ADNetClient{
233245
public:
234246
/**
235247
* @brief Virtual class representing actions performed by the worker thread
@@ -257,12 +269,7 @@ namespace chimbuko{
257269
std::thread worker;
258270
mutable std::mutex m;
259271
std::queue<ClientAction*> queue; /**< The queue of net operations*/
260-
261-
int m_rank;
262-
int m_srank;
263-
bool m_use_ps;
264-
PerfStats * m_perf;
265-
272+
266273
/**
267274
* @brief Get the number of outstanding net operations
268275
*/
@@ -302,35 +309,24 @@ namespace chimbuko{
302309
*/
303310
void disconnect_ps();
304311

312+
using ADNetClient::send_and_receive;
313+
305314
/**
306315
* @brief Perform a blocking send and receive operation
316+
* @param msg The message
317+
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
307318
*/
308-
void send_and_receive(Message &recv, const Message &send);
319+
std::string send_and_receive(const Message &send) override;
309320

310321
/**
311322
* @brief Perform a non-blocking send operation
312323
*/
313-
void async_send(const Message &send);
314-
315-
/**
316-
* @brief Is the parameter server in use / connected?
317-
*/
318-
bool use_ps() const { return m_use_ps; }
319-
320-
/**
321-
* @brief Link a performance monitoring instance
322-
*/
323-
void linkPerf(PerfStats* perf){ m_perf = perf; }
324+
void async_send(const Message &send) override;
324325

325326
/**
326-
* @brief Get the MPI rank of the server (MPINet)
327+
* @brief Check if a net client supports non-blocking sends
327328
*/
328-
int get_server_rank() const{ return m_srank; }
329-
330-
/**
331-
* @brief Get the MPI rank of the client
332-
*/
333-
int get_client_rank() const{ return m_rank; }
329+
bool async_send_supported() const override{ return true; }
334330

335331
/**
336332
* @brief Set a timeout (in ms) on receiving a response message

src/ad/ADNetClient.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ void ADNetClient::send_and_receive(Message &recv, const Message &send){
1313
recv.set_msg( send_and_receive(send), true );
1414
}
1515

16+
void ADNetClient::async_send(const Message &send){
17+
send_and_receive(send);
18+
}
19+
1620

1721
#ifdef _USE_ZMQNET
1822

@@ -89,7 +93,7 @@ void ADZMQNetClient::disconnect_ps() {
8993
}
9094

9195

92-
std::string ADZMQNetClient::send_and_receive(const Message &msg) const{
96+
std::string ADZMQNetClient::send_and_receive(const Message &msg){
9397
PerfTimer timer;
9498
std::string send_msg = msg.data(), recv_msg;
9599
//Send local parameters to PS
@@ -109,7 +113,7 @@ std::string ADZMQNetClient::send_and_receive(const Message &msg) const{
109113
return recv_msg;
110114
}
111115

112-
void ADZMQNetClient::stopServer() const{
116+
void ADZMQNetClient::stopServer(){
113117
verboseStream << "Client is sending stop request to server" << std::endl;
114118
Message msg;
115119
msg.set_info(0, 0, MessageType::REQ_QUIT, MessageKind::CMD);
@@ -190,7 +194,7 @@ void ADMPINetClient::disconnect_ps() {
190194
m_use_ps = false;
191195
}
192196

193-
std::string ADMPINetClient::send_and_receive(const Message &msg) const{
197+
std::string ADMPINetClient::send_and_receive(const Message &msg){
194198
PerfTimer timer;
195199
std::string send_msg = msg.data(), recv_msg;
196200
MessageType req_type = MessageType(msg.type());
@@ -245,7 +249,7 @@ void ADLocalNetClient::disconnect_ps(){
245249
}
246250

247251

248-
std::string ADLocalNetClient::send_and_receive(const Message &msg) const{
252+
std::string ADLocalNetClient::send_and_receive(const Message &msg){
249253
if(!m_use_ps) fatal_error("User should call connect_ps prior to sending/receiving messages");
250254

251255
PerfTimer timer;
@@ -302,14 +306,14 @@ struct ClientActionWait: public ADThreadNetClient::ClientAction{
302306
struct ClientActionBlockingSendReceive: public ADThreadNetClient::ClientAction{
303307
std::mutex m;
304308
std::condition_variable cv;
305-
Message *recv;
309+
std::string *recv;
306310
Message const *send; //it's blocking so we know that the object will live long enough
307311
bool complete;
308312

309-
ClientActionBlockingSendReceive(Message *recv, Message const *send): send(send), recv(recv), complete(false){}
313+
ClientActionBlockingSendReceive(std::string *recv, Message const *send): send(send), recv(recv), complete(false){}
310314

311315
void perform(ADNetClient &client){
312-
client.send_and_receive(*recv, *send);
316+
*recv = client.send_and_receive(*send);
313317

314318
{
315319
std::unique_lock<std::mutex> lk(m);
@@ -418,10 +422,12 @@ void ADThreadNetClient::connect_ps(int rank, int srank, std::string sname){
418422
void ADThreadNetClient::disconnect_ps(){
419423
enqueue_action(new ClientActionDisconnect());
420424
}
421-
void ADThreadNetClient::send_and_receive(Message &recv, const Message &send){
425+
std::string ADThreadNetClient::send_and_receive(const Message &send){
426+
std::string recv;
422427
ClientActionBlockingSendReceive action(&recv, &send);
423428
enqueue_action(&action);
424429
action.wait_for();
430+
return recv;
425431
}
426432
void ADThreadNetClient::async_send(const Message &send){
427433
enqueue_action(new ClientActionAsyncSend(send));

0 commit comments

Comments
 (0)