Skip to content

Commit 22c59d7

Browse files
committed
Stats sent from AD to pserver can now be aggregated over multiple steps before being sent + unit test
Added driver option -ps_send_stats_freq to control the above frequency
1 parent 1df773f commit 22c59d7

10 files changed

Lines changed: 327 additions & 27 deletions

File tree

app/driver.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ optionalArgsParser & getOptionalArgsParser(){
112112

113113
addOptionalCommandLineArg(p, outlier_statistic, "Set the statistic used for outlier detection. Options: exclusive_runtime (default), inclusive_runtime");
114114
addOptionalCommandLineArg(p, global_model_sync_freq, "Set the frequency in steps between updates of the global model (default 1)");
115+
addOptionalCommandLineArg(p, ps_send_stats_freq, "Set how often in steps the statistics data is uploaded to the pserver (default 1)");
115116
addOptionalCommandLineArg(p, step_report_freq, "Set the steps between Chimbuko reporting IO step progress. Use 0 to deactivate this logging entirely (default 1)");
116117
addOptionalCommandLineArg(p, prov_record_startstep, "If != -1, the IO step on which to start recording provenance information for anomalies (for testing, default -1)");
117118
addOptionalCommandLineArg(p, prov_record_stopstep, "If != -1, the IO step on which to stop recording provenance information for anomalies (for testing, default -1)");

app/pserver.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ int main (int argc, char ** argv){
194194
for(int i=0;i<args.nt;i++){
195195
net.add_payload(new NetPayloadUpdateParamManager(&param, i, args.freeze_params), i);
196196
net.add_payload(new NetPayloadGetParamsFromManager(&param),i);
197-
net.add_payload(new NetPayloadRecvCombinedADdata(&global_func_stats[i], &global_counter_stats[i], &global_anom_metrics[i]),i); //each worker thread writes to a separate stats object which are aggregated only at viz send time
197+
net.add_payload(new NetPayloadRecvCombinedADdataArray(&global_func_stats[i], &global_counter_stats[i], &global_anom_metrics[i]),i); //each worker thread writes to a separate stats object which are aggregated only at viz send time
198198
net.add_payload(new NetPayloadGlobalFunctionIndexMapBatched(&global_func_index_map),i);
199199
net.add_payload(new NetPayloadPing,i);
200200
}

include/chimbuko/ad/ADLocalCounterStatistics.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ namespace chimbuko{
2424
const std::unordered_set<std::string> *which_counters, PerfStats *perf = nullptr):
2525
m_program_idx(program_idx), m_step(step), m_which_counter(which_counters), m_perf(perf)
2626
{}
27-
27+
28+
ADLocalCounterStatistics(): m_program_idx(0), m_step(-1), m_which_counter(nullptr), m_perf(nullptr){}
29+
2830
/**
2931
* @brief Add counters to internal statistics
3032
*/

include/chimbuko/ad/ADcombinedPSdata.hpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,70 @@ namespace chimbuko{
7070
};
7171

7272

73+
74+
/**
75+
* @brief A class that combines all data that is sent to the PS collected over multiple steps into a single send transmission
76+
*/
77+
class ADcombinedPSdataArray{
78+
public:
79+
ADcombinedPSdataArray(std::vector<ADLocalFuncStatistics> &func_stats,
80+
std::vector<ADLocalCounterStatistics> &counter_stats,
81+
std::vector<ADLocalAnomalyMetrics> &anom_metrics,
82+
PerfStats* perf=nullptr):
83+
m_func_stats(func_stats), m_counter_stats(counter_stats), m_anom_metrics(anom_metrics), m_perf(perf){}
84+
85+
std::vector<ADLocalCounterStatistics> &get_counter_stats(){ return m_counter_stats; }
86+
const std::vector<ADLocalCounterStatistics> &get_counter_stats() const{ return m_counter_stats; }
87+
88+
std::vector<ADLocalFuncStatistics> &get_func_stats(){ return m_func_stats; }
89+
const std::vector<ADLocalFuncStatistics> &get_func_stats() const{ return m_func_stats; }
90+
91+
std::vector<ADLocalAnomalyMetrics> &get_anom_metrics(){ return m_anom_metrics; }
92+
const std::vector<ADLocalAnomalyMetrics> &get_anom_metrics() const{ return m_anom_metrics; }
93+
94+
/**
95+
* @brief Serialize using cereal
96+
*/
97+
template<class Archive>
98+
void serialize(Archive & archive){
99+
archive(m_func_stats , m_counter_stats, m_anom_metrics);
100+
}
101+
102+
/**
103+
* Serialize into Cereal portable binary format
104+
*/
105+
std::string serialize_cerealpb() const;
106+
107+
/**
108+
* Serialize from Cereal portable binary format
109+
*/
110+
void deserialize_cerealpb(const std::string &strstate);
111+
112+
/**
113+
* @brief Serialize this class for communication over the network
114+
*/
115+
std::string net_serialize() const;
116+
117+
/**
118+
* @brief Unserialize this class after communication over the network
119+
*/
120+
void net_deserialize(const std::string &s);
121+
122+
123+
/**
124+
* @brief Send the data to the pserver
125+
* @param net_client The network client object
126+
* @return std::pair<size_t, size_t> [sent, recv] message size
127+
*/
128+
std::pair<size_t, size_t> send(ADNetClient &net_client) const;
129+
130+
131+
private:
132+
std::vector<ADLocalFuncStatistics> &m_func_stats;
133+
std::vector<ADLocalCounterStatistics> &m_counter_stats;
134+
std::vector<ADLocalAnomalyMetrics> &m_anom_metrics;
135+
PerfStats* m_perf;
136+
};
137+
138+
73139
}

include/chimbuko/chimbuko.hpp

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ namespace chimbuko {
3939
< If no parameter server is in use, this string should be empty (length zero)
4040
< If using ZmqNet (default) this is a tcp address of the form "tcp://${ADDRESS}:${PORT}"
4141
*/
42+
int ps_send_stats_freq; /**< How often in steps the statistics data is uploaded to the pserver (default 1) */
43+
4244
int hpserver_nthr; /**< If using the hierarchical pserver, this parameter is used to compute a port offset for the particular endpoint that this AD rank connects to */
4345

4446
std::string prov_outputpath; /**< Directory where provenance data is written (in conjunction with provDB if active). Blank string indicates no output*/
@@ -260,14 +262,18 @@ namespace chimbuko {
260262
*/
261263
void sendProvenance(const int step, bool force = false);
262264

265+
/**
266+
* @brief Gather and buffer the required statistics data for the pserver
267+
*/
268+
void gatherPSdata(const Anomalies &anomalies,
269+
const int step,
270+
const unsigned long first_event_ts,
271+
const unsigned long last_event_ts);
263272

264273
/**
265-
* @brief Gather and send the required data to the pserver
274+
* @brief Send buffered pserver statistics data when step matches frequency or force==true
266275
*/
267-
void gatherAndSendPSdata(const Anomalies &anomalies,
268-
const int step,
269-
const unsigned long first_event_ts,
270-
const unsigned long last_event_ts) const;
276+
void sendPSdata(const int step, bool force = false);
271277

272278
/**
273279
* @brief Send new metadata entries collected during current fram to provenance DB
@@ -309,6 +315,10 @@ namespace chimbuko {
309315
std::vector<nlohmann::json> m_normalevent_prov_buf; /**<Buffered normal event provenance data waiting to be sent*/
310316

311317
std::set<unsigned long> m_exec_ignore_counters; /**< Counter indices in this list are ignored by the event manager (but will still be picked up by other components)*/
318+
319+
std::vector<ADLocalFuncStatistics> m_funcstats_buf; /**< Buffered function statistics data waiting to be sent*/
320+
std::vector<ADLocalCounterStatistics> m_countstats_buf; /**< Buffered counter statistics data waiting to be sent*/
321+
std::vector<ADLocalAnomalyMetrics> m_anom_metrics_buf; /**< Buffered anomaly metrics data waiting to be sent*/
312322
};
313323

314324
}

include/chimbuko/pserver/NetPayloadRecvCombinedADdata.hpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace chimbuko{
88

9-
/**
9+
/**
1010
* @brief Net payload for receiving the combined data payload from the AD
1111
*/
1212
class NetPayloadRecvCombinedADdata: public NetPayloadBase{
@@ -23,6 +23,23 @@ namespace chimbuko{
2323
void action(Message &response, const Message &message) override;
2424
};
2525

26+
/**
27+
* @brief Net payload for receiving the combined data payload over multiple steps from the AD
28+
*/
29+
class NetPayloadRecvCombinedADdataArray: public NetPayloadBase{
30+
GlobalAnomalyStats * m_global_anom_stats;
31+
GlobalCounterStats * m_global_counter_stats;
32+
GlobalAnomalyMetrics * m_global_anom_metrics;
33+
public:
34+
NetPayloadRecvCombinedADdataArray(GlobalAnomalyStats * global_anom_stats, GlobalCounterStats * global_counter_stats,
35+
GlobalAnomalyMetrics * global_anom_metrics): m_global_anom_stats(global_anom_stats),
36+
m_global_counter_stats(global_counter_stats),
37+
m_global_anom_metrics(global_anom_metrics){}
38+
MessageKind kind() const override{ return MessageKind::AD_PS_COMBINED_STATS; }
39+
MessageType type() const override{ return MessageType::REQ_ADD; }
40+
void action(Message &response, const Message &message) override;
41+
};
42+
2643

2744

2845

src/ad/ADcombinedPSdata.cpp

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,63 @@ std::pair<size_t, size_t> ADcombinedPSdata::send(ADNetClient &net_client) const{
4040

4141
if(m_perf != nullptr){
4242
m_perf->add("ps_combine_send_update_ms", timer.elapsed_ms());
43-
m_perf->add("ps_combine_send_sent_MB", (double)sent_sz / 1000000.0); // MB
44-
m_perf->add("ps_combine_send_recv_MB", (double)recv_sz / 1000000.0); // MB
43+
m_perf->add("ps_combine_send_sent_MB", (double)sent_sz / 1024./1024.); // MB
44+
m_perf->add("ps_combine_send_recv_MB", (double)recv_sz / 1024./1024.); // MB
45+
}
46+
47+
return std::make_pair(sent_sz, recv_sz);
48+
}
49+
50+
51+
52+
53+
54+
std::string ADcombinedPSdataArray::serialize_cerealpb() const{
55+
return cereal_serialize(*this);
56+
}
57+
58+
void ADcombinedPSdataArray::deserialize_cerealpb(const std::string &strstate){
59+
cereal_deserialize(*this, strstate);
60+
}
61+
62+
std::string ADcombinedPSdataArray::net_serialize() const{
63+
return serialize_cerealpb();
64+
}
65+
66+
void ADcombinedPSdataArray::net_deserialize(const std::string &s){
67+
deserialize_cerealpb(s);
68+
}
69+
70+
std::pair<size_t, size_t> ADcombinedPSdataArray::send(ADNetClient &net_client) const{
71+
PerfTimer timer;
72+
timer.start();
73+
74+
if (!net_client.use_ps())
75+
return std::make_pair(0, 0);
76+
77+
size_t N = m_func_stats.size();
78+
if(m_counter_stats.size() != N || m_anom_metrics.size() != N) fatal_error("Expect arrays to have the same size");
79+
80+
int latest_step = -1;
81+
for(size_t i=0;i<N;i++){
82+
int step = m_func_stats[i].getAnomalyData().get_step();
83+
if(m_counter_stats[i].getIOstep() != step) fatal_error("Step value mismatch between counter stats and func anomaly stats");
84+
if(m_anom_metrics[i].get_step() != step) fatal_error("Step value mismatch between anomaly metrics and func anomaly stats");
85+
latest_step = std::max(step, latest_step);
86+
}
87+
88+
Message msg;
89+
msg.set_info(net_client.get_client_rank(), net_client.get_server_rank(), MessageType::REQ_ADD, MessageKind::AD_PS_COMBINED_STATS, latest_step);
90+
msg.set_msg(net_serialize());
91+
92+
size_t sent_sz = msg.size();
93+
net_client.async_send(msg);
94+
size_t recv_sz =0;
95+
96+
if(m_perf != nullptr){
97+
m_perf->add("ps_combine_send_update_ms", timer.elapsed_ms());
98+
m_perf->add("ps_combine_send_sent_MB", (double)sent_sz / 1024./1024.); // MB
99+
m_perf->add("ps_combine_send_recv_MB", (double)recv_sz / 1024./1024.); // MB
45100
}
46101

47102
return std::make_pair(sent_sz, recv_sz);

src/chimbuko.cpp

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ ChimbukoParams::ChimbukoParams(): rank(-1234), //not set!
4040
monitoring_counter_prefix(""),
4141
prov_min_anom_time(0),
4242
prov_io_freq(1),
43-
global_model_sync_freq(1)
43+
global_model_sync_freq(1),
44+
ps_send_stats_freq(1)
4445
{}
4546

4647
void ChimbukoParams::print() const{
@@ -599,39 +600,52 @@ void Chimbuko::sendNewMetadataToProvDB(int step) const{
599600
}
600601
}
601602

602-
void Chimbuko::gatherAndSendPSdata(const Anomalies &anomalies,
603-
const int step,
604-
const unsigned long first_event_ts,
605-
const unsigned long last_event_ts) const{
603+
void Chimbuko::gatherPSdata(const Anomalies &anomalies,
604+
const int step,
605+
const unsigned long first_event_ts,
606+
const unsigned long last_event_ts){
606607
if(m_net_client && m_net_client->use_ps()){
607608
PerfTimer timer;
608609

609610
//Gather function profile and anomaly statistics
610611
timer.start();
611-
ADLocalFuncStatistics prof_stats(m_params.program_idx, m_params.rank, step, &m_perf);
612-
prof_stats.gatherStatistics(m_event->getExecDataMap());
613-
prof_stats.gatherAnomalies(anomalies);
614-
m_perf.add("ad_gather_send_ps_data_gather_profile_stats_time_ms", timer.elapsed_ms());
612+
ADLocalFuncStatistics func_stats(m_params.program_idx, m_params.rank, step, &m_perf);
613+
func_stats.gatherStatistics(m_event->getExecDataMap());
614+
func_stats.gatherAnomalies(anomalies);
615+
m_funcstats_buf.emplace_back(std::move(func_stats));
616+
m_perf.add("ad_gather_ps_data_gather_profile_stats_time_ms", timer.elapsed_ms());
615617

616618
//Gather counter statistics
617619
timer.start();
618620
ADLocalCounterStatistics count_stats(m_params.program_idx, step, nullptr, &m_perf); //currently collect all counters
619621
count_stats.gatherStatistics(m_counter->getCountersByIndex());
620-
m_perf.add("ad_gather_send_ps_data_gather_counter_stats_time_ms", timer.elapsed_ms());
622+
m_countstats_buf.emplace_back(std::move(count_stats));
623+
m_perf.add("ad_gather_data_gather_counter_stats_time_ms", timer.elapsed_ms());
621624

622625
//Gather anomaly metrics
623626
timer.start();
624627
ADLocalAnomalyMetrics metrics(m_params.program_idx, m_params.rank, step, first_event_ts, last_event_ts, anomalies);
625-
m_perf.add("ad_gather_send_ps_data_gather_metrics_time_ms", timer.elapsed_ms());
628+
m_anom_metrics_buf.emplace_back(std::move(metrics));
629+
m_perf.add("ad_gather_ps_data_gather_metrics_time_ms", timer.elapsed_ms());
626630

631+
m_perf.add("ad_gather_ps_data_total_time_ms", timer.elapsed_ms());
632+
}
633+
}
634+
635+
void Chimbuko::sendPSdata(const int step, bool force){
636+
if(m_net_client && m_net_client->use_ps() && m_funcstats_buf.size() && (step % m_params.ps_send_stats_freq == 0 || force) ){
627637
//Send the data in a single communication
638+
PerfTimer timer;
628639
timer.start();
629-
ADcombinedPSdata comb_stats(prof_stats, count_stats, metrics, &m_perf);
630-
comb_stats.send(*m_net_client);
631-
m_perf.add("ad_gather_send_ps_data_gather_send_all_stats_to_ps_time_ms", timer.elapsed_ms());
640+
ADcombinedPSdataArray comb_stats(m_funcstats_buf, m_countstats_buf, m_anom_metrics_buf, &m_perf);
641+
comb_stats.send(*m_net_client); //serializes and puts result in buffer awaiting send so we are free to flush the buffers
642+
m_funcstats_buf.clear(); m_countstats_buf.clear(); m_anom_metrics_buf.clear();
643+
m_perf.add("ad_send_ps_data_total_time_ms", timer.elapsed_ms());
632644
}
633645
}
634646

647+
648+
635649
bool Chimbuko::runFrame(unsigned long long& n_func_events,
636650
unsigned long long& n_comm_events,
637651
unsigned long long& n_counter_events,
@@ -720,7 +734,7 @@ bool Chimbuko::runFrame(unsigned long long& n_func_events,
720734

721735
//Gather and send statistics and data to the pserver
722736
timer.start();
723-
gatherAndSendPSdata(anomalies, step, m_execdata_first_event_ts, m_execdata_last_event_ts);
737+
gatherPSdata(anomalies, step, m_execdata_first_event_ts, m_execdata_last_event_ts);
724738
m_perf.add("ad_run_gather_send_ps_data_time_ms", timer.elapsed_ms());
725739

726740
//Trim the call list
@@ -739,7 +753,9 @@ bool Chimbuko::runFrame(unsigned long long& n_func_events,
739753
if(do_step_report){ headProgressStream(m_params.rank) << "driver rank " << m_params.rank << " function execution analysis complete: total=" << nout + nnormal << " normal=" << nnormal << " anomalous=" << nout << std::endl; }
740754
}//if(do_run_analysis)
741755

742-
sendProvenance(step); //only actually sends if the step aligns with the send frequency. Want to do this even if not analyzing this step
756+
//Send provenance and PS data if the step aligns with the send frequency. Want to do this even if not analyzing this step
757+
sendProvenance(step);
758+
sendPSdata(step);
743759

744760
m_perf.add("ad_run_total_step_time_excl_parse_ms", step_timer.elapsed_ms());
745761

@@ -805,5 +821,7 @@ void Chimbuko::run(unsigned long long& n_func_events,
805821
if (m_params.interval_msec)
806822
std::this_thread::sleep_for(std::chrono::milliseconds(m_params.interval_msec));
807823
}
808-
sendProvenance(m_parser->getCurrentStep(), true); //send any outstanding buffered provenance data (second arg forces send)
824+
//send any outstanding buffered provenance and PS data (second arg forces send)
825+
sendProvenance(m_parser->getCurrentStep(), true);
826+
sendPSdata(m_parser->getCurrentStep(), true);
809827
}

src/pserver/NetPayloadRecvCombinedADdata.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,29 @@ void NetPayloadRecvCombinedADdata::action(Message &response, const Message &mess
2222

2323
response.set_msg("", false);
2424
}
25+
26+
void NetPayloadRecvCombinedADdataArray::action(Message &response, const Message &message){
27+
check(message);
28+
if(m_global_anom_stats == nullptr) throw std::runtime_error("Cannot update global anomaly statistics as stats object has not been linked");
29+
if(m_global_counter_stats == nullptr) throw std::runtime_error("Cannot update global counter statistics as stats object has not been linked");
30+
if(m_global_anom_metrics == nullptr) throw std::runtime_error("Cannot update global anomaly metrics as stats object has not been linked");
31+
32+
std::vector<ADLocalFuncStatistics> locf;
33+
std::vector<ADLocalCounterStatistics> locc;
34+
std::vector<ADLocalAnomalyMetrics> locm;
35+
36+
ADcombinedPSdataArray comb(locf,locc,locm);
37+
comb.net_deserialize(message.buf());
38+
39+
int N = locf.size();
40+
if(locc.size() != N || locm.size() != N) fatal_error("Expect consistent size for arrays");
41+
42+
for(int i=0;i<N;i++){
43+
m_global_anom_stats->add_anomaly_data(locf[i]);
44+
m_global_counter_stats->add_counter_data(locc[i]);
45+
m_global_anom_metrics->add(locm[i]);
46+
}
47+
48+
response.set_msg("", false);
49+
}
2550

0 commit comments

Comments
 (0)