Skip to content

Commit abfeca0

Browse files
committed
Merge branch 'develop' into ckelly_develop
Conflicts: scripts/launch/run_services.sh src/chimbuko.cpp
2 parents 1c1e05f + 5ab15fb commit abfeca0

61 files changed

Lines changed: 3477 additions & 1009 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

app/Makefile.am

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,14 @@ bpfile_replay_LDADD = $(LDADD)
3232

3333

3434
if ENABLE_PROVDB
35-
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown
35+
bin_PROGRAMS += provdb_admin provdb_query
3636

3737
provdb_admin_SOURCES = provdb_admin.cpp
3838
provdb_admin_LDADD = $(LDADD)
3939

4040
provdb_query_SOURCES = provdb_query.cpp
4141
provdb_query_LDADD = $(LDADD)
4242

43-
provdb_shutdown_SOURCES = provdb_shutdown.cpp
44-
provdb_shutdown_LDADD = $(LDADD)
45-
4643
else
4744
echo "Provenance DB not being built"
4845
endif

app/driver.cpp

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
// #include "chimbuko/AD.hpp"
12
#include "chimbuko/chimbuko.hpp"
23
#include "chimbuko/verbose.hpp"
34
#include "chimbuko/util/string.hpp"
45
#include "chimbuko/util/commandLineParser.hpp"
5-
#include "chimbuko/util/error.hpp"
66
#include <chrono>
77
#include <cstdlib>
88

@@ -60,12 +60,15 @@ struct setLoggingHeadRankArg: public optionalCommandLineArgBase<ChimbukoParams>{
6060
}
6161
};
6262

63-
63+
6464

