Skip to content

Commit eb9b7fa

Browse files
committed
Restored a number of things that got lost in the recent merge
1 parent 30a1c06 commit eb9b7fa

29 files changed

Lines changed: 351 additions & 100 deletions

app/Makefile.am

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,17 @@ bpfile_replay_LDADD = $(LDADD)
3232

3333

3434
if ENABLE_PROVDB
35-
bin_PROGRAMS += provdb_admin provdb_query
35+
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown
3636

3737
provdb_admin_SOURCES = provdb_admin.cpp
3838
provdb_admin_LDADD = $(LDADD)
3939

4040
provdb_query_SOURCES = provdb_query.cpp
4141
provdb_query_LDADD = $(LDADD)
4242

43+
provdb_shutdown_SOURCES = provdb_shutdown.cpp
44+
provdb_shutdown_LDADD = $(LDADD)
45+
4346
else
4447
echo "Provenance DB not being built"
4548
endif

app/driver.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
// #include "chimbuko/AD.hpp"
21
#include "chimbuko/chimbuko.hpp"
32
#include "chimbuko/verbose.hpp"
43
#include "chimbuko/util/string.hpp"
54
#include "chimbuko/util/commandLineParser.hpp"
5+
#include "chimbuko/util/error.hpp"
66
#include <chrono>
77
#include <cstdlib>
88

@@ -285,14 +285,17 @@ int main(int argc, char ** argv){
285285
}
286286
catch (const std::invalid_argument &e){
287287
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : caught invalid argument: " << e.what() << std::endl;
288+
if(params.err_outputpath.size()) recoverable_error(std::string("Driver : caught invalid argument: ") + e.what()); //ensure errors also written to error logs
288289
error = true;
289290
}
290291
catch (const std::ios_base::failure &e){
291292
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : I/O base exception caught: " << e.what() << std::endl;
293+
if(params.err_outputpath.size()) recoverable_error(std::string("Driver : I/O base exception caught: ") + e.what());
292294
error = true;
293295
}
294296
catch (const std::exception &e){
295297
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : Exception caught: " << e.what() << std::endl;
298+
if(params.err_outputpath.size()) recoverable_error(std::string("Driver : Exception caught: ") + e.what());
296299
error = true;
297300
}
298301

app/provdb_admin.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,17 @@ void pserver_goodbye(const tl::request& req) {
6666
progressStream << "ProvDB Admin: Pserver has said goodbye" << std::endl;
6767
}
6868

69+
//Shutdown-request flag. It is written by client_stop_rpc while holding the lock on *mtx and is consulted by the
//shutdown condition in main alongside args.autoshutdown.
//NOTE(review): confirm that the read in main is performed under the same lock
bool cmd_shutdown = false; //true if a client has requested that the server shut down
70+
71+
//RPC handler registered in main under the name "stop_server" (with responses disabled).
//Takes the lock on *mtx, raises cmd_shutdown and writes a message to progressStream. The request object is not used.
void client_stop_rpc(const tl::request& req) {
72+
std::lock_guard<tl::mutex> lock(*mtx);
73+
cmd_shutdown = true;
74+
progressStream << "ProvDB Admin: Received shutdown request from client" << std::endl;
75+
}
76+
77+
78+
79+
6980

