Skip to content

Commit 6874ae2

Browse files
committed
The functionality of looking up and performing a Net payload action is now provided by static functions through the NetInterface, simplifying net implementations
1 parent d93607e commit 6874ae2

4 files changed

Lines changed: 54 additions & 37 deletions

File tree

include/chimbuko/net.hpp

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,26 @@ namespace chimbuko {
131131
*/
132132
void list_payloads(std::ostream &os) const;
133133

134+
typedef std::unordered_map<MessageKind, std::unordered_map<MessageType, std::unique_ptr<NetPayloadBase> > > PayloadMapType; /**< Map of message kind/type to payloads */
135+
typedef std::unordered_map<int, PayloadMapType> WorkerPayloadMapType; /**< Map of worker index and message type to payloads */
136+
137+
/**
138+
* @brief Find the action associated with the given worker_id and message type and perform the action
139+
* @param worker_id The worker index
140+
* @param msg_reply The reply message
141+
* @param msg The input message
142+
* @param payloads The map of worker/message type to payload
143+
*/
144+
static void find_and_perform_action(int worker_id, Message &msg_reply, const Message &msg, const WorkerPayloadMapType &payloads);
145+
146+
/**
147+
* @brief Find the action associated with the given message type and perform the action
148+
* @param msg_reply The reply message
149+
* @param msg The input message
150+
* @param payloads The map of worker/message type to payload
151+
*/
152+
static void find_and_perform_action(Message &msg_reply, const Message &msg, const PayloadMapType &payloads);
153+
134154
protected:
135155
/**
136156
* @brief initialize thread pool
@@ -139,14 +159,9 @@ namespace chimbuko {
139159
*/
140160
virtual void init_thread_pool(int nt) = 0;
141161

142-
protected:
143162
int m_nt; /**< The number of threads in the pool */
144-
145-
std::unordered_map<int,
146-
std::unordered_map<MessageKind,
147-
std::unordered_map<MessageType, std::unique_ptr<NetPayloadBase> >
148-
>
149-
> m_payloads; /**< Map of worker index (implementation defined), message kind and message type to a payload*/
163+
164+
WorkerPayloadMapType m_payloads; /**< Map of worker index (implementation defined), message kind and message type to a payload*/
150165
};
151166

152167
namespace DefaultNetInterface

src/net.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "chimbuko/net.hpp"
22
#include "chimbuko/net/mpi_net.hpp"
33
#include "chimbuko/net/zmq_net.hpp"
4+
#include "chimbuko/util/error.hpp"
45

56
#include <iostream>
67
#include <chrono>
@@ -43,3 +44,21 @@ void NetInterface::list_payloads(std::ostream &os) const{
4344
}
4445
}
4546
}
47+
48+
void NetInterface::find_and_perform_action(int worker_id, Message &msg_reply, const Message &msg, const NetInterface::WorkerPayloadMapType &payloads){
49+
auto iit = payloads.find(worker_id);
50+
if(iit == payloads.end()){ fatal_error(std::string("Cannot find payloads with worker index ") + std::to_string(worker_id) ); }
51+
find_and_perform_action(msg_reply, msg, iit->second);
52+
}
53+
54+
void NetInterface::find_and_perform_action(Message &msg_reply, const Message &msg, const NetInterface::PayloadMapType &payloads){
55+
auto kit = payloads.find((MessageKind)msg.kind());
56+
if(kit == payloads.end()) fatal_error("No payload associated with the message kind provided. Message: " + msg.buf() + " (did you add the payload to the server?)");
57+
auto pit = kit->second.find((MessageType)msg.type());
58+
if(pit == kit->second.end()) fatal_error("No payload associated with the message type provided. Mesage: " + msg.buf() + " (did you add the payload to the server?)");
59+
60+
msg_reply = msg.createReply();
61+
pit->second->action(msg_reply, msg);
62+
}
63+
64+

src/net/zmq_net.cpp

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -139,37 +139,28 @@ void doWork(void* context,
139139
}
140140
perf.add(perf_prefix + "poll_time_ms", timer.elapsed_ms());
141141

