Skip to content

Commit c9946b3

Browse files
committed
AD and pserver now collect / aggregate anomaly metrics, pserver sends to viz
Simulator now simulates collection / aggregation of anomaly metrics. Added GlobalAnomalyMetrics to ADcombinedPSdata + unit test. In benchmark_suite/func_multimodal, the anomalous function now calls, a varying number of times, a function that performs a sleep for a fixed time, such that it is the parent that appears anomalous, not the sleep call. Fixed GlobalAnomalyMetrics unit test failing due to addition of _id field causing JSON outputs to differ.
1 parent 0b31583 commit c9946b3

16 files changed

Lines changed: 124 additions & 30 deletions

File tree

app/pserver.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ int main (int argc, char ** argv){
125125
}
126126
GlobalAnomalyStats global_func_stats; //global anomaly statistics
127127
GlobalCounterStats global_counter_stats; //global counter statistics
128+
GlobalAnomalyMetrics global_anom_metrics; //global anomaly metrics
128129
PSglobalFunctionIndexMap global_func_index_map; //mapping of function name to global index
129130

130131
//Optionally load previously-computed AD algorithm statistics
@@ -187,13 +188,14 @@ int main (int argc, char ** argv){
187188

188189
net.add_payload(new NetPayloadUpdateParams(param, args.freeze_params));
189190
net.add_payload(new NetPayloadGetParams(param));
190-
net.add_payload(new NetPayloadRecvCombinedADdata(&global_func_stats, &global_counter_stats));
191+
net.add_payload(new NetPayloadRecvCombinedADdata(&global_func_stats, &global_counter_stats, &global_anom_metrics));
191192
net.add_payload(new NetPayloadGlobalFunctionIndexMapBatched(&global_func_index_map));
192193
net.init(nullptr, nullptr, args.nt);
193194

194195
//Start sending anomaly statistics to viz
195196
stat_sender.add_payload(new PSstatSenderGlobalAnomalyStatsPayload(&global_func_stats));
196197
stat_sender.add_payload(new PSstatSenderGlobalCounterStatsPayload(&global_counter_stats));
198+
stat_sender.add_payload(new PSstatSenderGlobalAnomalyMetricsPayload(&global_anom_metrics));
197199
stat_sender.run_stat_sender(args.ws_addr, args.stat_outputdir);
198200

199201
//Register a signal handler that prevents the application from exiting on SIGTERM; instead this signal will be handled by ZeroMQ and will cause the pserver to shutdown gracefully

benchmark_suite/func_multimodal/main.C

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,19 @@ T strToAny(const std::string &s){
1414
return out;
1515
}
1616

17+
/**
 * @brief Round an integer to the nearest multiple of 100; exact ties (remainder of 50) round up, toward +infinity
 * @param val The value to round (here a time in milliseconds)
 * @return The multiple of 100 nearest to val
 *
 * Works from the quotient and remainder rather than computing (val + 50) / 100 so that
 * negative inputs round correctly (C++ integer division truncates toward zero) and
 * so that no intermediate addition can overflow for inputs close to INT_MAX.
 */
int round_to_nearest_100(int val){
  int nhundred = val / 100;  //truncated toward zero
  const int rem = val % 100; //carries the sign of val, magnitude < 100

  if(rem >= 50) ++nhundred;       //upper half (and the tie) of a positive interval: round up
  else if(rem < -50) --nhundred;  //strictly below the midpoint of a negative interval: round down

  return nhundred * 100;
}
21+
/**
 * @brief Put the calling thread to sleep for a fixed interval of 100 ms
 */
void sleep_100ms(){
  const std::chrono::milliseconds interval(100);
  std::this_thread::sleep_for(interval);
}
1724

25+
//In order to make this function exhibit the anomaly and not just sleep_for, call sleep in units of 100ms
1826
void the_function(const int sleep_ms){
19-
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
27+
int nhundred = sleep_ms / 100;
28+
for(int i=0;i<nhundred;i++)
29+
sleep_100ms();
2030
}
2131

2232
int main(int argc, char **argv){
@@ -29,19 +39,22 @@ int main(int argc, char **argv){
2939
std::stringstream ss;
3040
ss << argv[2];
3141

32-
std::cout << "Mode times: ";
42+
std::cout << "Mode times (rounded to nearest 100ms): ";
3343
std::vector<int> mode_times;
3444
while(ss.good()) {
3545
std::string substr;
3646
getline(ss, substr, ','); //get first string delimited by comma
37-
mode_times.push_back(strToAny<int>(substr));
47+
int val = strToAny<int>(substr);
48+
mode_times.push_back(round_to_nearest_100(val));
3849
std::cout << mode_times.back() << " ";
3950
}
4051
std::cout << std::endl;
4152

42-
int anom_time = strToAny<int>(argv[3]);
53+
int anom_time = round_to_nearest_100(strToAny<int>(argv[3]));
4354
int anom_freq = strToAny<int>(argv[4]);
4455

56+
std::cout << "Anomaly time (rounded to nearest 100ms): " << anom_time << std::endl;
57+
4558
std::mt19937 gen(1234);
4659
std::uniform_int_distribution<> dist(0, mode_times.size()-1);
4760

include/chimbuko/AD.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "chimbuko/ad/ADNetClient.hpp"
1111
#include "chimbuko/ad/ADLocalFuncStatistics.hpp"
1212
#include "chimbuko/ad/ADLocalCounterStatistics.hpp"
13+
#include "chimbuko/ad/ADLocalAnomalyMetrics.hpp"
1314
#include "chimbuko/ad/ADcombinedPSdata.hpp"
1415
#include "chimbuko/ad/ADProvenanceDBclient.hpp"
1516
#include "chimbuko/ad/ADMetadataParser.hpp"

include/chimbuko/ad/ADcombinedPSdata.hpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,44 @@
22
#include <chimbuko_config.h>
33
#include <chimbuko/ad/ADLocalFuncStatistics.hpp>
44
#include <chimbuko/ad/ADLocalCounterStatistics.hpp>
5+
#include <chimbuko/ad/ADLocalAnomalyMetrics.hpp>
56

67
namespace chimbuko{
78
/**
89
* @brief A class that combines all data that is sent to the PS into a single send transmission
910
*/
1011
class ADcombinedPSdata{
1112
public:
12-
ADcombinedPSdata(ADLocalFuncStatistics &func_stats, ADLocalCounterStatistics &counter_stats, PerfStats* perf=nullptr):
13-
m_func_stats(func_stats), m_counter_stats(counter_stats), m_perf(perf){}
13+
ADcombinedPSdata(ADLocalFuncStatistics &func_stats,
14+
ADLocalCounterStatistics &counter_stats,
15+
ADLocalAnomalyMetrics &anom_metrics,
16+
PerfStats* perf=nullptr):
17+
m_func_stats(func_stats), m_counter_stats(counter_stats), m_anom_metrics(anom_metrics), m_perf(perf){}
1418

1519
ADLocalCounterStatistics &get_counter_stats(){ return m_counter_stats; }
1620
const ADLocalCounterStatistics &get_counter_stats() const{ return m_counter_stats; }
1721

1822
ADLocalFuncStatistics &get_func_stats(){ return m_func_stats; }
1923
const ADLocalFuncStatistics &get_func_stats() const{ return m_func_stats; }
2024

25+
ADLocalAnomalyMetrics &get_anom_metrics(){ return m_anom_metrics; }
26+
const ADLocalAnomalyMetrics &get_anom_metrics() const{ return m_anom_metrics; }
27+
28+
2129
/**
2230
* @brief The State object is what is serialized for the communication
2331
*/
2432
struct State{
2533
ADLocalFuncStatistics::State func_stats_state;
2634
ADLocalCounterStatistics::State counter_stats_state;
35+
ADLocalAnomalyMetrics::State anom_metrics_state;
2736

2837
/**
2938
* @brief Serialize using cereal
3039
*/
3140
template<class Archive>
3241
void serialize(Archive & archive){
33-
archive(func_stats_state , counter_stats_state);
42+
archive(func_stats_state , counter_stats_state, anom_metrics_state);
3443
}
3544

3645
/**
@@ -80,6 +89,7 @@ namespace chimbuko{
8089
private:
8190
ADLocalFuncStatistics &m_func_stats;
8291
ADLocalCounterStatistics &m_counter_stats;
92+
ADLocalAnomalyMetrics &m_anom_metrics;
8393
PerfStats* m_perf;
8494
};
8595

include/chimbuko/chimbuko.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ namespace chimbuko {
198198
* @brief Gather and send the required data to the pserver
199199
*/
200200
void gatherAndSendPSdata(const Anomalies &anomalies,
201-
const int step) const;
201+
const int step,
202+
const unsigned long first_event_ts,
203+
const unsigned long last_event_ts) const;
202204

203205
/**
204206
* @brief Send new metadata entries collected during the current frame to the provenance DB

include/chimbuko/pserver/NetPayloadRecvCombinedADdata.hpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <chimbuko_config.h>
33
#include <chimbuko/pserver/GlobalAnomalyStats.hpp>
44
#include <chimbuko/pserver/GlobalCounterStats.hpp>
5+
#include <chimbuko/pserver/GlobalAnomalyMetrics.hpp>
56

67
namespace chimbuko{
78

@@ -11,9 +12,12 @@ namespace chimbuko{
1112
class NetPayloadRecvCombinedADdata: public NetPayloadBase{
1213
GlobalAnomalyStats * m_global_anom_stats;
1314
GlobalCounterStats * m_global_counter_stats;
15+
GlobalAnomalyMetrics * m_global_anom_metrics;
1416
public:
15-
NetPayloadRecvCombinedADdata(GlobalAnomalyStats * global_anom_stats, GlobalCounterStats * global_counter_stats): m_global_anom_stats(global_anom_stats),
16-
m_global_counter_stats(global_counter_stats){}
17+
NetPayloadRecvCombinedADdata(GlobalAnomalyStats * global_anom_stats, GlobalCounterStats * global_counter_stats,
18+
GlobalAnomalyMetrics * global_anom_metrics): m_global_anom_stats(global_anom_stats),
19+
m_global_counter_stats(global_counter_stats),
20+
m_global_anom_metrics(global_anom_metrics){}
1721
MessageKind kind() const override{ return MessageKind::AD_PS_COMBINED_STATS; }
1822
MessageType type() const override{ return MessageType::REQ_ADD; }
1923
void action(Message &response, const Message &message) override;

sim/include/sim/pserver.hpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,21 @@
33
#include<chimbuko/param.hpp>
44
#include<chimbuko/pserver/GlobalAnomalyStats.hpp>
55
#include<chimbuko/pserver/GlobalCounterStats.hpp>
6+
#include<chimbuko/pserver/GlobalAnomalyMetrics.hpp>
67

78
namespace chimbuko_sim{
89
using namespace chimbuko;
9-
10-
//An object that represents the pserver for the purposes of aggregating data over AD instances and writing streaming output to disk
10+
11+
/**
12+
* @brief An object that represents the pserver for the purposes of aggregating data over AD instances and writing streaming output to disk
13+
*/
1114
class pserverSim{
12-
GlobalAnomalyStats global_func_stats; //global anomaly statistics
13-
GlobalCounterStats global_counter_stats; //global counter statistics
15+
GlobalAnomalyStats global_func_stats; /**< global anomaly statistics */
16+
GlobalCounterStats global_counter_stats; /**< global counter statistics */
17+
GlobalAnomalyMetrics global_anomaly_metrics; /**< global anomaly metrics */
1418
PSstatSenderGlobalAnomalyStatsPayload anomaly_stats_payload;
1519
PSstatSenderGlobalCounterStatsPayload counter_stats_payload;
20+
PSstatSenderGlobalAnomalyMetricsPayload anomaly_metrics_payload;
1621
LocalNet m_net; /**< The net server */
1722
ParamInterface* m_ad_params; /**< If using an AD algorithm to analyze the trace, these are the global parameters*/
1823
public:
@@ -37,6 +42,13 @@ namespace chimbuko_sim{
3742
void addCounterData(const ADLocalCounterStatistics &loc){
3843
global_counter_stats.add_counter_data(loc);
3944
}
45+
46+
/**
47+
* @brief Stand-in for the anomaly metrics comms from AD -> pserver
48+
*/
49+
void addAnomalyMetrics(const ADLocalAnomalyMetrics &loc){
50+
global_anomaly_metrics.add(loc);
51+
}
4052

4153
/**
4254
* @brief Mirror write functionality of PSstatSender

sim/src/ad.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,9 @@ void ADsim::step(const unsigned long step){
292292
prof_stats.gatherStatistics(&this_step_func_execs); //takes map of fid -> vector of ExecData iterators
293293
prof_stats.gatherAnomalies(anom);
294294
getPserver().addAnomalyData(prof_stats);
295+
296+
ADLocalAnomalyMetrics metrics(m_pid, m_rid, step, step_start_time, step_end_time, anom);
297+
getPserver().addAnomalyMetrics(metrics);
295298
}
296299
}
297300

sim/src/pserver.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ void pserverSim::writeStreamingOutput() const{
88
nlohmann::json json_packet;
99
anomaly_stats_payload.add_json(json_packet);
1010
counter_stats_payload.add_json(json_packet);
11-
11+
anomaly_metrics_payload.add_json(json_packet);
12+
1213
static int iter = 0;
1314

1415
std::ostringstream fname; fname << "pserver_output_stats_" << iter << ".json";
@@ -19,7 +20,8 @@ void pserverSim::writeStreamingOutput() const{
1920
}
2021
}
2122

22-
pserverSim::pserverSim(): anomaly_stats_payload(&global_func_stats), counter_stats_payload(&global_counter_stats), m_ad_params(nullptr){
23+
pserverSim::pserverSim(): anomaly_stats_payload(&global_func_stats), counter_stats_payload(&global_counter_stats),
24+
anomaly_metrics_payload(&global_anomaly_metrics), m_ad_params(nullptr){
2325
if(adAlgorithmParams().algorithm != "none"){
2426
m_ad_params = ParamInterface::set_AdParam(adAlgorithmParams().algorithm);
2527
m_net.add_payload(new NetPayloadUpdateParams(m_ad_params, false));

src/ad/ADcombinedPSdata.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ ADcombinedPSdata::State ADcombinedPSdata::get_state() const{
1616
State state;
1717
state.func_stats_state = m_func_stats.get_state();
1818
state.counter_stats_state = m_counter_stats.get_state();
19+
state.anom_metrics_state = m_anom_metrics.get_state();
1920
return state;
2021
}
2122

2223
void ADcombinedPSdata::set_state(const ADcombinedPSdata::State &s){
2324
m_func_stats.set_state(s.func_stats_state);
2425
m_counter_stats.set_state(s.counter_stats_state);
26+
m_anom_metrics.set_state(s.anom_metrics_state);
2527
}
2628

2729

@@ -43,7 +45,8 @@ std::pair<size_t, size_t> ADcombinedPSdata::send(ADNetClient &net_client) const{
4345
return std::make_pair(0, 0);
4446

4547
int step = m_func_stats.getAnomalyData().get_step();
46-
if(m_counter_stats.getIOstep() != step) fatal_error("Step value mismatch between members");
48+
if(m_counter_stats.getIOstep() != step) fatal_error("Step value mismatch between counter stats and func anomaly stats");
49+
if(m_anom_metrics.get_step() != step) fatal_error("Step value mismatch between anomaly metrics and func anomaly stats");
4750

4851
Message msg;
4952
msg.set_info(net_client.get_client_rank(), net_client.get_server_rank(), MessageType::REQ_ADD, MessageKind::AD_PS_COMBINED_STATS, step);

0 commit comments

Comments
 (0)