Skip to content

Commit 1c4954f

Browse files
committed
Moved generation of JSON from AggregatedAnomalyData from GlobalAnomalyStats to a method of AggregatedAnomalyData
Improved thread safety of GlobalAnomalyStats and marked functions in header as thread safe or not thread safe Made outlier_severity a method of ExecData
1 parent b5ce01a commit 1c4954f

7 files changed

Lines changed: 120 additions & 55 deletions

File tree

include/chimbuko/ad/ExecData.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,10 +482,15 @@ class ExecData_t {
482482
unsigned long get_n_counter() const { return m_counters.size(); }
483483

484484
/**
485-
* @brief Return the outlier score assigned to the data point
485+
* @brief Return the outlier score assigned to the data point, representing how unlikely an event is
486486
*/
487487
double get_outlier_score() const{ return m_score; }
488488

489+
/**
490+
* @brief Return the outlier severity, representing how important the outlier is
491+
*/
492+
double get_outlier_severity() const{ return (double)get_runtime(); }
493+
489494
/**
490495
* @brief Set the label
491496
*

include/chimbuko/pserver/AggregateAnomalyData.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,13 @@ namespace chimbuko {
6464
*/
6565
std::list<AnomalyData>* get_data() const;
6666

67+
/**
68+
* @brief Get the current statistics in JSON format and flush the list of AnomalyData collected since previous call to get()
69+
* @param pid The program index
70+
* @param rid The rank index
71+
*/
72+
nlohmann::json get_json_and_flush(int pid, int rid);
73+
6774
/**
6875
* @brief Return the number of JSON-formatted strings of serialized incoming AnomalyData since the last flush
6976
*/

include/chimbuko/pserver/GlobalAnomalyStats.hpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ namespace chimbuko{
2323

2424
/**
2525
* @brief Merge internal statistics with those contained within the input ADLocalFuncStatistics object
26+
*
27+
* Thread safe
2628
*/
2729
void add_anomaly_data(const ADLocalFuncStatistics& data);
2830

@@ -31,13 +33,17 @@ namespace chimbuko{
3133
* @brief Get the JSON-formatted string corresponding to the anomaly statistics (RunStats instance) for a given program/rank
3234
* @param pid program index
3335
* @param rid rank
36+
*
37+
* Thread safe
3438
*/
3539
std::string get_anomaly_stat(const int pid, const unsigned long rid) const;
3640

3741
/**
3842
* @brief Get the RunStats object corresponding to the anomaly statistics for a given program/rank (throw error if not present)
3943
* @param pid program index
4044
* @param rid rank
45+
*
46+
* Thread safe
4147
*/
4248
RunStats get_anomaly_stat_obj(const int pid, const unsigned long rid) const;
4349

@@ -46,19 +52,25 @@ namespace chimbuko{
4652
* @brief Const accessor to the AggregateAnomalyData instance corresponding to a particular stat_id (throw error if not present)
4753
* @param pid program index
4854
* @param rid rank
55+
*
56+
* NOT thread safe
4957
*/
5058
const AggregateAnomalyData & get_anomaly_stat_container(const int pid, const unsigned long rid) const;
5159

5260
/**
5361
* @brief Get the number of anomaly data objects collected since the last flush for a given program/rank
5462
* @param pid program index
5563
* @param rid rank
64+
*
65+
* Thread safe
5666
*/
5767
size_t get_n_anomaly_data(const int pid, const unsigned long rid) const;
5868

5969
/**
6070
* @brief Update internal data to include additional information
61-
* @parag anom The anomaly data
71+
* @param anom The anomaly data
72+
*
73+
* Thread safe
6274
*/
6375
void update_anomaly_stat(const AnomalyData &anom);
6476

@@ -70,6 +82,8 @@ namespace chimbuko{
7082
* @param n_anomaly The number of anomalies detected
7183
* @param inclusive Statistics on inclusive timings
7284
* @param exclusive Statistics on exclusive timings
85+
*
86+
* Thread safe
7387
*/
7488
void update_func_stat(int pid,
7589
unsigned long fid,
@@ -82,32 +96,44 @@ namespace chimbuko{
8296
* @brief Get the FuncStats object containing the profile information for the specified function
8397
* @param pid Program index
8498
* @param fid Function index
99+
*
100+
* NOT thread safe
85101
*/
86102
const AggregateFuncStats & get_func_stats(int pid, unsigned long fid) const;
87103

88104
/**
89105
* @brief Collect anomaly statistics into JSON object and flush the m_anomaly_stats statistics
106+
*
107+
* Thread safe
90108
*/
91109
nlohmann::json collect_stat_data();
92110

93111
/**
94112
* @brief Collect aggregated function profile statistics into JSON object
113+
*
114+
* Thread safe
95115
*/
96116
nlohmann::json collect_func_data() const;
97117

98118
/**
99119
* @brief Collect anomaly statistics and function statistics. Flushes the m_anomaly_stats statistics
100120
* @return JSON object containing anomaly and function data
121+
*
122+
* Thread safe
101123
*/
102124
nlohmann::json collect();
103125

104126
/**
105127
* @brief Comparison operator
128+
*
129+
* NOT thread safe
106130
*/
107131
bool operator==(const GlobalAnomalyStats &r) const{ return m_anomaly_stats == r.m_anomaly_stats && m_funcstats == r.m_funcstats; }
108132

109133
/**
110134
* @brief Inequality operator
135+
*
136+
* NOT thread safe
111137
*/
112138
bool operator!=(const GlobalAnomalyStats &r) const{ return !( *this == r ); }
113139

src/ad/ADAnomalyProvenance.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ nlohmann::json ADAnomalyProvenance::get_json() const{
179179
{"io_step_tstart", m_io_step_tstart},
180180
{"io_step_tend", m_io_step_tend},
181181
{"outlier_score", m_call.get_outlier_score() },
182-
{"outlier_severity", m_call.get_runtime() },
182+
{"outlier_severity", m_call.get_outlier_severity() },
183183
{"hostname", getHostname() }
184184
};
185185
}

src/pserver/AggregateAnomalyData.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "chimbuko/pserver/AggregateAnomalyData.hpp"
2+
#include "chimbuko/util/string.hpp"
23
#include <sstream>
34

45
using namespace chimbuko;
@@ -89,3 +90,36 @@ bool AggregateAnomalyData::operator==(const AggregateAnomalyData &r) const{
8990
if(*m_data != *r.m_data) return false;
9091
return true;
9192
}
93+
94+
nlohmann::json AggregateAnomalyData::get_json_and_flush(int pid, int rid){
95+
auto stats = this->get(); //returns a std::pair<RunStats, std::list<AnomalyData>*>, and flushes the state of pair.second.
96+
//We now own the std::list<AnomalyData>* pointer and have to delete it
97+
98+
nlohmann::json object;
99+
100+
if(stats.second){
101+
//Decide whether to include the data for this pid/rid
102+
//Do this only if any anomalies were seen since the last call
103+
bool include = false;
104+
for(const AnomalyData &adata: *stats.second){
105+
if(adata.get_n_anomalies() > 0){
106+
include = true;
107+
break;
108+
}
109+
}
110+
111+
if(include){
112+
object["key"] = stringize("%d:%d", pid,rid);
113+
object["stats"] = stats.first.get_json(); //statistics on anomalies to date for this pid/rid
114+
115+
object["data"] = nlohmann::json::array();
116+
for (const AnomalyData &adata: *stats.second){
117+
//Don't include data for which there are no anomalies
118+
if(adata.get_n_anomalies()>0)
119+
object["data"].push_back(adata.get_json());
120+
}
121+
}
122+
delete stats.second;
123+
}
124+
return object;
125+
}

src/pserver/GlobalAnomalyStats.cpp

Lines changed: 27 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -42,68 +42,44 @@ const AggregateAnomalyData & GlobalAnomalyStats::get_anomaly_stat_container(cons
4242
}
4343

4444
RunStats GlobalAnomalyStats::get_anomaly_stat_obj(const int pid, const unsigned long rid) const{
45+
std::lock_guard<std::mutex> _(m_mutex_anom);
4546
return get_anomaly_stat_container(pid, rid).get_stats();
4647
}
4748

4849
std::string GlobalAnomalyStats::get_anomaly_stat(const int pid, const unsigned long rid) const{
49-
RunStats stat;
50-
try{
51-
stat = get_anomaly_stat_obj(pid,rid);
52-
}catch(const std::exception &e){
53-
return "";
54-
}
55-
return stat.get_json().dump();
50+
std::lock_guard<std::mutex> _(m_mutex_anom);
51+
auto pit = m_anomaly_stats.find(pid);
52+
if(pit == m_anomaly_stats.end()) return "";
53+
auto rit = pit->second.find(rid);
54+
if(rit == pit->second.end()) return "";
55+
return rit->second.get_stats().get_json().dump();
5656
}
5757

5858
size_t GlobalAnomalyStats::get_n_anomaly_data(const int pid, const unsigned long rid) const{
59-
AggregateAnomalyData const* s;
60-
try{
61-
s = &get_anomaly_stat_container(pid,rid);
62-
}catch(const std::exception &e){
63-
return 0;
64-
}
65-
return s->get_n_data();
59+
std::lock_guard<std::mutex> _(m_mutex_anom);
60+
auto pit = m_anomaly_stats.find(pid);
61+
if(pit == m_anomaly_stats.end()) return 0;
62+
auto rit = pit->second.find(rid);
63+
if(rit == pit->second.end()) return 0;
64+
return rit->second.get_n_data();
6665
}
6766

6867
nlohmann::json GlobalAnomalyStats::collect_stat_data(){
6968
nlohmann::json jsonObjects = nlohmann::json::array();
70-
71-
//m_anomaly_stats is a map of app_idx/rank to AggregateAnomalyData instances
72-
//AggregateAnomalyData contains statistics on the number of anomalies found per io step and also a set of AnomalyData objects
73-
//that have been collected from that rank since the last flush
74-
for(auto & pp : m_anomaly_stats){
75-
int pid = pp.first; //pid
76-
for(auto & rp: pp.second){
77-
unsigned long rid = rp.first; //rank
78-
79-
auto stats = rp.second.get(); //returns a std::pair<RunStats, std::list<AnomalyData>*>, and flushes the state of pair.second.
80-
//We now own the std::list<AnomalyData>* pointer and have to delete it
81-
82-
if(stats.second){
83-
//Decide whether to include the data for this pid/rid
84-
//Do this only if any anomalies were seen since the last call
85-
bool include = false;
86-
for(const AnomalyData &adata: *stats.second){
87-
if(adata.get_n_anomalies() > 0){
88-
include = true;
89-
break;
90-
}
91-
}
92-
93-
if(include){
94-
nlohmann::json object;
95-
object["key"] = stringize("%d:%d", pid,rid);
96-
object["stats"] = stats.first.get_json(); //statistics on anomalies to date for this pid/rid
97-
98-
object["data"] = nlohmann::json::array();
99-
for (const AnomalyData &adata: *stats.second){
100-
//Don't include data for which there are no anomalies
101-
if(adata.get_n_anomalies()>0)
102-
object["data"].push_back(adata.get_json());
103-
}
104-
jsonObjects.push_back(object);
105-
}
106-
delete stats.second;
69+
{
70+
std::lock_guard<std::mutex> _(m_mutex_anom);
71+
72+
//m_anomaly_stats is a map of app_idx/rank to AggregateAnomalyData instances
73+
//AggregateAnomalyData contains statistics on the number of anomalies found per io step and also a set of AnomalyData objects
74+
//that have been collected from that rank since the last flush
75+
for(auto & pp : m_anomaly_stats){
76+
int pid = pp.first; //pid
77+
for(auto & rp: pp.second){
78+
unsigned long rid = rp.first; //rank
79+
80+
nlohmann::json object = rp.second.get_json_and_flush(pid,rid);
81+
if(!object.empty())
82+
jsonObjects.push_back(std::move(object));
10783
}
10884
}
10985
}
@@ -153,7 +129,6 @@ void GlobalAnomalyStats::update_func_stat(int pid, unsigned long fid, const std:
153129
}
154130

155131
const AggregateFuncStats & GlobalAnomalyStats::get_func_stats(int pid, unsigned long fid) const{
156-
std::lock_guard<std::mutex> _(m_mutex_func);
157132
auto pit = m_funcstats.find(pid);
158133
if(pit == m_funcstats.end()) fatal_error("Could not find program index");
159134
auto fit = pit->second.find(fid);

test/unit_tests/unit_test_common.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <chimbuko/message.hpp>
1111
#include <zmq.h>
1212
#include <chimbuko/util/barrier.hpp>
13+
#include <chimbuko/util/RunStats.hpp>
1314

1415
namespace chimbuko{
1516
/**
@@ -404,4 +405,21 @@ namespace chimbuko{
404405
};
405406

406407

408+
bool compare(const RunStats &a, const RunStats &b, const double tol = 1e-12){
409+
RunStats::RunStatsValues sa = a.get_stat_values(), sb = b.get_stat_values();
410+
bool ret = true;
411+
#define COM(A) if(fabs( sa. A - sb. A ) > tol){ std::cout << #A << " " << sa. A << " " << sb. A << std::endl; ret = false; }
412+
COM(count);
413+
COM(minimum);
414+
COM(maximum);
415+
COM(accumulate);
416+
COM(mean);
417+
COM(stddev);
418+
COM(skewness);
419+
COM(kurtosis);
420+
return ret;
421+
#undef COM
422+
}
423+
424+
407425
}

0 commit comments

Comments
 (0)