Skip to content

Commit 4631b7b

Browse files
committed
Added setRecvTimeout method to ADNetClient interface to fix compile bug.
ADThreadNetClient now has an option to use a local network client, which is used in the sim. Fixed ADThreadNetClient not calling disconnect_ps in its destructor, which prevented the worker thread from joining unless disconnect_ps was called manually.
1 parent 1f868f6 commit 4631b7b

4 files changed

Lines changed: 40 additions & 13 deletions

File tree

include/chimbuko/ad/ADNetClient.hpp

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ namespace chimbuko{
7878
*/
7979
void linkPerf(PerfStats* perf){ m_perf = perf; }
8080

81+
/**
82+
* @brief Set the timeout for receiving messages (implementation dependent)
83+
*/
84+
virtual void setRecvTimeout(const int timeout_ms){ }
85+
8186
protected:
8287
bool m_use_ps; /**< true if the parameter server is in use */
8388
int m_rank; /**< MPI rank of current process */
@@ -125,7 +130,7 @@ namespace chimbuko{
125130
/**
126131
* @brief Set the timeout on blocking receives. Must be called prior to connecting
127132
*/
128-
void setRecvTimeout(const int timeout_ms){ m_recv_timeout_ms = timeout_ms; }
133+
void setRecvTimeout(const int timeout_ms) override{ m_recv_timeout_ms = timeout_ms; }
129134

130135
/**
131136
* @brief Get the zeroMQ socket
@@ -328,20 +333,29 @@ namespace chimbuko{
328333
return work_item;
329334
}
330335

331-
void run(){
336+
/**
337+
* @brief Create the worker thread
338+
* @param local Use a local (in process) communicator if true, otherwise use the default network communicator
339+
*/
340+
void run(bool local = false){
332341
worker = std::thread([&](){
342+
ADNetClient *client = nullptr;
343+
if(local){
344+
client = new ADLocalNetClient;
345+
}else{
333346
#ifdef _USE_MPINET
334-
ADMPINetClient client;
347+
client = new ADMPINetClient;
335348
#else
336-
ADZMQNetClient client;
349+
client = new ADZMQNetClient;
337350
#endif
351+
}
338352
bool shutdown = false;
339353

340354
while(!shutdown){
341355
size_t nwork = getNwork();
342356
while(nwork > 0){
343357
ClientAction* work_item = getWorkItem();
344-
work_item->perform(client);
358+
work_item->perform(*client);
345359
shutdown = shutdown || work_item->shutdown_worker();
346360

347361
if(work_item->do_delete()) delete work_item;
@@ -353,12 +367,17 @@ namespace chimbuko{
353367
std::this_thread::sleep_for(std::chrono::milliseconds(80));
354368
}
355369
}
370+
delete client;
356371
});
357372
}
358373

359374
public:
360-
ADThreadNetClient(){
361-
run();
375+
/**
376+
* @brief Constructor
377+
* @param local Use a local (in process) communicator if true, otherwise use the default network communicator
378+
*/
379+
ADThreadNetClient(bool local = false){
380+
run(local);
362381
}
363382

364383
//Use only if you know what you are doing!
@@ -398,6 +417,7 @@ namespace chimbuko{
398417
}
399418

400419
~ADThreadNetClient(){
420+
disconnect_ps();
401421
worker.join();
402422
}
403423

sim/include/sim/ad.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace chimbuko_sim{
3030
ADNormalEventProvenance m_normal_events;
3131
ADMetadataParser m_metadata;
3232
std::unique_ptr<ADProvenanceDBclient> m_pdb_client;
33-
ADLocalNetClient m_net_client; /**< The local net client. Only activated if an AD algorithm is used */
33+
ADThreadNetClient *m_net_client; /**< The local net client. Only activated if an AD algorithm is used */
3434
ADOutlier* m_outlier; /**< The local outlier algorithm instance, if used*/
3535
public:
3636
/**
@@ -54,11 +54,14 @@ namespace chimbuko_sim{
5454
ADsim(int window_size, int pid, int rid, unsigned long program_start, unsigned long step_freq): ADsim(){
5555
init(window_size, pid, rid, program_start, step_freq);
5656
}
57-
ADsim(): m_outlier(nullptr){}
57+
ADsim(): m_outlier(nullptr), m_net_client(nullptr){}
5858

5959
ADsim(ADsim &&r);
6060

61-
~ADsim(){ if(m_outlier) delete m_outlier; }
61+
~ADsim(){
62+
if(m_outlier) delete m_outlier;
63+
if(m_net_client) delete m_net_client;
64+
}
6265

6366
ADProvenanceDBclient &getProvDBclient(){ return *m_pdb_client; }
6467

sim/main/example2.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,6 @@ int main(int argc, char **argv){
7373
}
7474

7575
MPI_Finalize();
76+
std::cout << "Done" << std::endl;
7677
return 0;
7778
}

sim/src/ad.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@ ADsim::ADsim(ADsim &&r):
2929
m_normal_events(std::move(r.m_normal_events)),
3030
m_metadata(std::move(r.m_metadata)),
3131
m_pdb_client(std::move(r.m_pdb_client)),
32-
m_outlier(r.m_outlier)
32+
m_outlier(r.m_outlier),
33+
m_net_client(r.m_net_client)
3334
{
3435
r.m_outlier = nullptr;
36+
r.m_net_client = nullptr;
3537
}
3638

3739

@@ -50,8 +52,9 @@ void ADsim::init(int window_size, int pid, int rid, unsigned long program_start,
5052
if(p.algorithm != "none"){
5153
m_outlier = ADOutlier::set_algorithm(p.stat, p.algorithm, p.hbos_thres, p.glob_thres, p.sstd_sigma);
5254
getPserver(); //force construction of pserver
53-
m_net_client.connect_ps(m_rid);
54-
m_outlier->linkNetworkClient(&m_net_client);
55+
m_net_client = new ADThreadNetClient(true); //use local comms
56+
m_net_client->connect_ps(m_rid);
57+
m_outlier->linkNetworkClient(m_net_client);
5558
}
5659
}
5760

0 commit comments

Comments
 (0)