Skip to content

Commit 3d5ffe2

Browse files
committed
Merge branch 'ckelly_develop'
2 parents 1448eb1 + 8bd74c0 commit 3d5ffe2

123 files changed

Lines changed: 8522 additions & 2535 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Makefile.am

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
11
SUBDIRS = include src app test sphinx 3rdparty scripts sim
22
nobase_include_HEADERS = chimbuko_config.h
33

4+
install-provdb-python:
5+
python3 -m pip install --user $(top_srcdir)/scripts/provdb_python/
6+
#python3 -m pip install --prefix $(prefix) $(top_srcdir)/scripts/provdb_python/
7+
8+
uninstall-provdb-python:
9+
python3 -m pip uninstall -y provdb-python
10+
#Below are needed if installing into prefix because pip doesn't allow uninstall of packages installed with --prefix?????!?!? (I HATE PYTHON)
11+
# rm -rf $(prefix)/lib/python3.6
12+
# rm $(prefix)/bin/provdb-python

app/bpfile_replay.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ int main(int argc, char** argv){
7575
}
7676

7777

78-
adios2::ADIOS ad = adios2::ADIOS(adios2::DebugON);
78+
adios2::ADIOS ad = adios2::ADIOS();
7979

8080
adios2::IO io_out = ad.DeclareIO("writer");
8181

app/driver.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ optionalArgsParser & getOptionalArgsParser(){
8888
addOptionalCommandLineArg(p, provdb_addr_dir, "Directory in which the provenance database outputs its address files. If empty (default) the provenance DB will not be used.");
8989
addOptionalCommandLineArg(p, nprovdb_shards, "Number of provenance database shards. Clients connect to shards round-robin by rank (default 1)");
9090
addOptionalCommandLineArg(p, nprovdb_instances, "Number of provenance database instances. Shards are divided uniformly over instances. (default 1)");
91+
addOptionalCommandLineArg(p, provdb_mercury_auth_key, "Set the Mercury authorization key for connection to the provDB (default \"\")");
9192
#endif
9293
#ifdef _PERF_METRIC
9394
addOptionalCommandLineArg(p, perf_outputpath, "Output path for AD performance monitoring data. If an empty string (default) no output is written.");

app/pclient_stats.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,14 @@ int main (int argc, char** argv)
8080
int n_anomalies = std::max(0, (int)dist(generator));
8181
AnomalyData d(0, rank, step, 0, 0, n_anomalies);
8282

83-
ADLocalFuncStatistics::State state;
84-
state.anomaly = d;
83+
ADLocalFuncStatistics fstat;
84+
fstat.setAnomalyData(d);
8585

8686
// create message
8787
msg.clear();
8888
msg.set_info(rank, 0, MessageType::REQ_ADD, MessageKind::ANOMALY_STATS, step);
8989
msg.set_msg(
90-
state.serialize_cerealpb(), false
91-
//nlohmann::json::object({{"anomaly", d.get_json()}}).dump(), false
90+
fstat.net_serialize(), false
9291
);
9392

9493
#ifdef _USE_MPINET

app/provdb_admin.cpp

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,15 @@ struct ProvdbArgs{
141141
bool db_in_mem; //database is in-memory not written to disk, for testing
142142
std::string db_base_config;
143143
std::string db_margo_config;
144-
144+
std::string db_mercury_auth_key;
145+
145146
bool db_use_aggregator; /**< Use the aggregator backend*/
146147
int db_batch_size; /**< Batch size for "aggregator" backend*/
147148

148149
bool db_bypass_unqlite;
149150

150151
ProvdbArgs(): engine("ofi+tcp"), autoshutdown(true), server_instance(0), ninstances(1), nshards(1), db_type("unqlite"), db_commit_freq(10000), db_write_dir("."), db_in_mem(false), db_base_config(""), db_margo_config(""),
151-
db_use_aggregator(false), db_batch_size(64), db_bypass_unqlite(true){}
152+
db_use_aggregator(false), db_batch_size(64), db_bypass_unqlite(true), db_mercury_auth_key(""){}
152153
};
153154

154155

@@ -176,6 +177,7 @@ int main(int argc, char** argv) {
176177
addOptionalCommandLineArg(parser, db_in_mem, "Use an in-memory database rather than writing to disk (*unqlite backend only*) (default false)");
177178
addOptionalCommandLineArg(parser, db_base_config, "Provide the *absolute path* to a JSON file to use as the base configuration of the Sonata databases. The database path will be appended automatically (default \"\" - not used)");
178179
addOptionalCommandLineArg(parser, db_margo_config, "Provide the *absolute path* to a JSON file containing the Margo configuration (default \"\" - not used)");
180+
addOptionalCommandLineArg(parser, db_mercury_auth_key, "Provide an authentication key for the Mercury configuration (default \"\" - not used)");
179181
addOptionalCommandLineArg(parser, db_use_aggregator, "Use the \"aggregator\" backend layer (default false)");
180182
addOptionalCommandLineArg(parser, db_batch_size, "Provide the batch size for the \"aggregator\" backend if in use (default 64)");
181183
addOptionalCommandLineArg(parser, db_bypass_unqlite, "Use Sonata's bypass method for faster unqlite stores (default true)");
@@ -244,7 +246,12 @@ int main(int argc, char** argv) {
244246

245247
free(config); //yuck c-strings
246248
margo_finalize(margo_id);
247-
249+
250+
//Apply auth_key if provided
251+
if(args.db_mercury_auth_key.size()){
252+
config_j["mercury"]["auth_key"] = args.db_mercury_auth_key;
253+
}
254+
248255
//Initial number of pools should be 1: the primary pool
249256
assert(config_j["argobots"]["pools"].size() == 1);
250257

@@ -439,16 +446,19 @@ int main(int argc, char** argv) {
439446
commit_timer_start = Clock::now();
440447
}
441448

442-
//If at least one client has previously connected but none are now connected, shutdown the server
443-
//If all clients disconnected we must also wait for the pserver to disconnect (if it is connected)
449+
bool do_break = false;
450+
451+
//Shut down if told to, as long as no clients are still connected
452+
//Force shutdown is called via a "stop_server" RPC
453+
if(cmd_shutdown && connected.size() == 0 && !pserver_connected && !committer_connected){
454+
do_break = true;
455+
}
456+
//If using auto-shutdown, we wait until a) at least one client has previously connected b) no clients are currently connected c) The pserver and committer are not connected (their connection is optional)
457+
if(args.autoshutdown && a_client_has_connected && connected.size() == 0 && !pserver_connected && !committer_connected){
458+
do_break = true;
459+
}
444460

445-
//If args.autoshutdown is disabled we can force shutdown via a "stop_server" RPC
446-
if(
447-
(args.autoshutdown || cmd_shutdown) &&
448-
( a_client_has_connected && connected.size() == 0 ) &&
449-
( !pserver_has_connected || (pserver_has_connected && !pserver_connected) ) &&
450-
( !committer_has_connected || (committer_has_connected && !committer_connected) )
451-
){
461+
if(do_break){
452462
PSprogressStream << "detected all clients disconnected, shutting down" << std::endl;
453463
#ifdef ENABLE_MARGO_STATE_DUMP
454464
margo_dump("margo_dump_all_client_disconnected." + std::to_string(instance));

app/provdb_commit.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include<chrono>
33
#include<thread>
44
#include<cassert>
5+
#include<csignal>
56
#include<chimbuko/ad/ADProvenanceDBengine.hpp>
67
#include<chimbuko/verbose.hpp>
78
#include<chimbuko/util/string.hpp>
@@ -48,8 +49,9 @@ struct Args{
4849
int ninstances;
4950
int nshards;
5051
int freq_ms;
51-
52-
Args(): addr_file_dir("."), instance(0), ninstances(1), nshards(1), freq_ms(30000){}
52+
std::string provdb_mercury_auth_key; //An authorization key for initializing Mercury (optional, default "")
53+
54+
Args(): addr_file_dir("."), instance(0), ninstances(1), nshards(1), freq_ms(30000), provdb_mercury_auth_key(""){}
5355
};
5456

5557

@@ -62,7 +64,8 @@ int main(int argc, char** argv){
6264
addOptionalCommandLineArg(parser, ninstances, "Specify the number of server instances (default 1)");
6365
addOptionalCommandLineArg(parser, nshards, "Specify the total number of database shards (default 1)");
6466
addOptionalCommandLineArg(parser, freq_ms, "Specify the frequency in ms at which commit is called (default 30000)");
65-
67+
addOptionalCommandLineArg(parser, provdb_mercury_auth_key, "Set the Mercury authorization key for connection to the provDB (default \"\")");
68+
6669
if(argc-1 < parser.nMandatoryArgs() || (argc == 2 && std::string(argv[1]) == "-help")){
6770
parser.help(std::cout);
6871
return 0;
@@ -77,6 +80,11 @@ int main(int argc, char** argv){
7780
return 0;
7881
}
7982

83+
if(args.provdb_mercury_auth_key != ""){
84+
CprogressStream << "setting Mercury authorization key to \"" << args.provdb_mercury_auth_key << "\"" << std::endl;
85+
ADProvenanceDBengine::setMercuryAuthorizationKey(args.provdb_mercury_auth_key);
86+
}
87+
8088
ProvDBsetup setup(args.nshards, args.ninstances);
8189
std::string addr = setup.getInstanceAddress(args.instance, args.addr_file_dir);
8290
int instance_shard_offset = setup.getShardOffsetInstance(args.instance);

app/pserver.cpp

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ struct pserverArgs{
5454

5555
#ifdef ENABLE_PROVDB
5656
std::string provdb_addr_dir;
57+
std::string provdb_mercury_auth_key; //An authorization key for initializing Mercury (optional, default "")
5758
#endif
5859

5960
std::string prov_outputpath;
@@ -63,7 +64,7 @@ struct pserverArgs{
6364
, max_pollcyc_msg(10), zmq_io_thr(1), autoshutdown(true)
6465
#endif
6566
#ifdef ENABLE_PROVDB
66-
, provdb_addr_dir("")
67+
, provdb_addr_dir(""), provdb_mercury_auth_key("")
6768
#endif
6869
{}
6970

@@ -88,6 +89,7 @@ struct pserverArgs{
8889
#endif
8990
#ifdef ENABLE_PROVDB
9091
addOptionalCommandLineArg(p, provdb_addr_dir, "The directory containing the address file written out by the provDB server. An empty string will disable the connection to the global DB. (default empty, disabled)");
92+
addOptionalCommandLineArg(p, provdb_mercury_auth_key, "Set the Mercury authorization key for connection to the provDB (default \"\")");
9193
#endif
9294
addOptionalCommandLineArg(p, prov_outputpath, "Output global provenance data to this directory. Can be used in place of or in conjunction with the provenance database. An empty string \"\" (default) disables this output");
9395
addOptionalCommandLineArg(p, model_update_freq, "The frequency in ms at which the global AD model is updated (default 1000ms)");
@@ -136,14 +138,7 @@ int main (int argc, char ** argv){
136138
//Optionally load previously-computed AD algorithm statistics
137139
if(args.load_params_set){
138140
progressStream << "Pserver: Loading parameters from input file " << args.load_params << std::endl;
139-
std::ifstream in(args.load_params);
140-
if(!in.good()) throw std::runtime_error("Could not load anomaly algorithm parameters from the file provided");
141-
nlohmann::json in_p;
142-
in >> in_p;
143-
global_func_index_map.deserialize(in_p["func_index_map"]);
144-
145-
//When the global model is updated the previous model is discarded. Thus if we want to bring in params from a previous run we need to set the state of *one* of the worker threads (which are never flushed)
146-
param.updateWorkerModel(in_p["alg_params"].dump(),0);
141+
restoreModel(global_func_index_map, param, args.load_params);
147142
}
148143

149144
//Start the aggregator thread for the global model
@@ -165,6 +160,11 @@ int main (int argc, char ** argv){
165160
PSstatSender stat_sender(args.stat_send_freq);
166161

167162
#ifdef ENABLE_PROVDB
163+
if(args.provdb_mercury_auth_key != ""){
164+
progressStream << "Pserver: setting Mercury authorization key to \"" << args.provdb_mercury_auth_key << "\"" << std::endl;
165+
ADProvenanceDBengine::setMercuryAuthorizationKey(args.provdb_mercury_auth_key);
166+
}
167+
168168
PSProvenanceDBclient provdb_client;
169169
#endif
170170

@@ -299,12 +299,7 @@ int main (int argc, char ** argv){
299299
//Optionally save the final AD algorithm parameters
300300
if(args.save_params_set){
301301
progressStream << "PServer: Saving parameters to output file " << args.save_params << std::endl;
302-
std::ofstream out(args.save_params);
303-
if(!out.good()) throw std::runtime_error("Could not write anomaly algorithm parameters to the file provided");
304-
nlohmann::json out_p;
305-
out_p["func_index_map"] = global_func_index_map.serialize();
306-
out_p["alg_params"] = nlohmann::json::parse(param.getSerializedGlobalModel()); //todo - store in human readable format rather than net-serialize format
307-
out << out_p;
302+
writeModel(args.save_params, global_func_index_map, param);
308303
}
309304

310305
progressStream << "Pserver: finished" << std::endl;

app/sst_view.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ int main(int argc, char** argv){
4848
adios2::IO io;
4949
adios2::Engine eng;
5050

51-
ad = adios2::ADIOS(adios2::DebugON);
51+
ad = adios2::ADIOS();
5252
io = ad.DeclareIO("tau-metrics");
5353
if(!offline) io.SetEngine("SST");
5454
io.SetParameters({
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
benchmark: benchmark.cpp
2+
mpic++ -std=c++17 -O3 -g -I/src/develop2/PerformanceAnalysis/test/unit_tests -I/install/AD/develop2/include -I/install/AD/develop2/include/chimbuko/3rdparty -L/install/AD/develop2/lib benchmark.cpp -o benchmark -lchimbuko -lstdc++fs
3+
benchmark_tau: benchmark.cpp
4+
tau_cxx.sh -std=c++17 -O3 -g -I/src/develop2/PerformanceAnalysis/test/unit_tests -I/install/AD/develop2/include -I/install/AD/develop2/include/chimbuko/3rdparty -L/install/AD/develop2/lib benchmark.cpp -o benchmark -lchimbuko -lstdc++fs
5+
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
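//A benchmark of the parameter update: builds a synthetic parameter set, serializes it once, then repeatedly applies it via ParamInterface::update and reports the mean and standard deviation of the per-call time (ms)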
2+
#include<mpi.h>
3+
#include<chimbuko/ad/ADNetClient.hpp>
4+
#include<chimbuko/ad/utils.hpp>
5+
#include<chimbuko/util/commandLineParser.hpp>
6+
#include<chimbuko/param/sstd_param.hpp>
7+
#include<chimbuko/param/hbos_param.hpp>
8+
#include<chimbuko/param/copod_param.hpp>
9+
#include<chimbuko/ad/ADEvent.hpp>
10+
#include<chimbuko/util/Anomalies.hpp>
11+
#include<chimbuko/ad/ADLocalFuncStatistics.hpp>
12+
#include<chimbuko/ad/ADLocalCounterStatistics.hpp>
13+
#include<chimbuko/ad/ADLocalAnomalyMetrics.hpp>
14+
#include<chimbuko/ad/ADcombinedPSdata.hpp>
15+
#include<chimbuko/ad/ADglobalFunctionIndexMap.hpp>
16+
#include<chimbuko/verbose.hpp>
17+
#include "gtest/gtest.h"
18+
#include<unit_test_common.hpp>
19+
20+
using namespace chimbuko;
21+
22+
struct Args{ //Command-line configurable options for the benchmark; defaults are assigned in the constructor below
23+
int nfuncs; //Number of function entries populated in the parameter object (default 100)
24+
std::string algorithm; //Which parameter type to benchmark: "sstd", "hbos" or "copod" (default "hbos")
25+
int hbos_bins; //Number of histogram bins used for the "hbos" and "copod" parameter types (default 20)
26+
int cycles; //Number of timed update calls performed (default 10000)
27+
28+
Args(){ //Assign the default values
29+
nfuncs = 100;
30+
algorithm = "hbos";
31+
hbos_bins = 20;
32+
cycles = 10000;
33+
}
34+
};
35+
36+
int main(int argc, char **argv){ //Times repeated ParamInterface::update calls on a synthetic parameter set and prints the mean and standard deviation of the per-call time
37+
MPI_Init(&argc, &argv);
38+
39+
if(const char* env_p = std::getenv("CHIMBUKO_VERBOSE")){ //The mere presence of the CHIMBUKO_VERBOSE environment variable enables verbose logging; its value is not inspected
40+
std::cout << "Enabling verbose debug output" << std::endl;
41+
enableVerboseLogging() = true;
42+
}
43+
44+
commandLineParser<Args> cmdline;
45+
addOptionalCommandLineArgDefaultHelpString(cmdline, cycles);
46+
addOptionalCommandLineArgDefaultHelpString(cmdline, nfuncs);
47+
addOptionalCommandLineArgDefaultHelpString(cmdline, algorithm); //algorithm, default "hbos"
48+
addOptionalCommandLineArgDefaultHelpString(cmdline, hbos_bins); //default 20
49+
50+
if(argc == 1 || (argc == 2 && std::string(argv[1]) == "-help")){ //With no arguments, or with only "-help", print the usage and exit cleanly
51+
cmdline.help();
52+
MPI_Finalize();
53+
return 0;
54+
}
55+
56+
Args args;
57+
cmdline.parse(args, argc-1, (const char**)(argv+1)); //Skip argv[0] (the executable name)
58+
59+
//Set up a params object with the required number of params
60+
ParamInterface *params; //Assigned in each recognized-algorithm branch below. NOTE(review): left uninitialized if fatal_error returns - presumably it does not; confirm
61+
if(args.algorithm == "sstd"){
62+
SstdParam *p = new SstdParam;
63+
for(int i=0;i<args.nfuncs;i++){
64+
RunStats &r = (*p)[i];
65+
for(int j=0;j<100;j++) //Fill each entry's statistics with the values 0..99
66+
r.push(double(j));
67+
}
68+
params = p;
69+
}else if(args.algorithm == "hbos" || args.algorithm == "copod"){ //Both of these parameter types are populated from the same histogram
70+
std::vector<unsigned int> counts(args.hbos_bins);
71+
for(int i=0;i<args.hbos_bins;i++) counts[i] = i; //Bin i is given a count of i
72+
Histogram hd;
73+
hd.set_histogram(counts, 0.0001, args.hbos_bins, 0, 1); //NOTE(review): the meaning of the numeric arguments is defined by Histogram::set_histogram, which is not visible here - confirm
74+
75+
if(args.algorithm == "hbos"){
76+
HbosParam *p = new HbosParam;
77+
for(int i=0;i<args.nfuncs;i++) (*p)[i].getHistogram() = hd; //Every entry receives a copy of the same histogram
78+
params = p;
79+
}else{
80+
CopodParam *p = new CopodParam;
81+
for(int i=0;i<args.nfuncs;i++) (*p)[i].getHistogram() = hd; //Every entry receives a copy of the same histogram
82+
params = p;
83+
}
84+
}else{
85+
fatal_error("Unknown AD algorithm");
86+
}
87+
88+
PerfTimer cyc_timer; //NOTE(review): started at the top of each cycle below but its elapsed time is never read
89+
PerfTimer timer; //Times only the update call within each cycle
90+
91+
//To make the benchmark as lightweight as possible, precompute the serialized parameter message once and reuse the same message each cycle
92+
std::string params_msg = params->serialize();
93+
RunStats r; //Accumulates the per-cycle update times
94+
95+
for(int c=0;c<args.cycles;c++){
96+
cyc_timer.start();
97+
std::cout << c << std::endl; //Print the cycle index; this happens before the timer is started so it is outside the timed region
98+
99+
timer.start();
100+
params->update(params_msg, false); //The operation being timed. NOTE(review): the semantics of the second argument are defined by ParamInterface::update - confirm
101+
r.push(timer.elapsed_ms()); //Record the elapsed time of this update in ms
102+
}//cycle loop
103+
std::cout << r.mean() << " " << r.stddev() << std::endl; //Output: mean and standard deviation of the recorded update times
104+
105+
delete params;
106+
MPI_Finalize();
107+
108+
return 0;
109+
}

0 commit comments

Comments
 (0)