6565
optionalArgsParser & getOptionalArgsParser(){
6666
static bool initialized = false;
6767
static optionalArgsParser p;
6868
if(!initialized){
69+
addOptionalCommandLineArg(p, ad_algorithm, "Set an AD algorithm to use: hbos or sstd.");
70+
addOptionalCommandLineArg(p, hbos_threshold, "Set Threshold for HBOS anomaly detection filter.");
71+
addOptionalCommandLineArg(p, hbos_use_global_threshold, "Set true to use a global threshold in HBOS algorithm. Dafault is true.");
6972
addOptionalCommandLineArg(p, program_idx, "Set the index associated with the instrumented program. Use to label components of a workflow. (default 0)");
7073
addOptionalCommandLineArg(p, outlier_sigma, "Set the number of standard deviations that defines an anomalous event (default 6)");
7174
addOptionalCommandLineArg(p, pserver_addr, "Set the address of the parameter server. If empty (default) the pserver will not be used.");
@@ -74,9 +77,9 @@ optionalArgsParser & getOptionalArgsParser(){
7477
addOptionalCommandLineArg(p, anom_win_size, "When anomaly data are recorded a window of this size (in units of function execution events) around the anomalous event are also recorded (default 10)");
7578
addOptionalCommandLineArg(p, prov_outputpath, "Output provenance data to this directory. Can be used in place of or in conjunction with the provenance database. An empty string \"\" (default) disables this output");
7679
#ifdef ENABLE_PROVDB
77-
addOptionalCommandLineArg(p, provdb_addr, "Address of the provenance database. If empty (default) the provenance DB will not be used.\nHas format \"ofi+tcp;ofi_rxm://${IP_ADDR}:${PORT}\". Should also accept \"tcp://${IP_ADDR}:${PORT}\"");
78-
addOptionalCommandLineArg(p, nprovdb_shards, "Number of provenance database shards. Clients connect to shards round-robin by rank (default 1)");
79-
#endif
80+
addOptionalCommandLineArg(p, provdb_addr, "Address of the provenance database. If empty (default) the provenance DB will not be used.\nHas format \"ofi+tcp;ofi_rxm://${IP_ADDR}:${PORT}\". Should also accept \"tcp://${IP_ADDR}:${PORT}\"");
81+
addOptionalCommandLineArg(p, nprovdb_shards, "Number of provenance database shards. Clients connect to shards round-robin by rank (default 1)");
82+
#endif
8083
#ifdef _PERF_METRIC
8184
addOptionalCommandLineArg(p, perf_outputpath, "Output path for AD performance monitoring data. If an empty string (default) no output is written.");
8285
addOptionalCommandLineArg(p, perf_step, "How frequently (in IO steps) the performance data is dumped (default 10)");
@@ -98,7 +101,7 @@ optionalArgsParser & getOptionalArgsParser(){
98101
};
99102

100103
void printHelp(){
101-
std::cout << "Usage: driver <Trace engine type> <Trace directory> <Trace file prefix> <Options>\n"
104+
std::cout << "Usage: driver <Trace engine type> <Trace directory> <Trace file prefix> <Options>\n"
102105
<< "Where <Trace engine type> : BPFile or SST\n"
103106
<< " <Trace directory> : The directory in which the BPFile or SST file is located\n"
104107
<< " <Trace file prefix> : The prefix of the file (the trace file name without extension e.g. \"tau-metrics-mybinary\" for \"tau-metrics-mybinary.bp\")\n"
@@ -116,14 +119,16 @@ ChimbukoParams getParamsFromCommandLine(int argc, char** argv, const int mpi_wor
116119
// Parse command line arguments (cf chimbuko.hpp for detailed description of parameters)
117120
// -----------------------------------------------------------------------
118121
ChimbukoParams params;
119-
122+
120123
//Parameters for the connection to the instrumented binary trace output
121124
params.trace_engineType = argv[1]; // BPFile or SST
122125
params.trace_data_dir = argv[2]; // *.bp location
123126
std::string bp_prefix = argv[3]; // bp file prefix (e.g. tau-metrics-[nwchem])
124127

125128
//The remainder are optional arguments. Enable using the appropriate command line switch
126-
129+
params.ad_algorithm = "hbos";
130+
params.hbos_threshold = 0.99;
131+
params.hbos_use_global_threshold = true;
127132
params.pserver_addr = ""; //don't use pserver by default
128133
params.hpserver_nthr = 1;
129134
params.outlier_sigma = 6.0; // anomaly detection algorithm parameter
@@ -141,7 +146,7 @@ ChimbukoParams getParamsFromCommandLine(int argc, char** argv, const int mpi_wor
141146
params.rank = -1234; //assign an invalid value as default for use below
142147
params.outlier_statistic = "exclusive_runtime";
143148
params.step_report_freq = 1;
144-
149+
145150
getOptionalArgsParser().parse(params, argc-4, (const char**)(argv+4));
146151

147152
//By default assign the rank index of the trace data as the MPI rank of the AD process
@@ -158,7 +163,7 @@ ChimbukoParams getParamsFromCommandLine(int argc, char** argv, const int mpi_wor
158163
//Thus we need to obtain the input data rank also from the command line and modify the filename accordingly
159164
if(params.override_rank)
160165
params.trace_inputFile = bp_prefix + "-" + std::to_string(overrideRankArg::input_data_rank()) + ".bp";
161-
166+
162167
//If neither the provenance database or the provenance output path are set, default to outputting to pwd
163168
if(params.prov_outputpath.size() == 0
164169
#ifdef ENABLE_PROVDB
@@ -170,21 +175,21 @@ ChimbukoParams getParamsFromCommandLine(int argc, char** argv, const int mpi_wor
170175

171176
return params;
172177
}
173-
174-
178+
179+
175180

176181

177182
int main(int argc, char ** argv){
178183
if(argc == 1 || (argc == 2 && std::string(argv[1]) == "-help") ){
179184
printHelp();
180185
return 0;
181186
}
182-
187+
183188
assert( MPI_Init(&argc, &argv) == MPI_SUCCESS );
184189

185190
int mpi_world_rank, mpi_world_size;
186191
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_world_rank);
187-
MPI_Comm_size(MPI_COMM_WORLD, &mpi_world_size);
192+
MPI_Comm_size(MPI_COMM_WORLD, &mpi_world_size);
188193

189194
//Parse environment variables
190195
if(const char* env_p = std::getenv("CHIMBUKO_VERBOSE"))
@@ -203,7 +208,7 @@ int main(int argc, char ** argv){
203208

204209
bool error = false;
205210

206-
try
211+
try
207212
{
208213
//Instantiate Chimbuko
209214
Chimbuko driver(params);
@@ -215,24 +220,24 @@ int main(int argc, char ** argv){
215220
unsigned long total_n_outliers = 0, n_outliers = 0;
216221
unsigned long total_processing_time = 0, processing_time = 0;
217222
unsigned long long n_func_events = 0, n_comm_events = 0, n_counter_events = 0;
218-
unsigned long long total_n_func_events = 0, total_n_comm_events = 0, total_n_counter_events = 0;
223+
unsigned long long total_n_func_events = 0, total_n_comm_events = 0, total_n_counter_events = 0;
219224
high_resolution_clock::time_point t1, t2;
220225

221226
// -----------------------------------------------------------------------
222227
// Start analysis
223228
// -----------------------------------------------------------------------
224-
headProgressStream(params.rank) << "Driver rank " << params.rank
225-
<< ": analysis start " << (driver.use_ps() ? "with": "without")
229+
headProgressStream(params.rank) << "Driver rank " << params.rank
230+
<< ": analysis start " << (driver.use_ps() ? "with": "without")
226231
<< " pserver" << std::endl;
227232

228233
t1 = high_resolution_clock::now();
229-
driver.run(n_func_events,
234+
driver.run(n_func_events,
230235
n_comm_events,
231236
n_counter_events,
232237
n_outliers,
233238
frames);
234239
t2 = high_resolution_clock::now();
235-
240+
236241
if (params.rank == 0) {
237242
headProgressStream(params.rank) << "Driver rank " << params.rank << ": analysis done!\n";
238243
driver.show_status(true);
@@ -267,7 +272,7 @@ int main(int argc, char ** argv){
267272
total_n_comm_events = global_measures[1];
268273
total_n_counter_events = global_measures[2];
269274
}
270-
275+
271276
headProgressStream(params.rank) << "Driver rank " << params.rank << ": Final report\n"
272277
<< "Avg. num. frames over MPI ranks : " << (double)total_frames/(double)mpi_world_size << "\n"
273278
<< "Avg. processing time over MPI ranks : " << (double)total_processing_time/(double)mpi_world_size << " msec\n"
@@ -280,17 +285,14 @@ int main(int argc, char ** argv){
280285
}
281286
catch (const std::invalid_argument &e){
282287
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : caught invalid argument: " << e.what() << std::endl;
283-
if(params.err_outputpath.size()) recoverable_error(std::string("Driver : caught invalid argument: ") + e.what()); //ensure errors also written to error logs
284288
error = true;
285289
}
286290
catch (const std::ios_base::failure &e){
287291
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : I/O base exception caught: " << e.what() << std::endl;
288-
if(params.err_outputpath.size()) recoverable_error(std::string("Driver : I/O base exception caught: ") + e.what());
289292
error = true;
290293
}
291294
catch (const std::exception &e){
292295
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : Exception caught: " << e.what() << std::endl;
293-
if(params.err_outputpath.size()) recoverable_error(std::string("Driver : Exception caught: ") + e.what());
294296
error = true;
295297
}
296298

app/provdb_admin.cpp

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,6 @@ void pserver_goodbye(const tl::request& req) {
6666
progressStream << "ProvDB Admin: Pserver has said goodbye" << std::endl;
6767
}
6868

69-
bool cmd_shutdown = false; //true if a client has requested that the server shut down
70-
71-
void client_stop_rpc(const tl::request& req) {
72-
std::lock_guard<tl::mutex> lock(*mtx);
73-
cmd_shutdown = true;
74-
progressStream << "ProvDB Admin: Received shutdown request from client" << std::endl;
75-
}
76-
77-
78-
79-
8069

8170
struct ProvdbArgs{
8271
std::string ip;
@@ -154,7 +143,6 @@ int main(int argc, char** argv) {
154143
engine.define("client_goodbye",client_goodbye).disable_response();
155144
engine.define("pserver_hello",pserver_hello).disable_response();
156145
engine.define("pserver_goodbye",pserver_goodbye).disable_response();
157-
engine.define("stop_server",client_stop_rpc).disable_response();
158146

159147
std::string addr = (std::string)engine.self(); //ip and port of admin
160148

@@ -231,10 +219,8 @@ int main(int argc, char** argv) {
231219

232220
//If at least one client has previously connected but none are now connected, shutdown the server
233221
//If all clients disconnected we must also wait for the pserver to disconnect (if it is connected)
234-
235-
//If args.autoshutdown is disabled we can force shutdown via a "stop_server" RPC
236222
if(
237-
(args.autoshutdown || cmd_shutdown) &&
223+
args.autoshutdown &&
238224
( a_client_has_connected && connected.size() == 0 ) &&
239225
( !pserver_has_connected || (pserver_has_connected && !pserver_connected) )
240226
){

app/provdb_shutdown.cpp

Lines changed: 0 additions & 22 deletions
This file was deleted.

app/pserver.cpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
#include <chimbuko/param/sstd_param.hpp>
1818
#include <chimbuko/util/commandLineParser.hpp>
19+
#include <chimbuko/util/error.hpp>
1920
#include <fstream>
2021
#include "chimbuko/verbose.hpp"
2122

2223

24+
2325
using namespace chimbuko;
2426

2527
struct pserverArgs{
@@ -40,6 +42,7 @@ struct pserverArgs{
4042
int stat_send_freq;
4143

4244
std::string stat_outputdir;
45+
std::string ad;
4346

4447
#ifdef _USE_ZMQNET
4548
int max_pollcyc_msg;
@@ -50,8 +53,8 @@ struct pserverArgs{
5053
#ifdef ENABLE_PROVDB
5154
std::string provdb_addr;
5255
#endif
53-
54-
pserverArgs(): nt(-1), logdir("."), ws_addr(""), load_params_set(false), save_params_set(false), freeze_params(false), stat_send_freq(1000), stat_outputdir(""), port(5559)
56+
57+
pserverArgs(): ad("hbos"), nt(-1), logdir("."), ws_addr(""), load_params_set(false), save_params_set(false), freeze_params(false), stat_send_freq(1000), stat_outputdir(""), port(5559)
5558
#ifdef _USE_ZMQNET
5659
, max_pollcyc_msg(10), zmq_io_thr(1), autoshutdown(true)
5760
#endif
@@ -64,6 +67,7 @@ struct pserverArgs{
6467
static bool init = false;
6568
static commandLineParser<pserverArgs> p;
6669
if(!init){
70+
addOptionalCommandLineArg(p, ad, "Set AD algorithm to use.");
6771
addOptionalCommandLineArg(p, nt, "Set the number of RPC handler threads (max-2 by default)");
6872
addOptionalCommandLineArg(p, logdir, "Set the output log directory (default: job directory)");
6973
addOptionalCommandLineArg(p, port, "Set the pserver port (default: 5559)");
@@ -79,7 +83,7 @@ struct pserverArgs{
7983
addOptionalCommandLineArg(p, autoshutdown, "If enabled the pserver will automatically shutdown when all clients have disconnected (default: true)");
8084
#endif
8185
#ifdef ENABLE_PROVDB
82-
addOptionalCommandLineArg(p, provdb_addr, "Address of the provenance database. If empty (default) the global function and counter statistics will not be send to the provenance DB.\nHas format \"ofi+tcp;ofi_rxm://${IP_ADDR}:${PORT}\". Should also accept \"tcp://${IP_ADDR}:${PORT}\"");
86+
addOptionalCommandLineArg(p, provdb_addr, "Address of the provenance database. If empty (default) the global function and counter statistics will not be send to the provenance DB.\nHas format \"ofi+tcp;ofi_rxm://${IP_ADDR}:${PORT}\". Should also accept \"tcp://${IP_ADDR}:${PORT}\"");
8387
#endif
8488

8589
init = true;
@@ -107,13 +111,18 @@ int main (int argc, char ** argv){
107111
if(const char* env_p = std::getenv("CHIMBUKO_VERBOSE")){
108112
progressStream << "Pserver: Enabling verbose debug output" << std::endl;
109113
enableVerboseLogging() = true;
110-
}
114+
}
111115

112-
SstdParam param; //global collection of parameters used to identify anomalies
116+
ParamInterface * param = ParamInterface::set_AdParam(args.ad); //"hbos"); //sstd"); //HbosParam param; //global collection of parameters used to identify anomalies
117+
if (param == nullptr) {
118+
fatal_error("INCORRECT algorithm for AdParam: Not Found. Choose sstd or hbos.");
119+
// verboseStream << "INCORRECT algorithm for AdParam: Not Found. Choose sstd or hbos." << std::endl;
120+
// exit(EXIT_FAILURE);
121+
}
113122
GlobalAnomalyStats global_func_stats; //global anomaly statistics
114123
GlobalCounterStats global_counter_stats; //global counter statistics
115124
PSglobalFunctionIndexMap global_func_index_map; //mapping of function name to global index
116-
125+
117126
//Optionally load previously-computed AD algorithm statistics
118127
if(args.load_params_set){
119128
progressStream << "Pserver: Loading parameters from input file " << args.load_params << std::endl;
@@ -122,7 +131,7 @@ int main (int argc, char ** argv){
122131
nlohmann::json in_p;
123132
in >> in_p;
124133
global_func_index_map.deserialize(in_p["func_index_map"]);
125-
param.assign(in_p["alg_params"].dump());
134+
param->assign(in_p["alg_params"].dump()); //param.assign(in_p["alg_params"].dump());
126135
}
127136

128137
#ifdef _USE_MPINET
@@ -172,8 +181,8 @@ int main (int argc, char ** argv){
172181
if(args.stat_outputdir.size()) std::cout << "(dir @ " << args.stat_outputdir << ")";
173182
}
174183

175-
net.add_payload(new NetPayloadUpdateParams(&param, args.freeze_params));
176-
net.add_payload(new NetPayloadGetParams(&param));
184+
net.add_payload(new NetPayloadUpdateParams(param, args.freeze_params)); //new NetPayloadUpdateParams(&param, args.freeze_params));
185+
net.add_payload(new NetPayloadGetParams(param)); //new NetPayloadGetParams(&param));
177186
net.add_payload(new NetPayloadUpdateAnomalyStats(&global_func_stats));
178187
net.add_payload(new NetPayloadUpdateCounterStats(&global_counter_stats));
179188
net.add_payload(new NetPayloadGlobalFunctionIndexMapBatched(&global_func_index_map));
@@ -206,7 +215,7 @@ int main (int argc, char ** argv){
206215
progressStream << "Pserver: sending final statistics to provDB" << std::endl;
207216
provdb_client.sendMultipleData(global_func_stats.collect_func_data(), GlobalProvenanceDataType::FunctionStats);
208217
provdb_client.sendMultipleData(global_counter_stats.get_json_state(), GlobalProvenanceDataType::CounterStats);
209-
progressStream << "Pserver: disconnecting from provDB" << std::endl;
218+
progressStream << "Pserver: disconnecting from provDB" << std::endl;
210219
provdb_client.disconnect();
211220
}
212221
#endif
@@ -218,7 +227,7 @@ int main (int argc, char ** argv){
218227
o.open(args.logdir + "/parameters.txt");
219228
if (o.is_open())
220229
{
221-
param.show(o);
230+
param->show(o); //param.show(o);
222231
o.close();
223232
}
224233
}
@@ -242,12 +251,12 @@ int main (int argc, char ** argv){
242251

243252
//Optionally save the final AD algorithm parameters
244253
if(args.save_params_set){
245-
progressStream << "PServer: Saving parameters to output file " << args.save_params << std::endl;
254+
progressStream << "PServer: Saving parameters to output file " << args.save_params << std::endl;
246255
std::ofstream out(args.save_params);
247256
if(!out.good()) throw std::runtime_error("Could not write anomaly algorithm parameters to the file provided");
248257
nlohmann::json out_p;
249258
out_p["func_index_map"] = global_func_index_map.serialize();
250-
out_p["alg_params"] = nlohmann::json::parse(param.serialize());
259+
out_p["alg_params"] = nlohmann::json::parse(param->serialize()); //param.serialize());
251260
out << out_p;
252261
}
253262

0 commit comments

Comments
 (0)