Skip to content

Commit 560b750

Browse files
committed
temp debug changes: needs fix
1 parent 908a737 commit 560b750

10 files changed

Lines changed: 520 additions & 20 deletions

app/sstSinker.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
#include "chimbuko/util/commandLineParser.hpp"
55
#include "chimbuko/util/string.hpp"
66

7+
#ifndef ENABLE_PROVDB
8+
#error "Provdb flag not set"
9+
#endif
10+
711
using namespace chimbuko;
812
using namespace std::chrono;
913

include/chimbuko/ad/ADLocalCounterStatistics.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <chimbuko/ad/ADEvent.hpp>
66
#include <chimbuko/ad/ADCounter.hpp>
77
#include "chimbuko/util/PerfStats.hpp"
8+
//#include "chimbuko/chimbuko.hpp"
89

910
namespace chimbuko{
1011

@@ -76,7 +77,7 @@ namespace chimbuko{
7677
*
7778
* The message string is the output of get_json_state() in string format
7879
*/
79-
std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client) const;
80+
std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const;
8081

8182
/**
8283
* @brief Attach a PerfStats object into which performance metrics are accumulated
@@ -116,7 +117,7 @@ namespace chimbuko{
116117
* @param step step (or frame) number
117118
* @return std::pair<size_t, size_t> [sent, recv] message size
118119
*/
119-
static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step);
120+
static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
120121

121122
unsigned long m_program_idx; /**< Program idx*/
122123
int m_step; /**< io step */

include/chimbuko/ad/ADLocalFuncStatistics.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <chimbuko/ad/AnomalyData.hpp>
66
#include <chimbuko/util/Anomalies.hpp>
77
#include "chimbuko/util/PerfStats.hpp"
8+
#include "chimbuko/chimbuko.hpp"
89

910
namespace chimbuko{
1011

@@ -75,7 +76,7 @@ namespace chimbuko{
7576
*
7677
* The message communicated is the string dump of the output of get_json_state()
7778
*/
78-
std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client) const;
79+
std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const;
7980

8081

8182
/**
@@ -107,7 +108,7 @@ namespace chimbuko{
107108
* @param step step (or frame) number
108109
* @return std::pair<size_t, size_t> [sent, recv] message size
109110
*/
110-
static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step);
111+
static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
111112

112113
int m_step; /**< io step */
113114
unsigned long m_rank; /**< Rank*/

include/chimbuko/ad/ADNetClient.hpp