142-
//Receive the message into a json-formatted string
143-
timer.start();
144142
std::string strmsg;
143+
144+
//Receive the message
145+
timer.start();
145146
ZMQNet::recv(socket, strmsg);
146147
perf.add(perf_prefix + "receive_from_front_ms", timer.elapsed_ms());
147-
148148
verboseStream << "ZMQNet worker thread " << thr_idx << " received message of size " << strmsg.size() << std::endl;
149149

150150
//Parse the message and instantiate a reply message with appropriate sender
151151
timer.start();
152152
Message msg, msg_reply;
153153
msg.set_msg(strmsg, true);
154-
155-
msg_reply = msg.createReply();
156-
perf.add(perf_prefix + "message_parse_reply_create_ms", timer.elapsed_ms());
154+
perf.add(perf_prefix + "message_parse_ms", timer.elapsed_ms());
157155

158156
timer.start();
159-
auto kit = payloads.find((MessageKind)msg.kind());
160-
if(kit == payloads.end()) fatal_error("ZMQNet::doWork : No payload associated with the message kind provided. Message: " + strmsg + " (did you add the payload to the server?)");
161-
auto pit = kit->second.find((MessageType)msg.type());
162-
if(pit == kit->second.end()) fatal_error("ZMQNet::doWork : No payload associated with the message type provided. Mesage: " + strmsg + " (did you add the payload to the server?)");
163-
perf.add(perf_prefix + "message_kind_type_lookup_ms", timer.elapsed_ms());
157+
NetInterface::find_and_perform_action(msg_reply, msg, payloads);
158+
perf.add(perf_prefix + "find_and_perform_action_ms", timer.elapsed_ms());
164159

165-
//Apply the payload
166-
timer.start();
167-
pit->second->action(msg_reply, msg);
160+
//Send the reply
168161
strmsg = msg_reply.data();
169162
verboseStream << "ZMQNet worker thread " << thr_idx << " sending response of size " << strmsg.size() << std::endl;
170-
perf.add(perf_prefix + "perform_action_ms", timer.elapsed_ms());
171163

172-
//Send the reply
173164
timer.start();
174165
ZMQNet::send(socket, strmsg);
175166
perf.add(perf_prefix + "send_to_front_ms", timer.elapsed_ms());

src/net/zmqme_net.cpp

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -134,24 +134,16 @@ void doWork(std::unordered_map<int,
134134
Message msg, msg_reply;
135135
msg.set_msg(strmsg, true);
136136

137-
msg_reply = msg.createReply();
138-
139-
timer.start();
140-
auto kit = payloads_t.find((MessageKind)msg.kind());
141-
if(kit == payloads_t.end()) throw std::runtime_error("ZMQMENet::doWork : No payload associated with the message kind provided (did you add the payload to the server?)");
142-
auto pit = kit->second.find((MessageType)msg.type());
143-
if(pit == kit->second.end()) throw std::runtime_error("ZMQMENet::doWork : No payload associated with the message type provided (did you add the payload to the server?)");
144-
perf.add(perf_prefix + "message_kind_type_lookup_ms", timer.elapsed_ms());
145-
146-
//Apply the payload
147137
timer.start();
148-
pit->second->action(msg_reply, msg);
149-
verboseStream << "ZMQMENet Worker thread " << thr_idx << " sending response of size " << msg_reply.data().size() << std::endl;
150-
perf.add(perf_prefix + "perform_action_ms", timer.elapsed_ms());
138+
NetInterface::find_and_perform_action(msg_reply, msg, payloads_t);
139+
perf.add(perf_prefix + "find_and_perform_action_ms", timer.elapsed_ms());
140+
141+
strmsg = msg_reply.data();
142+
verboseStream << "ZMQMENet Worker thread " << thr_idx << " sending response of size " << strmsg.size() << std::endl;
151143

152144
//Send the reply
153145
timer.start();
154-
ZMQNet::send(socket, msg_reply.data());
146+
ZMQNet::send(socket, strmsg);
155147
perf.add(perf_prefix + "send_to_front_ms", timer.elapsed_ms());
156148
} //while(<work in queue>)
157149

0 commit comments

Comments
 (0)