7081
struct ProvdbArgs{
7182
std::string ip;
@@ -143,6 +154,7 @@ int main(int argc, char** argv) {
143154
engine.define("client_goodbye",client_goodbye).disable_response();
144155
engine.define("pserver_hello",pserver_hello).disable_response();
145156
engine.define("pserver_goodbye",pserver_goodbye).disable_response();
157+
engine.define("stop_server",client_stop_rpc).disable_response();
146158

147159
std::string addr = (std::string)engine.self(); //ip and port of admin
148160

@@ -219,8 +231,10 @@ int main(int argc, char** argv) {
219231

220232
//If at least one client has previously connected but none are now connected, shutdown the server
221233
//If all clients disconnected we must also wait for the pserver to disconnect (if it is connected)
234+
235+
//If args.autoshutdown is disabled a client can still request shutdown via the "stop_server" RPC; the connection conditions below must still hold before the server stops
222236
if(
223-
args.autoshutdown &&
237+
(args.autoshutdown || cmd_shutdown) &&
224238
( a_client_has_connected && connected.size() == 0 ) &&
225239
( !pserver_has_connected || (pserver_has_connected && !pserver_connected) )
226240
){

app/provdb_shutdown.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#include "chimbuko/ad/ADProvenanceDBclient.hpp"
2+
3+
using namespace chimbuko;
4+
5+
//Command-line tool that asks a provenance database server to shut down.
//Usage: provdb_shutdown <server address>
//With ENABLE_PROVDB defined: connects to the address given as the single argument, issues a stop request, disconnects
//and returns 0; returns 1 after printing the usage message if the argument count is wrong.
//Without ENABLE_PROVDB: prints a message and returns 1.
//NOTE(review): std::cout is used but <iostream> is not included directly by this file; presumably it arrives via the
//project header - confirm
int main(int argc, char** argv){
6+
#ifdef ENABLE_PROVDB
7+
if(argc != 2){
8+
std::cout << "Usage: provdb_shutdown <server address e.g. ofi+tcp;ofi_rxm://172.17.0.4:5000>" << std::endl;
9+
return 1;
10+
}
11+
ADProvenanceDBclient ad(0); //NOTE(review): 0 is presumably the client rank - confirm against ADProvenanceDBclient
12+
ad.connect(argv[1], 1); //NOTE(review): 1 is presumably the number of database shards - confirm
13+
ad.stopServer();
14+
ad.disconnect();
15+
16+
return 0;
17+
#else
18+
std::cout << "ProvDB not in use" << std::endl;
19+
return 1;
20+
#endif
21+
}
22+

app/pserver.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ int main (int argc, char ** argv){
131131
nlohmann::json in_p;
132132
in >> in_p;
133133
global_func_index_map.deserialize(in_p["func_index_map"]);
134-
param->assign(in_p["alg_params"].dump()); //param.assign(in_p["alg_params"].dump());
134+
param->assign(in_p["alg_params"].dump());
135135
}
136136

137137
#ifdef _USE_MPINET
@@ -181,8 +181,8 @@ int main (int argc, char ** argv){
181181
if(args.stat_outputdir.size()) std::cout << "(dir @ " << args.stat_outputdir << ")";
182182
}
183183

184-
net.add_payload(new NetPayloadUpdateParams(param, args.freeze_params)); //new NetPayloadUpdateParams(&param, args.freeze_params));
185-
net.add_payload(new NetPayloadGetParams(param)); //new NetPayloadGetParams(&param));
184+
net.add_payload(new NetPayloadUpdateParams(param, args.freeze_params));
185+
net.add_payload(new NetPayloadGetParams(param));
186186
net.add_payload(new NetPayloadUpdateAnomalyStats(&global_func_stats));
187187
net.add_payload(new NetPayloadUpdateCounterStats(&global_counter_stats));
188188
net.add_payload(new NetPayloadGlobalFunctionIndexMapBatched(&global_func_index_map));
@@ -227,7 +227,7 @@ int main (int argc, char ** argv){
227227
o.open(args.logdir + "/parameters.txt");
228228
if (o.is_open())
229229
{
230-
param->show(o); //param.show(o);
230+
param->show(o);
231231
o.close();
232232
}
233233
}
@@ -256,7 +256,7 @@ int main (int argc, char ** argv){
256256
if(!out.good()) throw std::runtime_error("Could not write anomaly algorithm parameters to the file provided");
257257
nlohmann::json out_p;
258258
out_p["func_index_map"] = global_func_index_map.serialize();
259-
out_p["alg_params"] = nlohmann::json::parse(param->serialize()); //param.serialize());
259+
out_p["alg_params"] = nlohmann::json::parse(param->serialize());
260260
out << out_p;
261261
}
262262

app/pshutdown.cpp

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,23 @@
1-
#include "chimbuko/net/zmq_net.hpp"
2-
#include "chimbuko/message.hpp"
1+
#include "chimbuko/verbose.hpp"
2+
#include "chimbuko/util/error.hpp"
3+
#include "chimbuko/ad/ADNetClient.hpp"
34

45
using namespace chimbuko;
56

6-
int main (int argc, char ** argv)
7-
{
8-
void * context;
9-
void * socket;
10-
std::string addr;
11-
Message msg;
12-
std::string strmsg;
13-
14-
if (argc > 1)
15-
addr = argv[1];
16-
17-
context = zmq_ctx_new();
18-
socket = zmq_socket(context, ZMQ_REQ);
19-
zmq_connect(socket, addr.c_str());
20-
21-
// test connection
22-
msg.clear();
23-
msg.set_info(-1, -1, MessageType::REQ_ECHO, MessageKind::DEFAULT);
24-
msg.set_msg("Hello!");
25-
26-
ZMQNet::send(socket, msg.data());
27-
28-
msg.clear();
29-
ZMQNet::recv(socket, strmsg);
30-
msg.set_msg(strmsg, true);
31-
32-
// if (msg.data_buffer().compare("Hello!>I am ZMQNET!") != 0)
33-
// {
34-
// std::cerr << "Connect error to parameter server (ZMQNET)!\n";
35-
// exit(1);
36-
// }
37-
38-
// shutdown
39-
zmq_send(socket, nullptr, 0, 0);
40-
41-
// finalize
42-
zmq_close(socket);
43-
zmq_ctx_term(context);
44-
45-
return EXIT_SUCCESS;
46-
}
7+
//Command-line tool that asks the parameter server to shut down.
//Usage: pshutdown <server address>
//With _USE_ZMQNET defined: connects an ADNetClient to the address given as the single argument, issues a stop
//command, disconnects and returns 0; returns 1 after printing the usage message if the argument count is wrong.
//Otherwise: prints that the tool is not implemented for MPINET and returns 1.
//NOTE(review): std::cout is used but <iostream> is not included directly by this file; presumably it arrives via the
//project headers - confirm
int main(int argc, char** argv){
8+
#ifdef _USE_ZMQNET
9+
if(argc != 2){
10+
std::cout << "Usage: pshutdown <server address e.g. tcp://localhost:5559>" << std::endl;
11+
return 1;
12+
}
13+
ADNetClient ad;
14+
ad.connect_ps(0,0,argv[1]); //NOTE(review): the two zeros are presumably the client rank and server rank - confirm against ADNetClient::connect_ps
15+
ad.stopServer();
16+
ad.disconnect_ps();
17+
18+
return 0;
19+
#else
20+
std::cout << "Not implemented for MPINET" << std::endl;
21+
return 1;
22+
#endif
23+
}

include/chimbuko/ad/ADEvent.hpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include "chimbuko/ad/ADDefine.hpp"
33
#include "chimbuko/ad/ExecData.hpp"
4+
#include "chimbuko/ad/ADMetadataParser.hpp"
45
#include "chimbuko/util/map.hpp"
56
#include <string>
67
#include <vector>
@@ -164,6 +165,13 @@ namespace chimbuko {
164165
*/
165166
void linkCounterMap(const std::unordered_map<int, std::string>* m) { m_counterMap = m; }
166167

168+
169+
/**
170+
* @brief Optional: give the event manager knowledge of which threads are GPU threads, improves error checking
171+
*/
172+
void linkGPUthreadMap(const std::unordered_map<unsigned long, GPUvirtualThreadInfo> *m){ m_gpu_thread_Map = m; }
173+
174+
167175
/**
168176
* @brief Get the Func Map object
169177
*
@@ -273,6 +281,10 @@ namespace chimbuko {
273281
*/
274282
CallListMap_p_t* trimCallList(int n_keep_thread = 0);
275283

284+
/**
285+
* @brief Get the total number of function events in the call list over all pid/rid/tid
286+
*/
287+
size_t getCallListSize() const;
276288

277289
/**
278290
* @brief purge all function calls that are completed (i.e. a pair of ENTRY and EXIT events are observed)
@@ -318,6 +330,11 @@ namespace chimbuko {
318330
int m_eidx_comm_recv; /**< If previously seen, the eid corresponding to the comm recv event (-1 otherwise)*/
319331

320332

333+
/**
334+
* @brief Optional: pointer to the GPU thread map supplied via linkGPUthreadMap; gives the event manager knowledge of which threads are GPU threads, improves error checking
335+
*/
336+
const std::unordered_map<unsigned long, GPUvirtualThreadInfo> *m_gpu_thread_Map;
337+
321338

322339
/**
323340
* @brief communication event stack. Once a function call has exited, all comms events are associated with that call and the stack is cleared

include/chimbuko/ad/ADNetClient.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ namespace chimbuko{
5858
* @param msg The message
5959
* @return The response message in serialized format. Use Message::set_msg( <serialized_msg>, true ) to unpack
6060
*/
61-
std::string send_and_receive(const Message &msg);
61+
std::string send_and_receive(const Message &msg) const;
6262

6363
/**
6464
* @brief Send a message to the parameter server and receive the response both as Message objects
@@ -90,6 +90,10 @@ namespace chimbuko{
9090
*/
9191
void * getZMQcontext(){ return m_context; }
9292

93+
/**
94+
* @brief Issue a stop command to the server. The server will then stop once all clients have disconnected and all messages have been processed
95+
*/
96+
void stopServer() const;
9397
#endif
9498

9599

include/chimbuko/ad/ADOutlier.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#include "chimbuko/util/PerfStats.hpp"
1010
#include "chimbuko/util/Anomalies.hpp"
1111

12-
1312
namespace chimbuko {
1413
/**
1514
* @brief abstract class for anomaly detection algorithms

include/chimbuko/ad/ADProvenanceDBclient.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ namespace chimbuko{
9090
thallium::endpoint m_server; /**< Endpoint for provDB comms*/
9191
thallium::remote_procedure *m_client_hello; /**< RPC to register client with provDB */
9292
thallium::remote_procedure *m_client_goodbye; /**< RPC to deregister client with provDB */
93+
thallium::remote_procedure *m_stop_server; /**< RPC to shutdown server*/
9394
bool m_perform_handshake; /**< Optionally disable the client->server registration */
9495

9596
PerfStats *m_stats; /**< Performance data gathering*/
@@ -233,6 +234,12 @@ namespace chimbuko{
233234
*/
234235
size_t getNoutstandingAsyncReqs(){ return anom_send_man.getNoutstanding(); }
235236

237+
238+
/**
239+
* @brief Issue a stop request to the server
240+
*/
241+
void stopServer() const;
242+
236243
};
237244

238245
};

0 commit comments

Comments
 (0)