Skip to content

Commit 7c80bbe

Browse files
committed
setRecvTimeout can be set from config file
1 parent fa3bc79 commit 7c80bbe

20 files changed

Lines changed: 123 additions & 61 deletions

app/driver.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ optionalArgsParser & getOptionalArgsParser(){
7171
addOptionalCommandLineArg(p, hbos_use_global_threshold, "Set true to use a global threshold in HBOS algorithm. Dafault is true.");
7272
addOptionalCommandLineArg(p, program_idx, "Set the index associated with the instrumented program. Use to label components of a workflow. (default 0)");
7373
addOptionalCommandLineArg(p, outlier_sigma, "Set the number of standard deviations that defines an anomalous event (default 6)");
74+
addOptionalCommandLineArg(p, net_recv_timeout, "Timeout (in ms) for blocking receives on client from parameter server");
7475
addOptionalCommandLineArg(p, pserver_addr, "Set the address of the parameter server. If empty (default) the pserver will not be used.");
7576
addOptionalCommandLineArg(p, hpserver_nthr, "Set the number of threads used by the hierarchical PS. This parameter is used to compute a port offset for the particular endpoint that this AD rank connects to (default 1)");
7677
addOptionalCommandLineArg(p, interval_msec, "Force the AD to pause for this number of ms at the end of each IO step (default 0)");
@@ -129,6 +130,7 @@ ChimbukoParams getParamsFromCommandLine(int argc, char** argv, const int mpi_wor
129130
params.ad_algorithm = "hbos";
130131
params.hbos_threshold = 0.99;
131132
params.hbos_use_global_threshold = true;
133+
params.net_recv_timeout = 30000;
132134
params.pserver_addr = ""; //don't use pserver by default
133135
params.hpserver_nthr = 1;
134136
params.outlier_sigma = 6.0; // anomaly detection algorithm parameter

include/chimbuko/ad/ADLocalCounterStatistics.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ namespace chimbuko{
7777
*
7878
* The message string is the output of get_json_state() in string format
7979
*/
80-
std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const;
80+
//std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const;
81+
std::pair<size_t, size_t> updateGlobalStatistics(ADThreadNetClient &net_client, int rank, std::string pserver_addr) const;
8182

8283
/**
8384
* @brief Attach a PerfStats object into which performance metrics are accumulated
@@ -117,7 +118,8 @@ namespace chimbuko{
117118
* @param step step (or frame) number
118119
* @return std::pair<size_t, size_t> [sent, recv] message size
119120
*/
120-
static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
121+
//static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
122+
static std::pair<size_t, size_t> updateGlobalStatistics(ADThreadNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
121123

122124
unsigned long m_program_idx; /**< Program idx*/
123125
int m_step; /**< io step */

include/chimbuko/ad/ADLocalFuncStatistics.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ namespace chimbuko{
7676
*
7777
* The message communicated is the string dump of the output of get_json_state()
7878
*/
79-
std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const;
80-
79+
//std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const;
80+
std::pair<size_t, size_t> updateGlobalStatistics(ADThreadNetClient &net_client, int rank, std::string pserver_addr) const;
8181

8282
/**
8383
* @brief Get the current state as a JSON object
@@ -108,7 +108,8 @@ namespace chimbuko{
108108
* @param step step (or frame) number
109109
* @return std::pair<size_t, size_t> [sent, recv] message size
110110
*/
111-
static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
111+
//static std::pair<size_t, size_t> updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
112+
static std::pair<size_t, size_t> updateGlobalStatistics(ADThreadNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr);
112113

113114
int m_step; /**< io step */
114115
unsigned long m_rank; /**< Rank*/

include/chimbuko/ad/ADNetClient.hpp

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ namespace chimbuko{
153153
ClientActionWait(size_t wait_ms): wait_ms(wait_ms){}
154154

155155
void perform(ADNetClient &client){
156-
std::cout << "Worker is waiting for "<< wait_ms << "ms" << std::endl;
156+
//std::cout << "Worker is waiting for "<< wait_ms << "ms" << std::endl;
157157
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
158158
}
159159
bool do_delete() const{ return true; }
@@ -169,7 +169,7 @@ namespace chimbuko{
169169
ClientActionBlockingSendReceive(Message *recv, Message const *send): send(send), recv(recv), complete(false){}
170170

171171
void perform(ADNetClient &client){
172-
std::cout << "Performing blocking send and receive" << std::endl;
172+
//std::cout << "Performing blocking send and receive" << std::endl;
173173
client.send_and_receive(*recv, *send);
174174
complete = true;
175175
cv.notify_one();
@@ -189,14 +189,25 @@ namespace chimbuko{
189189
ClientActionAsyncSend(const Message &send): send(send){}
190190

191191
void perform(ADNetClient &client){
192-
std::cout << "Performing non-blocking send and receive" << std::endl;
192+
//std::cout << "Performing non-blocking send and receive" << std::endl;
193193
Message recv;
194194
client.send_and_receive(recv, send);
195-
std::cout << "Non-blocking send returned " << recv.buf() << std::endl;
195+
//std::cout << "Non-blocking send returned " << recv.buf() << std::endl;
196196
}
197197
bool do_delete() const{ return true; }
198198
};
199199

200+
struct ClientActionSetRecvTimeout: public ClientAction{
201+
int timeout;
202+
ClientActionSetRecvTimeout(const int timeout): timeout(timeout){}
203+
204+
void perform(ADNetClient &client){
205+
client.setRecvTimeout(timeout);
206+
}
207+
208+
bool do_delete() const{return true;}
209+
};
210+
200211
//ADNetClient inside a worker thread with blocking send/receive and non-blocking send
201212
class ADThreadNetClient{
202213
std::thread worker;
@@ -213,9 +224,9 @@ namespace chimbuko{
213224
queue.pop();
214225
return work_item;
215226
}
216-
227+
217228
void run(){
218-
std::cout << "Starting worker thread" << std::endl;
229+
//std::cout << "Starting worker thread" << std::endl;
219230
worker = std::thread([&](){
220231
ADNetClient client;
221232
bool shutdown = false;
@@ -232,9 +243,9 @@ namespace chimbuko{
232243
}
233244
if(shutdown){
234245
if(nwork > 0) fatal_error("Worker was shut down before emptying its queue!");
235-
std::cout << "Worker received shutdown request" << std::endl;
246+
//std::cout << "Worker received shutdown request" << std::endl;
236247
}else{
237-
std::this_thread::sleep_for(std::chrono::milliseconds(20));
248+
std::this_thread::sleep_for(std::chrono::milliseconds(80));
238249
}
239250
}
240251
});
@@ -252,7 +263,10 @@ namespace chimbuko{
252263
}
253264

254265
void connect_ps(int rank, int srank = 0, std::string sname="MPINET"){
266+
m_rank = rank;
267+
m_srank = srank;
255268
enqueue_action(new ClientActionConnect(rank, srank,sname));
269+
m_use_ps = true;
256270
}
257271
void disconnect_ps(){
258272
enqueue_action(new ClientActionDisconnect());
@@ -266,11 +280,28 @@ namespace chimbuko{
266280
enqueue_action(new ClientActionAsyncSend(send));
267281
}
268282

283+
bool use_ps() const { return m_use_ps; }
284+
285+
void linkPerf(PerfStats* perf){ m_perf = perf; }
286+
287+
int get_server_rank() const{ return m_srank; }
288+
289+
int get_client_rank() const{ return m_rank; }
290+
291+
void setRecvTimeout(const int timeout_ms) {
292+
enqueue_action(new ClientActionSetRecvTimeout(timeout_ms));
293+
}
294+
269295
~ADThreadNetClient(){
270-
std::cout << "Joining worker thread" << std::endl;
296+
//std::cout << "Joining worker thread" << std::endl;
271297
worker.join();
272298
}
273-
299+
300+
private:
301+
int m_rank;
302+
int m_srank;
303+
bool m_use_ps;
304+
PerfStats * m_perf;
274305
};
275306

276307

include/chimbuko/ad/ADOutlier.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ namespace chimbuko {
5757
/**
5858
* @brief Link the interface for communicating with the parameter server
5959
*/
60-
void linkNetworkClient(ADNetClient *client);
60+
//void linkNetworkClient(ADNetClient *client);
61+
void linkNetworkClient(ADThreadNetClient *client);
6162

6263
/**
6364
* @brief abstract method to run the implemented anomaly detection algorithm
@@ -112,8 +113,9 @@ namespace chimbuko {
112113
protected:
113114
int m_rank; /**< this process rank */
114115
bool m_use_ps; /**< true if the parameter server is in use */
115-
ADNetClient* m_net_client; /**< interface for communicating to parameter server */
116-
116+
//ADNetClient* m_net_client; /**< interface for communicating to parameter server */
117+
ADThreadNetClient* m_net_client; /**< interface for communicating to parameter server */
118+
117119
std::unordered_map< std::array<unsigned long, 4>, size_t, ArrayHasher<unsigned long,4> > m_local_func_exec_count; /**< Map(program id, rank id, thread id, func id) -> number of times encountered on this node*/
118120

119121
const ExecDataMap_t * m_execDataMap; /**< execution data map */

include/chimbuko/ad/ADParser.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ namespace chimbuko {
4343
*
4444
* If this is performed, the parser will replace the local with global index in the incoming data stream
4545
*/
46-
void linkNetClient(ADNetClient *net_client){ m_global_func_idx_map.linkNetClient(net_client); }
47-
46+
//void linkNetClient(ADNetClient *net_client){ m_global_func_idx_map.linkNetClient(net_client); }
47+
void linkNetClient(ADThreadNetClient *net_client){ m_global_func_idx_map.linkNetClient(net_client); }
4848
/**
4949
* @brief If linked, performance information will be gathered
5050
*/

include/chimbuko/ad/ADglobalFunctionIndexMap.hpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ namespace chimbuko{
1414
* If the parameter server is not connected it will simply return the local index
1515
*/
1616
class ADglobalFunctionIndexMap{
17-
ADNetClient *m_net_client;
17+
//ADNetClient *m_net_client;
18+
ADThreadNetClient *m_net_client;
1819
std::unordered_map<unsigned long, unsigned long> m_idxmap; /**< Map of local function index to global function index*/
1920
unsigned long m_pid; /**< Program index*/
2021
public:
@@ -26,7 +27,8 @@ namespace chimbuko{
2627
*
2728
*If a pointer to the net client is not provided the local index will not be synchronized between nodes
2829
*/
29-
ADglobalFunctionIndexMap(unsigned long pid, ADNetClient *net_client = nullptr): m_net_client(net_client), m_pid(pid){}
30+
//ADglobalFunctionIndexMap(unsigned long pid, ADNetClient *net_client = nullptr): m_net_client(net_client), m_pid(pid){}
31+
ADglobalFunctionIndexMap(unsigned long pid, ADThreadNetClient *net_client = nullptr): m_net_client(net_client), m_pid(pid){}
3032

3133
/**
3234
* @brief Check if the pserver is connected
@@ -36,7 +38,8 @@ namespace chimbuko{
3638
/**
3739
* @brief Link the net client
3840
*/
39-
void linkNetClient(ADNetClient *net_client){ m_net_client = net_client; }
41+
//void linkNetClient(ADNetClient *net_client){ m_net_client = net_client; }
42+
void linkNetClient(ADThreadNetClient *net_client){ m_net_client = net_client; }
4043

4144
/**
4245
* @brief Lookup the global index corresponding to the input local index
@@ -63,7 +66,8 @@ namespace chimbuko{
6366
/**
6467
* @brief Return a pointer to the net client
6568
*/
66-
ADNetClient* getNetClient(){ return m_net_client; }
69+
//ADNetClient* getNetClient(){ return m_net_client; }
70+
ADThreadNetClient* getNetClient(){ return m_net_client; }
6771
};
6872

6973
}

include/chimbuko/chimbuko.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ namespace chimbuko {
5757
std::string outlier_statistic; /**< Set the statistic used in outlier detection*/
5858

5959
int step_report_freq; /**<Steps between Chimbuko reporting IO step progress. Use 0 to deactivate this logging entirely (default 1)*/
60+
61+
int net_recv_timeout; /**< Timeout (in ms) for blocking receives on the client (driver) from the parameter server */
6062

6163
ChimbukoParams();
6264

@@ -207,7 +209,8 @@ namespace chimbuko {
207209
ADCounter * m_counter; /**< counter event manager */
208210
ADOutlier * m_outlier; /**< outlier detection algorithm */
209211
ADio * m_io; /**< output writer */
210-
ADNetClient * m_net_client; /**< client for comms with parameter server */
212+
//ADNetClient * m_net_client; /**< client for comms with parameter server */
213+
ADThreadNetClient * m_net_client; /**< client for comms with parameter server */
211214
ADMetadataParser *m_metadata_parser; /**< parser for metadata */
212215
#ifdef ENABLE_PROVDB
213216
ADProvenanceDBclient *m_provdb_client; /**< provenance DB client*/

src/ad/ADLocalCounterStatistics.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ ADLocalCounterStatistics::State ADLocalCounterStatistics::get_state() const{
7070
}
7171

7272

73-
std::pair<size_t, size_t> ADLocalCounterStatistics::updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const{
73+
//std::pair<size_t, size_t> ADLocalCounterStatistics::updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const{
74+
std::pair<size_t, size_t> ADLocalCounterStatistics::updateGlobalStatistics(ADThreadNetClient &net_client, int rank, std::string pserver_addr) const{
7475
//nlohmann::json state = get_json_state();
7576
State state = get_state();
7677
PerfTimer timer;
@@ -86,23 +87,25 @@ std::pair<size_t, size_t> ADLocalCounterStatistics::updateGlobalStatistics(ADNet
8687
return msgsz;
8788
}
8889

89-
std::pair<size_t, size_t> ADLocalCounterStatistics::updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr){
90+
//std::pair<size_t, size_t> ADLocalCounterStatistics::updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr){
91+
std::pair<size_t, size_t> ADLocalCounterStatistics::updateGlobalStatistics(ADThreadNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr){
9092
if (!net_client.use_ps())
9193
return std::make_pair(0, 0);
9294

93-
ADThreadNetClient thrnet;
94-
thrnet.connect_ps(rank, 0, pserver_addr);
95+
//ADThreadNetClient thrnet;
96+
//thrnet.connect_ps(rank, 0, pserver_addr);
9597

9698
Message msg;
9799
msg.set_info(net_client.get_client_rank(), net_client.get_server_rank(), MessageType::REQ_ADD, MessageKind::COUNTER_STATS, step);
98100
msg.set_msg(l_stats);
99101

100102
size_t sent_sz = msg.size();
101103
//std::string strmsg = net_client.send_and_receive(msg);
102-
thrnet.async_send(msg);
103-
size_t recv_sz = msg.size(); //strmsg.size();
104+
net_client.async_send(msg);
105+
//thrnet.async_send(msg);
106+
size_t recv_sz = 0; //msg.size(); //strmsg.size();
104107

105-
thrnet.disconnect_ps();
108+
//thrnet.disconnect_ps();
106109
return std::make_pair(sent_sz, recv_sz);
107110
}
108111

src/ad/ADLocalFuncStatistics.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ ADLocalFuncStatistics::State ADLocalFuncStatistics::get_state() const{
9595
}
9696

9797

98-
std::pair<size_t, size_t> ADLocalFuncStatistics::updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const{
98+
//std::pair<size_t, size_t> ADLocalFuncStatistics::updateGlobalStatistics(ADNetClient &net_client, int rank, std::string pserver_addr) const{
99+
std::pair<size_t, size_t> ADLocalFuncStatistics::updateGlobalStatistics(ADThreadNetClient &net_client, int rank, std::string pserver_addr) const{
99100
// func id --> (name, # anomaly, inclusive run stats, exclusive run stats)
100101
//nlohmann::json g_info = get_json_state();
101102
State g_info = get_state();
@@ -113,22 +114,24 @@ std::pair<size_t, size_t> ADLocalFuncStatistics::updateGlobalStatistics(ADNetCli
113114
return msgsz;
114115
}
115116

116-
std::pair<size_t, size_t> ADLocalFuncStatistics::updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr){
117+
//std::pair<size_t, size_t> ADLocalFuncStatistics::updateGlobalStatistics(ADNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr){
118+
std::pair<size_t, size_t> ADLocalFuncStatistics::updateGlobalStatistics(ADThreadNetClient &net_client, const std::string &l_stats, int step, int rank, std::string pserver_addr){
117119
if (!net_client.use_ps())
118120
return std::make_pair(0, 0);
119121

120-
ADThreadNetClient thrnet;
121-
thrnet.connect_ps(rank, 0, pserver_addr);
122+
//ADThreadNetClient thrnet;
123+
//thrnet.connect_ps(rank, 0, pserver_addr);
122124

123125
Message msg;
124126
msg.set_info(net_client.get_client_rank(), net_client.get_server_rank(), MessageType::REQ_ADD, MessageKind::ANOMALY_STATS, step);
125127
msg.set_msg(l_stats);
126128

127129
size_t sent_sz = msg.size();
128130
//std::string strmsg = net_client.send_and_receive(msg);
129-
thrnet.async_send(msg);
130-
size_t recv_sz = msg.size(); //strmsg.size();
131+
net_client.async_send(msg);
132+
//thrnet.async_send(msg);
133+
size_t recv_sz =0;// msg.size(); //strmsg.size();
131134

132-
thrnet.disconnect_ps();
135+
//thrnet.disconnect_ps();
133136
return std::make_pair(sent_sz, recv_sz);
134137
}

0 commit comments

Comments
 (0)