Skip to content

Commit d93607e

Browse files
committed
ADNetClient is now an abstract interface. ZMQ and MPINet functionality moved to derived classes.
1 parent 2aab945 commit d93607e

14 files changed

Lines changed: 267 additions & 157 deletions

app/pshutdown.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ int main(int argc, char** argv){
1010
std::cout << "Usage: pshutdown <server address e.g. tcp://localhost:5559>" << std::endl;
1111
return 1;
1212
}
13-
ADNetClient ad;
13+
ADZMQNetClient ad;
1414
ad.connect_ps(0,0,argv[1]);
1515
ad.stopServer();
1616
ad.disconnect_ps();

include/chimbuko/ad/ADNetClient.hpp

Lines changed: 84 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace chimbuko{
1818
public:
1919
ADNetClient();
2020

21-
~ADNetClient();
21+
virtual ~ADNetClient();
2222

2323
/**
2424
* @brief check if the parameter server is in use
@@ -35,13 +35,13 @@ namespace chimbuko{
3535
* @param srank server process rank. If using ZMQnet this is not applicable
3636
* @param sname server name. If using ZMQNet this is the server ip address, for MPINet it is not applicable
3737
*/
38-
void connect_ps(int rank, int srank = 0, std::string sname="MPINET");
38+
virtual void connect_ps(int rank, int srank = 0, std::string sname="MPINET") = 0;
3939
/**
4040
* @brief disconnect from the connected parameter server
4141
*
4242
* Called automatically by destructor if not previously called
4343
*/
44-
void disconnect_ps();
44+
virtual void disconnect_ps() = 0;
4545

4646
/**
4747
* @brief Return the MPI rank of the parameter server
@@ -58,7 +58,7 @@ namespace chimbuko{
5858
* @param msg The message
5959
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
6060
*/
61-
std::string send_and_receive(const Message &msg) const;
61+
virtual std::string send_and_receive(const Message &msg) const = 0;
6262

6363
/**
6464
* @brief Send a message to the parameter server and receive the response both as Message objects
@@ -74,7 +74,50 @@ namespace chimbuko{
7474
*/
7575
void linkPerf(PerfStats* perf){ m_perf = perf; }
7676

77+
protected:
78+
bool m_use_ps; /**< true if the parameter server is in use */
79+
int m_rank; /**< MPI rank of current process */
80+
int m_srank; /**< server process rank */
81+
PerfStats * m_perf; /**< Performance monitoring */
82+
};
83+
84+
7785
#ifdef _USE_ZMQNET
86+
/**
87+
* @brief Implementation of the ADNetClient interface for the ZMQNet network
88+
*/
89+
class ADZMQNetClient: public ADNetClient{
90+
public:
91+
92+
ADZMQNetClient();
93+
94+
~ADZMQNetClient();
95+
96+
/**
97+
* @brief connect to the parameter server
98+
*
99+
* @param rank this process rank
100+
* @param srank Ignored for this class
101+
* @param sname The server ip address
102+
*/
103+
void connect_ps(int rank, int srank = 0, std::string sname="MPINET") override;
104+
/**
105+
* @brief disconnect from the connected parameter server
106+
*
107+
* Called automatically by destructor if not previously called
108+
*/
109+
void disconnect_ps() override;
110+
111+
using ADNetClient::send_and_receive;
112+
113+
/**
114+
* @brief Send a message to the parameter server and receive the response in a serialized format
115+
* @param msg The message
116+
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
117+
*/
118+
std::string send_and_receive(const Message &msg) const override;
119+
120+
78121
/**
79122
* @brief Set the timeout on blocking receives. Must be called prior to connecting
80123
*/
@@ -94,22 +137,50 @@ namespace chimbuko{
94137
* @brief Issue a stop command to the server. The server will then stop once all clients have disconnected and all messages processed
95138
*/
96139
void stopServer() const;
97-
#endif
98-
99140

100141
private:
101-
bool m_use_ps; /**< true if the parameter server is in use */
102-
int m_rank; /**< MPI rank of current process */
103-
int m_srank; /**< server process rank */
104-
#ifdef _USE_MPINET
105-
MPI_Comm m_comm; /**< Instance of the MPI communicator */
106-
#else
107142
void* m_context; /**< ZeroMQ context */
108143
void* m_socket; /**< ZeroMQ socket */
109144
int m_recv_timeout_ms; /**< Timeout (in ms) on blocking receives (default 30s)*/
145+
};
110146
#endif
111-
PerfStats * m_perf; /**< Performance monitoring */
147+
148+
#ifdef _USE_MPINET
149+
/**
150+
* @brief Implementation of the ADNetClient interface for the MPINet network
151+
*/
152+
class ADMPINetClient: public ADNetClient{
153+
public:
154+
~ADMPINetClient();
155+
156+
/**
157+
* @brief connect to the parameter server
158+
*
159+
* @param rank this process rank
160+
* @param srank server process rank
161+
* @param sname Ignored for this class
162+
*/
163+
void connect_ps(int rank, int srank = 0, std::string sname="MPINET") override;
164+
/**
165+
* @brief disconnect from the connected parameter server
166+
*
167+
* Called automatically by destructor if not previously called
168+
*/
169+
void disconnect_ps() override;
170+
171+
/**
172+
* @brief Send a message to the parameter server and receive the response in a serialized format
173+
* @param msg The message
174+
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
175+
*/
176+
std::string send_and_receive(const Message &msg) const override;
177+
178+
179+
private:
180+
MPI_Comm m_comm; /**< Instance of the MPI communicator */
112181
};
182+
183+
#endif
113184

114185

115186

0 commit comments

Comments
 (0)