Skip to content

Commit b8d0930

Browse files
authored
Merge pull request #25 from CODARcode/sm_release
Copod algorithm
2 parents 8614909 + aa7aa49 commit b8d0930

186 files changed

Lines changed: 6166 additions & 1157 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

app/bpfile_replay.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#include <mpi.h>
21
#include <adios2.h>
32
#include <string>
43
#include <cassert>
@@ -22,7 +21,6 @@ int main(int argc, char** argv){
2221
enableVerboseLogging() = true;
2322
}
2423

25-
MPI_Init(&argc, &argv);
2624
if(argc < 3){
2725
std::cout << "Usage: bpfile_replay <input BPfile filename> <output step freq (ms)> <options>\n"
2826
<< "Output will be on SST under the same filename ${input BPfile filename}. The temporary .sst file created will thus be ${input BPfile filename}.sst\n"
@@ -77,7 +75,7 @@ int main(int argc, char** argv){
7775
}
7876

7977

80-
adios2::ADIOS ad = adios2::ADIOS(MPI_COMM_SELF, adios2::DebugON);
78+
adios2::ADIOS ad = adios2::ADIOS(adios2::DebugON);
8179

8280
adios2::IO io_out = ad.DeclareIO("writer");
8381

@@ -217,6 +215,5 @@ int main(int argc, char** argv){
217215
std::cout << "Shutting down" << std::endl;
218216
wr.Close();
219217

220-
MPI_Finalize();
221218
return 0;
222219
};

app/driver.cpp

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
#include<chimbuko_config.h>
2+
#ifdef USE_MPI
3+
#include<mpi.h>
4+
#endif
15
#include "chimbuko/chimbuko.hpp"
26
#include "chimbuko/verbose.hpp"
37
#include "chimbuko/util/string.hpp"
@@ -89,7 +93,15 @@ optionalArgsParser & getOptionalArgsParser(){
8993
addOptionalCommandLineArg(p, trace_connect_timeout, "(For SST mode) Set the timeout in seconds on the connection to the TAU-instrumented binary (default 60s)");
9094
addOptionalCommandLineArg(p, parser_beginstep_timeout, "Set the timeout in seconds on waiting for the next ADIOS2 timestep (default 30s)");
9195

92-
addOptionalCommandLineArg(p, rank, "Set the rank index of the trace data. Used for verification unless override_rank is set. A value < 0 signals the value to be equal to the MPI rank of Chimbuko driver (default)");
96+
addOptionalCommandLineArg(p, rank,
97+
#ifdef USE_MPI
98+
"Set the rank index of the trace data. Used for verification unless override_rank is set. A value < 0 signals the value to be equal to the MPI rank of Chimbuko driver (default)"
99+
#else
100+
"Set the rank index of the trace data (default 0)"
101+
#endif
102+
);
103+
104+
93105
p.addOptionalArg(new overrideRankArg); //-override_rank <idx>
94106
p.addOptionalArg(new setLoggingHeadRankArg); //-logging_head_rank <rank>
95107

@@ -110,7 +122,11 @@ void printHelp(){
110122
getOptionalArgsParser().help(std::cout);
111123
}
112124

113-
ChimbukoParams getParamsFromCommandLine(int argc, char** argv, const int mpi_world_rank){
125+
ChimbukoParams getParamsFromCommandLine(int argc, char** argv
126+
#ifdef USE_MPI
127+
, const int mpi_world_rank
128+
#endif
129+
){
114130
if(argc < 4){
115131
std::cerr << "Expected at least 4 arguments: <exe> <BPFile/SST> <.bp location> <bp file prefix>" << std::endl;
116132
exit(-1);
@@ -153,8 +169,13 @@ ChimbukoParams getParamsFromCommandLine(int argc, char** argv, const int mpi_wor
153169

154170
//By default assign the rank index of the trace data as the MPI rank of the AD process
155171
//Allow override by user
156-
if(params.rank < 0)
172+
if(params.rank < 0){
173+
#ifdef USE_MPI
157174
params.rank = mpi_world_rank;
175+
#else
176+
params.rank = 0; //default to 0 for non-MPI applications
177+
#endif
178+
}
158179

159180
params.verbose = params.rank == 0; //head node produces verbose output
160181

@@ -187,26 +208,35 @@ int main(int argc, char ** argv){
187208
return 0;
188209
}
189210

211+
#ifdef USE_MPI
190212
assert( MPI_Init(&argc, &argv) == MPI_SUCCESS );
191213

192214
int mpi_world_rank, mpi_world_size;
193215
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_world_rank);
194216
MPI_Comm_size(MPI_COMM_WORLD, &mpi_world_size);
217+
#endif
195218

196219
//Parse environment variables
197220
if(const char* env_p = std::getenv("CHIMBUKO_VERBOSE"))
198221
enableVerboseLogging() = true;
199222

200223
//Parse Chimbuko parameters
201-
ChimbukoParams params = getParamsFromCommandLine(argc, argv, mpi_world_rank);
224+
ChimbukoParams params = getParamsFromCommandLine(argc, argv
225+
#ifdef USE_MPI
226+
, mpi_world_rank
227+
#endif
228+
);
229+
202230
if(params.rank == progressHeadRank()) params.print();
203231

204-
if(enableVerboseLogging())
232+
if(enableVerboseLogging()){
205233
headProgressStream(params.rank) << "Driver rank " << params.rank << ": Enabling verbose debug output" << std::endl;
206-
234+
}
207235

208236
verboseStream << "Driver rank " << params.rank << ": waiting at pre-run barrier" << std::endl;
237+
#ifdef USE_MPI
209238
MPI_Barrier(MPI_COMM_WORLD);
239+
#endif
210240

211241
bool error = false;
212242

@@ -248,6 +278,7 @@ int main(int argc, char ** argv){
248278
// -----------------------------------------------------------------------
249279
// Average analysis time and total number of outliers
250280
// -----------------------------------------------------------------------
281+
#ifdef USE_MPI
251282
verboseStream << "Driver rank " << params.rank << ": waiting at post-run barrier" << std::endl;
252283
MPI_Barrier(MPI_COMM_WORLD);
253284
processing_time = duration_cast<milliseconds>(t2 - t1).count();
@@ -274,6 +305,19 @@ int main(int argc, char ** argv){
274305
total_n_comm_events = global_measures[1];
275306
total_n_counter_events = global_measures[2];
276307
}
308+
#else
309+
//Without MPI only report local parameters. In principle we could aggregate using the Pserver if we really want the global information
310+
total_processing_time = processing_time;
311+
total_n_outliers = n_outliers;
312+
total_frames = frames;
313+
314+
total_n_func_events = n_func_events;
315+
total_n_comm_events = n_comm_events;
316+
total_n_counter_events = n_counter_events;
317+
318+
int mpi_world_size = 1;
319+
#endif
320+
277321

278322
headProgressStream(params.rank) << "Driver rank " << params.rank << ": Final report\n"
279323
<< "Avg. num. frames over MPI ranks : " << (double)total_frames/(double)mpi_world_size << "\n"
@@ -301,7 +345,9 @@ int main(int argc, char ** argv){
301345
error = true;
302346
}
303347

348+
#ifdef USE_MPI
304349
MPI_Finalize();
350+
#endif
305351
headProgressStream(params.rank) << "Driver is exiting" << std::endl;
306352
return error ? 1 : 0;
307353
}

app/hpserver.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//The hierarchical parameter server main program. This program collects statistics from the node-instances of the anomaly detector
2+
#include <chimbuko_config.h>
23
#ifndef _USE_ZMQNET
34
#include<iostream>
45
#warning "Hierarchical parameter server requires ZMQNet"
@@ -12,7 +13,9 @@ int main(void){
1213

1314
#include <chimbuko/pserver.hpp>
1415
#include <chimbuko/net/zmqme_net.hpp>
16+
#ifdef USE_MPI
1517
#include <mpi.h>
18+
#endif
1619
#include <chimbuko/param/sstd_param.hpp>
1720
#include <chimbuko/util/commandLineParser.hpp>
1821
#include <chimbuko/verbose.hpp>
@@ -103,7 +106,10 @@ int main (int argc, char ** argv){
103106
}
104107

105108
ZMQMENet net;
109+
#ifdef USE_MPI
106110
MPI_Init(&argc, &argv);
111+
#endif
112+
107113
PSstatSender stat_sender(args.stat_send_freq);
108114

109115
try {
@@ -169,7 +175,7 @@ int main (int argc, char ** argv){
169175

170176
std::cout << "HPserver finalizing the network" << std::endl;
171177
net.finalize();
172-
#ifdef _USE_ZMQNET
178+
#if defined(_USE_ZMQNET) && defined(USE_MPI)
173179
MPI_Finalize();
174180
#endif
175181

app/pclient.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//A test application that mocks part of the anomaly detection modules, acting as a client for the parameter server and sending it anomaly information
2+
#include <chimbuko_config.h>
23

34
#include "chimbuko/param/sstd_param.hpp"
45
#include "chimbuko/message.hpp"
@@ -7,6 +8,9 @@
78
#include "chimbuko/net/mpi_net.hpp"
89
#else
910
#include "chimbuko/net/zmq_net.hpp"
11+
#endif
12+
13+
#ifdef USE_MPI
1014
#include <mpi.h>
1115
#endif
1216

@@ -18,11 +22,13 @@ using namespace chimbuko;
1822

1923
int main (int argc, char** argv)
2024
{
21-
int size, rank;
25+
int size=1, rank=0;
2226

27+
#ifdef USE_MPI
2328
MPI_Init(&argc, &argv);
2429
MPI_Comm_size(MPI_COMM_WORLD, &size);
2530
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
31+
#endif
2632

2733
#ifdef _USE_MPINET
2834
int count;
@@ -114,7 +120,9 @@ s */
114120
//g_param.assign(msg.data_buffer());
115121
}
116122

123+
#ifdef USE_MPI
117124
MPI_Barrier(MPI_COMM_WORLD);
125+
#endif
118126

119127
#ifdef _USE_MPINET
120128
if (rank == 0) {
@@ -159,6 +167,8 @@ s */
159167
zmq_ctx_term(context);
160168
#endif
161169

170+
#ifdef USE_MPI
162171
MPI_Finalize();
172+
#endif
163173
return EXIT_SUCCESS;
164174
}

app/pclient_stats.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
//A test application that mocks part of the anomaly detection modules, acting as a client for the parameter server and sending it function statistics information
2+
#include <chimbuko_config.h>
23

34
#include "chimbuko/param/sstd_param.hpp"
45
#include "chimbuko/message.hpp"
@@ -9,6 +10,9 @@
910
#include "chimbuko/net/mpi_net.hpp"
1011
#else
1112
#include "chimbuko/net/zmq_net.hpp"
13+
#endif
14+
15+
#ifdef USE_MPI
1216
#include <mpi.h>
1317
#endif
1418

@@ -21,8 +25,9 @@ using namespace chimbuko;
2125
int main (int argc, char** argv)
2226
{
2327
const int N_MPI_PROCESSORS = 10;
24-
int size, rank;
28+
int size=1, rank=0;
2529

30+
#ifdef USE_MPI
2631
MPI_Init(&argc, &argv);
2732
MPI_Comm_size(MPI_COMM_WORLD, &size);
2833
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
@@ -33,6 +38,7 @@ int main (int argc, char** argv)
3338
MPI_Finalize();
3439
return EXIT_SUCCESS;
3540
}
41+
#endif
3642

3743
#ifdef _USE_MPINET
3844
throw std::runtime_error("Not implemented yet.");
@@ -103,8 +109,10 @@ int main (int argc, char** argv)
103109
std::this_thread::sleep_for(std::chrono::milliseconds(10));
104110
}
105111

112+
#ifdef USE_MPI
106113
MPI_Barrier(MPI_COMM_WORLD);
107-
114+
#endif
115+
108116
// terminate parameter server
109117
#ifdef _USE_MPINET
110118
throw std::runtime_error("Not implemented yet.");
@@ -127,6 +135,8 @@ int main (int argc, char** argv)
127135
zmq_ctx_term(context);
128136
#endif
129137

138+
#ifdef USE_MPI
130139
MPI_Finalize();
140+
#endif
131141
return EXIT_SUCCESS;
132142
}

app/provdb_admin.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,12 @@ int main(int argc, char** argv) {
300300
//Spin quietly until SIGTERM sent
301301
signal(SIGTERM, termSignalHandler);
302302
progressStream << "ProvDB Admin: main thread waiting for completion" << std::endl;
303+
304+
size_t iter = 0;
303305
while(!stop_wait_loop) { //stop wait loop will be set by SIGTERM handler
304306
tl::thread::sleep(engine, 1000); //Thallium engine sleeps but listens for rpc requests
305-
307+
if(iter % 20 == 0){ verboseStream << "ProvDB Admin heartbeat" << std::endl; }
308+
306309
unsigned long commit_timer_ms = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - commit_timer_start).count();
307310
if(args.db_commit_freq > 0 && commit_timer_ms >= args.db_commit_freq){
308311
verboseStream << "ProvDB Admin: committing database to disk" << std::endl;
@@ -331,7 +334,9 @@ int main(int argc, char** argv) {
331334
#endif
332335
break;
333336
}
334-
}
337+
338+
++iter;
339+
}//wait loop
335340
}//client scope
336341

337342
//If the pserver didn't connect (it is optional), delete the empty database

app/pserver.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include <chimbuko/pserver.hpp>
55

66
#ifdef _USE_MPINET
7-
#include <mpi.h>
87
#include <chimbuko/net/mpi_net.hpp>
98
#else
109
#include <chimbuko/net/zmq_net.hpp>
@@ -125,6 +124,7 @@ int main (int argc, char ** argv){
125124
}
126125
GlobalAnomalyStats global_func_stats; //global anomaly statistics
127126
GlobalCounterStats global_counter_stats; //global counter statistics
127+
GlobalAnomalyMetrics global_anom_metrics; //global anomaly metrics
128128
PSglobalFunctionIndexMap global_func_index_map; //mapping of function name to global index
129129

130130
//Optionally load previously-computed AD algorithm statistics
@@ -187,14 +187,14 @@ int main (int argc, char ** argv){
187187

188188
net.add_payload(new NetPayloadUpdateParams(param, args.freeze_params));
189189
net.add_payload(new NetPayloadGetParams(param));
190-
net.add_payload(new NetPayloadUpdateAnomalyStats(&global_func_stats));
191-
net.add_payload(new NetPayloadUpdateCounterStats(&global_counter_stats));
190+
net.add_payload(new NetPayloadRecvCombinedADdata(&global_func_stats, &global_counter_stats, &global_anom_metrics));
192191
net.add_payload(new NetPayloadGlobalFunctionIndexMapBatched(&global_func_index_map));
193192
net.init(nullptr, nullptr, args.nt);
194193

195194
//Start sending anomaly statistics to viz
196195
stat_sender.add_payload(new PSstatSenderGlobalAnomalyStatsPayload(&global_func_stats));
197196
stat_sender.add_payload(new PSstatSenderGlobalCounterStatsPayload(&global_counter_stats));
197+
stat_sender.add_payload(new PSstatSenderGlobalAnomalyMetricsPayload(&global_anom_metrics));
198198
stat_sender.run_stat_sender(args.ws_addr, args.stat_outputdir);
199199

200200
//Register a signal handler that prevents the application from exiting on SIGTERM; instead this signal will be handled by ZeroMQ and will cause the pserver to shutdown gracefully
@@ -277,5 +277,8 @@ int main (int argc, char ** argv){
277277
}
278278

279279
progressStream << "Pserver: finished" << std::endl;
280+
281+
delete param;
282+
280283
return 0;
281284
}

0 commit comments

Comments
 (0)