Lines changed: 161 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
#endif
88
#include "chimbuko/message.hpp"
99
#include "chimbuko/util/PerfStats.hpp"
10-
10+
#include "chimbuko/util/string.hpp"
11+
#include "chimbuko/util/error.hpp"
12+
#include "chimbuko/util/time.hpp"
13+
1114
namespace chimbuko{
1215

1316

@@ -113,6 +116,163 @@ namespace chimbuko{
113116

114117

115118

119+
//Actions performed by the worker thread
120+
struct ClientAction{
121+
virtual void perform(ADNetClient &client) = 0;
122+
virtual bool do_delete() const = 0; //whether to delete the work object after completion
123+
virtual bool shutdown_worker() const{ return false; } //whether to shutdown the worker after completing the action
124+
virtual ~ClientAction(){}
125+
};
126+
127+
struct ClientActionConnect: public ClientAction{
128+
int rank;
129+
int srank;
130+
std::string sname;
131+
132+
ClientActionConnect(int rank, int srank, const std::string &sname): rank(rank), srank(srank), sname(sname){}
133+
134+
void perform(ADNetClient &client){
135+
std::cout << "Connecting to client" << std::endl;
136+
client.connect_ps(rank, srank, sname);
137+
}
138+
bool do_delete() const{ return true; }
139+
};
140+
141+
struct ClientActionDisconnect: public ClientAction{
142+
void perform(ADNetClient &client){
143+
std::cout << "Disconnecting from client" << std::endl;
144+
client.disconnect_ps();
145+
}
146+
bool do_delete() const{ return true; }
147+
bool shutdown_worker() const{ return true; }
148+
};
149+
150+
//Make the worker wait for some time, for testing
151+
struct ClientActionWait: public ClientAction{
152+
size_t wait_ms;
153+
ClientActionWait(size_t wait_ms): wait_ms(wait_ms){}
154+
155+
void perform(ADNetClient &client){
156+
std::cout << "Worker is waiting for "<< wait_ms << "ms" << std::endl;
157+
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
158+
}
159+
bool do_delete() const{ return true; }
160+
};
161+
162+
struct ClientActionBlockingSendReceive: public ClientAction{
163+
std::mutex m;
164+
std::condition_variable cv;
165+
Message *recv;
166+
Message const *send; //it's blocking so we know that the object will live long enough
167+
bool complete;
168+
169+
ClientActionBlockingSendReceive(Message *recv, Message const *send): send(send), recv(recv), complete(false){}
170+
171+
void perform(ADNetClient &client){
172+
std::cout << "Performing blocking send and receive" << std::endl;
173+
client.send_and_receive(*recv, *send);
174+
complete = true;
175+
cv.notify_one();
176+
}
177+
bool do_delete() const{ return false; }
178+
179+
void wait_for(){
180+
std::unique_lock<std::mutex> l(m);
181+
cv.wait(l, [&]{ return complete; });
182+
}
183+
};
184+
185+
//Return message is just dumped
186+
struct ClientActionAsyncSend: public ClientAction{
187+
Message send; //copy of send message because we don't know how long it will be before it sends
188+
189+
ClientActionAsyncSend(const Message &send): send(send){}
190+
191+
void perform(ADNetClient &client){
192+
std::cout << "Performing non-blocking send and receive" << std::endl;
193+
Message recv;
194+
client.send_and_receive(recv, send);
195+
std::cout << "Non-blocking send returned " << recv.buf() << std::endl;
196+
}
197+
bool do_delete() const{ return true; }
198+
};
199+
200+
//ADNetClient inside a worker thread with blocking send/receive and non-blocking send
201+
class ADThreadNetClient{
202+
std::thread worker;
203+
mutable std::mutex m;
204+
std::queue<ClientAction*> queue;
205+
206+
size_t getNwork() const{
207+
std::lock_guard<std::mutex> l(m);
208+
return queue.size();
209+
}
210+
ClientAction* getWorkItem(){
211+
std::lock_guard<std::mutex> l(m);
212+
ClientAction *work_item = queue.front();
213+
queue.pop();
214+
return work_item;
215+
}
216+
217+
void run(){
218+
std::cout << "Starting worker thread" << std::endl;
219+
worker = std::thread([&](){
220+
ADNetClient client;
221+
bool shutdown = false;
222+
223+
while(!shutdown){
224+
size_t nwork = getNwork();
225+
while(nwork > 0){
226+
ClientAction* work_item = getWorkItem();
227+
work_item->perform(client);
228+
shutdown = work_item->shutdown_worker();
229+
230+
if(work_item->do_delete()) delete work_item;
231+
nwork = getNwork();
232+
}
233+
if(shutdown){
234+
if(nwork > 0) fatal_error("Worker was shut down before emptying its queue!");
235+
std::cout << "Worker received shutdown request" << std::endl;
236+
}else{
237+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
238+
}
239+
}
240+
});
241+
}
242+
243+
public:
244+
ADThreadNetClient(){
245+
run();
246+
}
247+
248+
//Use only if you know what you are doing!
249+
void enqueue_action(ClientAction *action){
250+
std::lock_guard<std::mutex> l(m);
251+
queue.push(action);
252+
}
253+
254+
void connect_ps(int rank, int srank = 0, std::string sname="MPINET"){
255+
enqueue_action(new ClientActionConnect(rank, srank,sname));
256+
}
257+
void disconnect_ps(){
258+
enqueue_action(new ClientActionDisconnect());
259+
}
260+
void send_and_receive(Message &recv, const Message &send){
261+
ClientActionBlockingSendReceive action(&recv, &send);
262+
enqueue_action(&action);
263+
action.wait_for();
264+
}
265+
void async_send(const Message &send){
266+
enqueue_action(new ClientActionAsyncSend(send));
267+
}
268+
269+
~ADThreadNetClient(){
270+
std::cout << "Joining worker thread" << std::endl;
271+
worker.join();
272+
}
273+
274+
};
116275

276+
117277

118278
};
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#pragma once
2+
3+
#ifdef _USE_MPINET
4+
#include "chimbuko/net/mpi_net.hpp"
5+
#else
6+
#include "chimbuko/net/zmq_net.hpp"
7+
#endif
8+
#include "chimbuko/message.hpp"
9+
#include "chimbuko/util/PerfStats.hpp"
10+
11+
namespace chimbuko{
12+
13+
14+
/**
15+
* @brief A wrapper class to facilitate communications between the AD and the parameter server
16+
*/
17+
class ADNetClient{
18+
public:
19+
ADNetClient();
20+
21+
// NOTE(review): defined out of line; per the disconnect_ps documentation it disconnects if that was not already done - confirm
~ADNetClient();
22+
23+
/**
24+
* @brief check if the parameter server is in use
25+
*
26+
* @return true if the parameter server is in use
27+
* @return false if the parameter server is not in use
28+
*/
29+
bool use_ps() const { return m_use_ps; }
30+
31+
/**
32+
* @brief connect to the parameter server
33+
*
34+
* @param rank this process rank
35+
* @param srank server process rank. If using ZMQNet this is not applicable
36+
* @param sname server name. If using ZMQNet this is the server ip address, for MPINet it is not applicable
37+
*/
38+
void connect_ps(int rank, int srank = 0, std::string sname="MPINET");
39+
/**
40+
* @brief disconnect from the connected parameter server
41+
*
42+
* Called automatically by destructor if not previously called
43+
*/
44+
void disconnect_ps();
45+
46+
/**
47+
* @brief Return the MPI rank of the parameter server
48+
*/
49+
int get_server_rank() const{ return m_srank; }
50+
51+
/**
52+
* @brief Return the MPI rank of this client
53+
*/
54+
int get_client_rank() const{ return m_rank; }
55+
56+
/**
57+
* @brief Send a message to the parameter server and receive the response in a serialized format
58+
* @param msg The message
59+
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
60+
*/
61+
std::string send_and_receive(const Message &msg) const;
62+
63+
/**
64+
* @brief Send a message to the parameter server and receive the response both as Message objects
65+
* @param recv The received message
66+
* @param send The sent message
67+
*
68+
* Note recv and send can be the same object
69+
*/
70+
void send_and_receive(Message &recv, const Message &send);
71+
72+
/**
73+
* @brief If linked, timing and packet size information will be gathered
74+
*/
75+
void linkPerf(PerfStats* perf){ m_perf = perf; }
76+
77+
// NOTE(review): these accessors are guarded by _USE_ZMQNET, whereas the ZeroMQ members they use are declared under the #else branch of _USE_MPINET - confirm the two macros are always set consistently
#ifdef _USE_ZMQNET
78+
/**
79+
* @brief Set the timeout on blocking receives. Must be called prior to connecting
80+
*/
81+
void setRecvTimeout(const int timeout_ms){ m_recv_timeout_ms = timeout_ms; }
82+
83+
/**
84+
* @brief Get the zeroMQ socket
85+
*/
86+
void * getZMQsocket(){ return m_socket; }
87+
88+
/**
89+
* @brief Get the zeroMQ context
90+
*/
91+
void * getZMQcontext(){ return m_context; }
92+
93+
/**
94+
* @brief Issue a stop command to the server. The server will then stop once all clients have disconnected and all messages processed
95+
*/
96+
void stopServer() const;
97+
#endif
98+
99+
100+
private:
101+
bool m_use_ps; /**< true if the parameter server is in use */
102+
int m_rank; /**< MPI rank of current process */
103+
int m_srank; /**< server process rank */
104+
#ifdef _USE_MPINET
105+
MPI_Comm m_comm; /**< Instance of the MPI communicator */
106+
#else
107+
void* m_context; /**< ZeroMQ context */
108+
void* m_socket; /**< ZeroMQ socket */
109+
int m_recv_timeout_ms; /**< Timeout (in ms) on blocking receives (default 30s)*/
110+
#endif
111+
PerfStats * m_perf; /**< Performance monitoring; set through linkPerf */
112+
};
113+
114+
115+
116+
117+
118+
};

include/chimbuko/chimbuko.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ namespace chimbuko {
9494
*/
9595
bool use_ps() const { return m_outlier->use_ps(); }
9696

97+
#ifndef ENABLE_PROVDB
98+
#error "Provdb flag not set"
99+
#endif
100+
97101
#ifdef ENABLE_PROVDB
98102
/**
99103
* @brief Whether the provenance database is in use

0 commit comments

Comments
 (0)