Skip to content

Commit 6e81ced

Browse files
committed
Added a new LocalNet net implementation for intraprocess communication + unit test
Added an ADLocalNetClient implementation associated with the above + unit test
1 parent 6874ae2 commit 6e81ced

9 files changed

Lines changed: 280 additions & 2 deletions

File tree

include/chimbuko/ad/ADNetClient.hpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#else
66
#include "chimbuko/net/zmq_net.hpp"
77
#endif
8+
#include "chimbuko/net/local_net.hpp"
89
#include "chimbuko/message.hpp"
910
#include "chimbuko/util/PerfStats.hpp"
1011

@@ -168,6 +169,8 @@ namespace chimbuko{
168169
*/
169170
void disconnect_ps() override;
170171

172+
using ADNetClient::send_and_receive;
173+
171174
/**
172175
* @brief Send a message to the parameter server and receive the response in a serialized format
173176
* @param msg The message
@@ -183,6 +186,35 @@ namespace chimbuko{
183186
#endif
184187

185188

189+
/**
190+
* @brief Implementation of ADNetClient for intraprocess communications
191+
*/
192+
class ADLocalNetClient: public ADNetClient{
193+
public:
194+
/**
195+
* @brief connect to the parameter server
196+
*
197+
* @param rank Ignored
198+
* @param srank Ignored
199+
* @param sname Ignored
200+
*/
201+
void connect_ps(int rank, int srank = 0, std::string sname="MPINET") override;
202+
/**
203+
* @brief disconnect from the connected parameter server
204+
*
205+
* Called automatically by destructor if not previously called
206+
*/
207+
void disconnect_ps() override;
208+
209+
using ADNetClient::send_and_receive;
210+
211+
/**
212+
* @brief Send a message to the parameter server and receive the response in a serialized format
213+
* @param msg The message
214+
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
215+
*/
216+
std::string send_and_receive(const Message &msg) const override;
217+
};
186218

187219

188220

include/chimbuko/net/local_net.hpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#pragma once
2+
3+
#include <chimbuko/net.hpp>
4+
#include <chimbuko/util/PerfStats.hpp>
5+
6+
namespace chimbuko {
7+
8+
/**
9+
@brief A simple interface for a network in which the sender and receiver are on the same process
10+
*/
11+
class LocalNet : public NetInterface {
12+
public:
13+
14+
//Pointer the the current instance of LocalNet. Only one instance can exist at any given time
15+
static LocalNet * & globalInstance();
16+
17+
LocalNet();
18+
~LocalNet();
19+
20+
/**
21+
* @brief (virtual) initialize network interface
22+
*
23+
* @param argc command line argc
24+
* @param argv command line argv
25+
* @param nt the number of threads for a thread pool
26+
*/
27+
void init(int* argc = nullptr, char*** argv = nullptr, int nt=1) override{}
28+
29+
/**
30+
* @brief (virtual) finalize network
31+
*
32+
*/
33+
void finalize() override{}
34+
35+
#ifdef _PERF_METRIC
36+
/**
37+
* @brief (virtual) Run network server with performance logging to provided directory
38+
*/
39+
void run(std::string logdir="./") override{}
40+
#else
41+
/**
42+
* @brief (virtual) Run network server
43+
*/
44+
void run() override{}
45+
#endif
46+
/**
47+
* @brief (virtual) stop network server
48+
*
49+
*/
50+
void stop() override{}
51+
52+
/**
53+
* @brief (virtual) name of network server
54+
*
55+
* @return std::string name of network server
56+
*/
57+
std::string name() const override{ return "LocalNet"; }
58+
59+
60+
/**
61+
* @brief Send a message and receive the response
62+
*/
63+
static std::string send_and_receive(const std::string &send_str);
64+
65+
protected:
66+
void init_thread_pool(int nt){}
67+
68+
};
69+
70+
71+
}

src/Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/3rdparty @PS_FLAGS@
22

33
lib_LTLIBRARIES = libchimbuko.la
4-
libchimbuko_la_SOURCES = chimbuko.cpp param.cpp pserver/global_anomaly_stats.cpp pserver/global_counter_stats.cpp util/ADIOS2parseUtils.cpp util/RunStats.cpp util/DispatchQueue.cpp util/Anomalies.cpp util/map.cpp util/string.cpp util/error.cpp util/memutils.cpp util/time.cpp util/PerfStats.cpp ad/ADOutlier.cpp ad/ADParser.cpp ad/ADio.cpp ad/utils.cpp ad/ADEvent.cpp ad/AnomalyData.cpp ad/ExecData.cpp ad/ADCounter.cpp ad/ADAnomalyProvenance.cpp ad/ADNetClient.cpp ad/ADLocalFuncStatistics.cpp ad/ADLocalCounterStatistics.cpp ad/ADProvenanceDBengine.cpp ad/ADProvenanceDBclient.cpp ad/ADMetadataParser.cpp ad/ADglobalFunctionIndexMap.cpp ad/ADNormalEventProvenance.cpp pserver/PSstatSender.cpp pserver/PSglobalFunctionIndexMap.cpp pserver/PSProvenanceDBclient.cpp pserver/AnomalyStat.cpp message.cpp net/zmq_net.cpp net/mpi_net.cpp net/zmqme_net.cpp net.cpp param/hbos_param.cpp param/sstd_param.cpp
4+
libchimbuko_la_SOURCES = chimbuko.cpp param.cpp pserver/global_anomaly_stats.cpp pserver/global_counter_stats.cpp util/ADIOS2parseUtils.cpp util/RunStats.cpp util/DispatchQueue.cpp util/Anomalies.cpp util/map.cpp util/string.cpp util/error.cpp util/memutils.cpp util/time.cpp util/PerfStats.cpp ad/ADOutlier.cpp ad/ADParser.cpp ad/ADio.cpp ad/utils.cpp ad/ADEvent.cpp ad/AnomalyData.cpp ad/ExecData.cpp ad/ADCounter.cpp ad/ADAnomalyProvenance.cpp ad/ADNetClient.cpp ad/ADLocalFuncStatistics.cpp ad/ADLocalCounterStatistics.cpp ad/ADProvenanceDBengine.cpp ad/ADProvenanceDBclient.cpp ad/ADMetadataParser.cpp ad/ADglobalFunctionIndexMap.cpp ad/ADNormalEventProvenance.cpp pserver/PSstatSender.cpp pserver/PSglobalFunctionIndexMap.cpp pserver/PSProvenanceDBclient.cpp pserver/AnomalyStat.cpp message.cpp net/zmq_net.cpp net/mpi_net.cpp net/zmqme_net.cpp net/local_net.cpp net.cpp param/hbos_param.cpp param/sstd_param.cpp
55
libchimbuko_la_LDFLAGS = -version-info 3:0:0

src/ad/ADNetClient.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,3 +231,34 @@ std::string ADMPINetClient::send_and_receive(const Message &msg) const{
231231
}
232232

233233
#endif
234+
235+
236+
237+
void ADLocalNetClient::connect_ps(int rank, int srank, std::string sname){
238+
if(LocalNet::globalInstance() == nullptr) fatal_error("No LocalNet instance exists");
239+
m_use_ps = true;
240+
}
241+
242+
243+
void ADLocalNetClient::disconnect_ps(){
244+
m_use_ps = false;
245+
}
246+
247+
248+
std::string ADLocalNetClient::send_and_receive(const Message &msg) const{
249+
if(!m_use_ps) fatal_error("User should call connect_ps prior to sending/receiving messages");
250+
251+
PerfTimer timer;
252+
std::string send_msg = msg.data();
253+
std::string recv_msg = LocalNet::send_and_receive(send_msg);
254+
255+
#ifdef _PERF_METRIC
256+
if(m_perf){
257+
m_perf->add("net_client_send_recv_time_ms", timer.elapsed_ms());
258+
m_perf->add("net_client_send_bytes", send_msg.size()); //1 char = 1 byte
259+
m_perf->add("net_client_recv_bytes", recv_msg.size());
260+
}
261+
#endif
262+
263+
return recv_msg;
264+
}

src/net/local_net.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#include "chimbuko/message.hpp"
2+
#include "chimbuko/verbose.hpp"
3+
#include "chimbuko/util/string.hpp"
4+
#include "chimbuko/util/error.hpp"
5+
#include "chimbuko/net/local_net.hpp"
6+
#include <iostream>
7+
#include <string.h>
8+
9+
using namespace chimbuko;
10+
11+
LocalNet * & LocalNet::globalInstance(){ static LocalNet* net = nullptr; return net; }
12+
13+
LocalNet::LocalNet(){
14+
if(globalInstance() != nullptr) fatal_error("Cannot have 2 instances of LocalNet active");
15+
globalInstance() = this;
16+
}
17+
18+
LocalNet::~LocalNet(){
19+
globalInstance() = nullptr;
20+
}
21+
22+
std::string LocalNet::send_and_receive(const std::string &send_str){
23+
if(globalInstance() == nullptr){ fatal_error("An instance of LocalNet must exist"); }
24+
25+
Message msg, msg_reply;
26+
msg.set_msg(send_str, true);
27+
28+
NetInterface::find_and_perform_action(0, msg_reply, msg, globalInstance()->m_payloads);
29+
return msg_reply.data();
30+
}

test/unit_tests/ad/ADNetClient.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,3 +359,40 @@ TEST(ADNetClient, TestRemoteStop){
359359

360360

361361

362+
363+
TEST(ADLocalNetClientTest, SendRecv){
364+
ADLocalNetClient client;
365+
bool err = false;
366+
try{
367+
client.connect_ps(1,2,"arg");
368+
}catch(const std::exception &e){
369+
err = true;
370+
}
371+
EXPECT_EQ(err,true);
372+
EXPECT_EQ(client.use_ps(),false);
373+
374+
err = false;
375+
LocalNet net;
376+
net.add_payload(new NetPayloadBounceback);
377+
378+
try{
379+
client.connect_ps(1,2,"arg");
380+
}catch(const std::exception &e){
381+
err = true;
382+
}
383+
384+
EXPECT_EQ(err, false);
385+
EXPECT_EQ(client.use_ps(), true);
386+
387+
Message msg, msg_reply;
388+
msg.set_info(0,0,REQ_ECHO,CMD);
389+
msg.set_msg("Hello!!");
390+
391+
client.send_and_receive(msg_reply, msg);
392+
393+
EXPECT_EQ(msg_reply.buf(), "Hello!!");
394+
395+
client.disconnect_ps();
396+
EXPECT_EQ(client.use_ps(), false);
397+
}
398+

test/unit_tests/net/LocalNet.cpp

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
#include "gtest/gtest.h"
2+
#include "../unit_test_common.hpp"
3+
#include <chimbuko/net/local_net.hpp>
4+
5+
using namespace chimbuko;
6+
7+
TEST(TestLocalNet, TestSingleton){
8+
LocalNet *net1 = new LocalNet;
9+
bool err = false;
10+
11+
try{
12+
LocalNet net2;
13+
}catch(const std::exception &e){
14+
err = true;
15+
}
16+
17+
EXPECT_EQ(err, true);
18+
19+
delete net1;
20+
21+
err = false;
22+
try{
23+
LocalNet net2;
24+
}catch(const std::exception &e){
25+
err = true;
26+
}
27+
28+
EXPECT_EQ(err, false);
29+
}
30+
31+
class NetPayloadBounceback: public NetPayloadBase{
32+
public:
33+
MessageKind kind() const{ return MessageKind::CMD; }
34+
MessageType type() const{ return MessageType::REQ_ECHO; }
35+
void action(Message &response, const Message &message) override{
36+
check(message);
37+
std::cout << "Bounce received: '" << message.buf() << "'" << std::endl;
38+
response.set_msg(message.buf(), false);
39+
};
40+
};
41+
42+
43+
TEST(TestLocalNet, SendAndReceive){
44+
//Test it throws if no active instance
45+
bool err = false;
46+
try{
47+
LocalNet::send_and_receive("hello");
48+
}catch(const std::exception &e){
49+
err = true;
50+
}
51+
EXPECT_EQ(err, true);
52+
53+
err = false;
54+
55+
LocalNet net;
56+
net.add_payload(new NetPayloadBounceback);
57+
58+
std::string the_message = "Hello!";
59+
Message msg;
60+
msg.set_info(0,0, MessageType::REQ_ECHO, MessageKind::CMD);
61+
msg.set_msg(the_message);
62+
63+
std::string str_reply = LocalNet::send_and_receive(msg.data());
64+
Message reply;
65+
reply.set_msg(str_reply, true);
66+
67+
EXPECT_EQ(reply.buf(), the_message);
68+
}
69+
70+
71+
72+

test/unit_tests/net/Makefile.am

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/3rdparty @PS_FLAGS@
22
LDADD = $(top_builddir)/src/libchimbuko.la -lgtest -lstdc++fs
33

44
testdir = $(prefix)/test/unit_tests/net
5-
test_PROGRAMS = ZMQMENet ZMQNet
5+
test_PROGRAMS = ZMQMENet ZMQNet LocalNet
66

77
ZMQMENet_SOURCES = ZMQMENet.cpp ../unit_test_main_mpi.cpp
88
ZMQMENet_LDADD = $(LDADD)
99

1010
ZMQNet_SOURCES = ZMQNet.cpp ../unit_test_main_mpi.cpp
1111
ZMQNet_LDADD = $(LDADD)
12+
13+
LocalNet_SOURCES = LocalNet.cpp ../unit_test_main_mpi.cpp
14+
LocalNet_LDADD = $(LDADD)
15+

test/unit_tests/run_all.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ set -o pipefail
3333
./pserver/global_anomaly_stats
3434
./net/ZMQMENet
3535
./net/ZMQNet
36+
./net/LocalNet

0 commit comments

Comments
 (0)