Skip to content

Commit 1da6f55

Browse files
committed
Minor improvements:
- Head node logging now reports the number of events of each type encountered in the step.
- Periodic perf output now records the accumulated number of events of each type, the number of outliers detected, and the number of IO steps since the last write.
- ZMQNet send/recv now throw errors if a zmq_error was detected, and the error message is reported.
- Perf filenames now include the program index rather than just the rank.
- PerfPeriodic now uses map rather than unordered_map so that the output column order doesn't change due to rehashing.
- Driver now returns an exit code of 1 if an error was caught.
- ZMQNet uses the error functionality rather than just throwing.
- Anomaly count is written to perf output.
1 parent 98d38ac commit 1da6f55

5 files changed

Lines changed: 79 additions & 39 deletions

File tree

app/driver.cpp

Lines changed: 16 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -101,7 +101,7 @@ void printHelp(){
101101
std::cout << "Usage: driver <Trace engine type> <Trace directory> <Trace file prefix> <Options>\n"
102102
<< "Where <Trace engine type> : BPFile or SST\n"
103103
<< " <Trace directory> : The directory in which the BPFile or SST file is located\n"
104-
<< " <Trace file prefix> : The prefix of the file eg \"tau-metrics-[mybinary]\"\n"
104+
<< " <Trace file prefix> : The prefix of the file (the trace file name without extension e.g. \"tau-metrics-mybinary\" for \"tau-metrics-mybinary.bp\")\n"
105105
<< " <Options> : Optional arguments as described below.\n";
106106
getOptionalArgsParser().help(std::cout);
107107
}
@@ -201,6 +201,8 @@ int main(int argc, char ** argv){
201201
verboseStream << "Driver rank " << params.rank << ": waiting at pre-run barrier" << std::endl;
202202
MPI_Barrier(MPI_COMM_WORLD);
203203

204+
bool error = false;
205+
204206
try
205207
{
206208
//Instantiate Chimbuko
@@ -276,23 +278,20 @@ int main(int argc, char ** argv){
276278
<< "Total function/comm events : " << total_n_func_events + total_n_comm_events
277279
<< std::endl;
278280
}
279-
catch (std::invalid_argument &e)
280-
{
281-
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : caught invalid argument:" << std::endl;
282-
std::cout << e.what() << std::endl;
283-
}
284-
catch (std::ios_base::failure &e)
285-
{
286-
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : I/O base exception caught\n";
287-
std::cout << e.what() << std::endl;
288-
}
289-
catch (std::exception &e)
290-
{
291-
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : Exception caught\n";
292-
std::cout << e.what() << std::endl;
293-
}
281+
catch (const std::invalid_argument &e){
282+
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : caught invalid argument: " << e.what() << std::endl;
283+
error = true;
284+
}
285+
catch (const std::ios_base::failure &e){
286+
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : I/O base exception caught: " << e.what() << std::endl;
287+
error = true;
288+
}
289+
catch (const std::exception &e){
290+
std::cout << '[' << getDateTime() << ", rank " << params.rank << "] Driver : Exception caught: " << e.what() << std::endl;
291+
error = true;
292+
}
294293

295294
MPI_Finalize();
296295
headProgressStream(params.rank) << "Driver is exiting" << std::endl;
297-
return 0;
296+
return error ? 1 : 0;
298297
}

include/chimbuko/util/PerfStats.hpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@
33
#ifdef _PERF_METRIC
44
#include <chrono>
55
#include <sstream>
6-
#include <unordered_map>
6+
#include <map>
77
#include "chimbuko/util/RunMetric.hpp"
88
#endif
99

@@ -103,7 +103,7 @@ namespace chimbuko{
103103
#ifdef _PERF_METRIC
104104
std::string m_outputpath;
105105
std::string m_filename;
106-
std::unordered_map<std::string, std::string> m_data;
106+
std::map<std::string, std::string> m_data;
107107
bool m_first_write; /**< Is this the first write?*/
108108
#endif
109109
public:

src/ad/ADNetClient.cpp

Lines changed: 5 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -84,10 +84,10 @@ void ADNetClient::connect_ps(int rank, int srank, std::string sname) {
8484
msg.clear();
8585

8686
verboseStream << "ADNetClient waiting for handshake response from server" << std::endl;
87-
int ret = ZMQNet::recv(m_socket, strmsg);
88-
if(ret == -1){
89-
if(errno == EAGAIN){ recoverable_error("Connect error to parameter server: handshake timed out"); }
90-
else{ recoverable_error("Connect error to parameter server: receive returned an error"); }
87+
try{
88+
ZMQNet::recv(m_socket, strmsg);
89+
}catch(const std::exception &err){
90+
recoverable_error("Connect error to parameter server, ADNetClient failed to initialize");
9191
return;
9292
}
9393

@@ -166,8 +166,7 @@ std::string ADNetClient::send_and_receive(const Message &msg){
166166
ZMQNet::send(m_socket, send_msg);
167167

168168
//Receive global parameters from PS
169-
int ret = ZMQNet::recv(m_socket, recv_msg);
170-
if(ret == -1) throw std::runtime_error("ADNetClient::send_and_receive: receive call returned an error");
169+
ZMQNet::recv(m_socket, recv_msg);
171170
#endif
172171

173172
#ifdef _PERF_METRIC

src/chimbuko.cpp

Lines changed: 44 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -320,9 +320,9 @@ bool Chimbuko::parseInputStep(int &step,
320320
m_parser->endStep();
321321

322322
// count total number of events
323-
n_func_events += (unsigned long long)m_parser->getNumFuncData();
324-
n_comm_events += (unsigned long long)m_parser->getNumCommData();
325-
n_counter_events += (unsigned long long)m_parser->getNumCounterData();
323+
n_func_events = (unsigned long long)m_parser->getNumFuncData();
324+
n_comm_events = (unsigned long long)m_parser->getNumCommData();
325+
n_counter_events = (unsigned long long)m_parser->getNumCounterData();
326326

327327
//Parse the new metadata for any attributes we want to maintain
328328
m_metadata_parser->addData(m_parser->getNewMetaData());
@@ -458,10 +458,10 @@ void Chimbuko::run(unsigned long long& n_func_events,
458458
int step = m_parser->getCurrentStep(); //gives -1 as initial value. step+1 is the expected value of step in parseInputStep and is used as a (non-fatal) check
459459
unsigned long first_event_ts, last_event_ts; //earliest and latest timestamps in io frame
460460

461-
std::string ad_perf = "ad_perf_" + std::to_string(m_params.rank) + ".json";
461+
std::string ad_perf = stringize("ad_perf.%d.%d.json", m_params.program_idx, m_params.rank);
462462
m_perf.setWriteLocation(m_params.perf_outputpath, ad_perf);
463463

464-
std::string ad_perf_prd = "ad_perf_prd_" + std::to_string(m_params.rank) + ".log";
464+
std::string ad_perf_prd = stringize("ad_perf_prd.%d.%d.log", m_params.program_idx, m_params.rank);
465465
m_perf_prd.setWriteLocation(m_params.perf_outputpath, ad_perf_prd);
466466

467467
#if defined(_PERF_METRIC) && defined(ENABLE_PROVDB)
@@ -471,13 +471,28 @@ void Chimbuko::run(unsigned long long& n_func_events,
471471
#endif
472472

473473
PerfTimer step_timer, timer;
474+
475+
unsigned long long n_func_events_step, n_comm_events_step, n_counter_events_step; //event count in present step
476+
unsigned long long n_func_events_accum_prd = 0, n_comm_events_accum_prd = 0, n_counter_events_accum_prd = 0; //accumulated event counts since last write of periodic data
477+
unsigned long n_outliers_accum_prd = 0; //accumulated outliers detected since last write of periodic data
478+
int n_steps_accum_prd = 0; //number of steps since last write of periodic data
474479

475480
//Loop until we lose connection with the application
476-
while ( parseInputStep(step, n_func_events, n_comm_events, n_counter_events) ) {
481+
while ( parseInputStep(step, n_func_events_step, n_comm_events_step, n_counter_events_step) ) {
482+
//Increment total events
483+
n_func_events += n_func_events_step;
484+
n_comm_events += n_comm_events_step;
485+
n_counter_events += n_counter_events_step;
486+
487+
n_func_events_accum_prd += n_func_events_step;
488+
n_comm_events_accum_prd += n_comm_events_step;
489+
n_counter_events_accum_prd += n_counter_events_step;
490+
477491
//Decide whether to report step progress
478492
bool do_step_report = enableVerboseLogging() || (m_params.step_report_freq > 0 && step % m_params.step_report_freq == 0);
479493

480-
if(do_step_report){ headProgressStream(m_params.rank) << "driver rank " << m_params.rank << " starting analysis of step " << step << std::endl; }
494+
if(do_step_report){ headProgressStream(m_params.rank) << "driver rank " << m_params.rank << " starting analysis of step " << step
495+
<< ". Event count: func=" << n_func_events_step << " comm=" << n_comm_events_step << " counter=" << n_counter_events_step << std::endl; }
481496
step_timer.start();
482497

483498
//Extract counters and put into counter manager
@@ -494,9 +509,13 @@ void Chimbuko::run(unsigned long long& n_func_events,
494509
timer.start();
495510
Anomalies anomalies = m_outlier->run(step);
496511
m_perf.add("ad_run_anom_detection_time_ms", timer.elapsed_ms());
512+
m_perf.add("ad_run_anomaly_count", anomalies.nEvents(Anomalies::EventType::Outlier));
497513

498-
n_outliers += anomalies.nEvents(Anomalies::EventType::Outlier);
514+
int nout = anomalies.nEvents(Anomalies::EventType::Outlier);
515+
n_outliers += nout;
516+
n_outliers_accum_prd += nout;
499517
frames++;
518+
n_steps_accum_prd++;
500519

501520
//Generate anomaly provenance for detected anomalies and send to DB
502521
timer.start();
@@ -542,6 +561,23 @@ void Chimbuko::run(unsigned long long& n_func_events,
542561
size_t total, resident;
543562
getMemUsage(total, resident);
544563
m_perf_prd.add("ad_mem_usage_kB", resident);
564+
565+
m_perf_prd.add("io_steps", n_steps_accum_prd);
566+
567+
//Write accumulated outlier count
568+
m_perf_prd.add("outlier_count", n_outliers_accum_prd);
569+
570+
//Write accumulated event counts
571+
m_perf_prd.add("event_count_func", n_func_events_accum_prd);
572+
m_perf_prd.add("event_count_comm", n_comm_events_accum_prd);
573+
m_perf_prd.add("event_count_counter", n_counter_events_accum_prd);
574+
575+
//Reset the counts
576+
n_func_events_accum_prd = 0;
577+
n_comm_events_accum_prd = 0;
578+
n_counter_events_accum_prd = 0;
579+
n_outliers_accum_prd = 0;
580+
n_steps_accum_prd = 0;
545581

546582
//These only write if both filename and output path is set
547583
m_perf_prd.write();

src/net/zmq_net.cpp

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ class NetPayloadClientDisconnectWithCount: public NetPayloadBase{
5656
response.set_msg("", false);
5757
std::lock_guard<std::mutex> _(m);
5858
clients--;
59-
if(clients < 0) throw std::runtime_error("ZMQNet registered clients < 0! Likely a client did not perform a handshake");
59+
if(clients < 0) fatal_error("ZMQNet registered clients < 0! Likely a client did not perform a handshake");
6060

6161
verboseStream << "ZMQNet received disconnect notification, #clients is now " << clients << std::endl;
6262
};
@@ -77,11 +77,11 @@ void ZMQNet::init(int* /*argc*/, char*** /*argv*/, int nt)
7777
{
7878
m_context = zmq_ctx_new();
7979
if(zmq_ctx_set(m_context, ZMQ_IO_THREADS, m_io_threads) != 0)
80-
throw std::runtime_error("ZMQNet::init couldn't set number of io threads to requested amount");
80+
fatal_error("ZMQNet::init couldn't set number of io threads to requested amount");
8181

8282
//Check worker_idx = 0 for all payloads
8383
for(auto const &e : m_payloads)
84-
if(e.first != 0) throw std::runtime_error("ZMQNet all payloads must have worker_idx=0");
84+
if(e.first != 0) fatal_error("ZMQNet all payloads must have worker_idx=0");
8585

8686
init_thread_pool(nt);
8787
}
@@ -134,9 +134,9 @@ void doWork(void* context,
134134

135135
timer.start();
136136
auto kit = payloads.find((MessageKind)msg.kind());
137-
if(kit == payloads.end()) throw std::runtime_error("ZMQNet::doWork : No payload associated with the message kind provided. Message: " + strmsg + " (did you add the payload to the server?)");
137+
if(kit == payloads.end()) fatal_error("ZMQNet::doWork : No payload associated with the message kind provided. Message: " + strmsg + " (did you add the payload to the server?)");
138138
auto pit = kit->second.find((MessageType)msg.type());
139-
if(pit == kit->second.end()) throw std::runtime_error("ZMQNet::doWork : No payload associated with the message type provided. Mesage: " + strmsg + " (did you add the payload to the server?)");
139+
if(pit == kit->second.end()) fatal_error("ZMQNet::doWork : No payload associated with the message type provided. Mesage: " + strmsg + " (did you add the payload to the server?)");
140140
perf.add(perf_prefix + "message_kind_type_lookup_ms", timer.elapsed_ms());
141141

142142
//Apply the payload
@@ -394,7 +394,7 @@ int ZMQNet::recvAndSend(void* skFrom, void* skTo, int max_msg){
394394
if(len == -1 && errno == EAGAIN){ //no more messages on queue
395395
zmq_msg_close(&msg);
396396
return msg_count;
397-
}else if(len == -1) throw std::runtime_error("ZMQNet::recvAndSend error receiving: " + std::string(zmq_strerror(errno)));
397+
}else if(len == -1) fatal_error("ZMQNet::recvAndSend error receiving: " + std::string(zmq_strerror(errno)));
398398
int more;
399399
zmq_getsockopt(skFrom, ZMQ_RCVMORE, &more, &more_size); //check whether message is part of a multi-part message
400400

@@ -429,6 +429,9 @@ int ZMQNet::send(void* socket, const std::string& strmsg)
429429
memcpy(zmq_msg_data(&msg), (const void*)strmsg.data(), strmsg.size());
430430
ret = zmq_msg_send(&msg, socket, 0);
431431
zmq_msg_close(&msg);
432+
433+
if(ret == -1) fatal_error("Send failed with error: " + std::string(zmq_strerror(errno)) );
434+
432435
return ret;
433436
}
434437

@@ -440,6 +443,9 @@ int ZMQNet::recv(void* socket, std::string& strmsg)
440443
ret = zmq_msg_recv(&msg, socket, 0);
441444
if(ret != -1) strmsg.assign((char*)zmq_msg_data(&msg), ret);
442445
zmq_msg_close(&msg);
446+
447+
if(ret == -1) fatal_error("Receive failed with error: " + std::string(zmq_strerror(errno)) );
448+
443449
return ret;
444450
}
445451
#endif

0 commit comments

Comments
 (0)