Skip to content

Commit 98d38ac

Browse files
committed
The logic of extracting the provenance data for normal and anomalous events is now contained in a static member function of ADAnomalyProvenance rather than being part of the AD main code
1 parent d0428bf commit 98d38ac

3 files changed

Lines changed: 141 additions & 42 deletions

File tree

include/chimbuko/ad/ADAnomalyProvenance.hpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
#include <chimbuko/param.hpp>
55
#include "chimbuko/ad/ADCounter.hpp"
66
#include "chimbuko/ad/ADMetadataParser.hpp"
7+
#include "chimbuko/ad/ADNormalEventProvenance.hpp"
8+
#include "chimbuko/util/Anomalies.hpp"
9+
#include "chimbuko/util/PerfStats.hpp"
710

811
namespace chimbuko{
912

@@ -38,6 +41,71 @@ namespace chimbuko{
3841
*/
3942
nlohmann::json get_json() const;
4043

44+
/**
45+
* @brief Extract the json provDB entries for the anomalies and normal events from an Anomalies collection, recording timings into a caller-supplied PerfStats
46+
*
47+
* @param anom_event_entries Output: the provDB entries for anomalous events. Resized to the number of outliers in 'anomalies' and overwritten
48+
* @param normal_event_entries Output: the provDB entries for normal events. Overwritten with the outstanding requests returned by normal_event_manager, then appended to for each anomaly that has a normal event available
49+
* @param normal_event_manager An instance of ADNormalEventProvenance that maintains normal events between calls. It is modified: new normal events are added and returned events may be deleted from it
50+
* @param perf Performance timing. Timings are added under keys beginning "ad_extract_send_prov_"
51+
* @param anomalies The Anomalies object containing anomalies and select normal events for this io step
52+
* @param step The io step
53+
* @param first_event_ts The timestamp of the first event in the io step
54+
* @param last_event_ts The timestamp of the last event in the io step
55+
* @param anom_win_size The window size of events to capture around each anomaly
56+
* @param algo_params The outlier algorithm parameters
57+
* @param event_man The event manager object
58+
* @param counters The counter manager object
59+
* @param metadata The metadata manager object
60+
*/
61+
static void getProvenanceEntries(std::vector<nlohmann::json> &anom_event_entries,
62+
std::vector<nlohmann::json> &normal_event_entries,
63+
ADNormalEventProvenance &normal_event_manager,
64+
PerfStats &perf,
65+
const Anomalies &anomalies,
66+
const int step,
67+
const unsigned long first_event_ts,
68+
const unsigned long last_event_ts,
69+
const unsigned int anom_win_size,
70+
const ParamInterface &algo_params,
71+
const ADEvent &event_man,
72+
const ADCounter &counters,
73+
const ADMetadataParser &metadata);
74+
75+
/**
76+
* @brief Convenience overload of getProvenanceEntries for callers that do not need performance timings: it forwards to the overload taking a PerfStats, passing a local PerfStats instance that is discarded on return
77+
*
78+
* @param anom_event_entries The provDB entries for anomalous events
79+
* @param normal_event_entries The provDB entries for normal events
80+
* @param normal_event_manager An instance of ADNormalEventProvenance that maintains normal events between calls
81+
* @param anomalies The Anomalies object containing anomalies and select normal events for this io step
82+
* @param step The io step
83+
* @param first_event_ts The timestamp of the first event in the io step
84+
* @param last_event_ts The timestamp of the last event in the io step
85+
* @param anom_win_size The window size of events to capture around each anomaly
86+
* @param algo_params The outlier algorithm parameters
87+
* @param event_man The event manager object
88+
* @param counters The counter manager object
89+
* @param metadata The metadata manager object
90+
*/
91+
static void getProvenanceEntries(std::vector<nlohmann::json> &anom_event_entries,
92+
std::vector<nlohmann::json> &normal_event_entries,
93+
ADNormalEventProvenance &normal_event_manager,
94+
const Anomalies &anomalies,
95+
const int step,
96+
const unsigned long first_event_ts,
97+
const unsigned long last_event_ts,
98+
const unsigned int anom_win_size,
99+
const ParamInterface &algo_params,
100+
const ADEvent &event_man,
101+
const ADCounter &counters,
102+
const ADMetadataParser &metadata){
103+
PerfStats perf; //local sink for the timings; not returned to the caller
104+
getProvenanceEntries(anom_event_entries, normal_event_entries,
105+
normal_event_manager, perf, anomalies, step, first_event_ts, last_event_ts,
106+
anom_win_size, algo_params, event_man, counters, metadata);
107+
}
108+
41109
private:
42110
/**
43111
* @brief Get the call stack

src/ad/ADAnomalyProvenance.cpp

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,68 @@ nlohmann::json ADAnomalyProvenance::get_json() const{
168168

169169
};
170170
}
171+
172+
173+
174+
175+
176+
//Extract the provDB json entries for the normal events and outliers held in 'anomalies', writing them to normal_event_entries and anom_event_entries and recording timings in 'perf'
void ADAnomalyProvenance::getProvenanceEntries(std::vector<nlohmann::json> &anom_event_entries,
177+
std::vector<nlohmann::json> &normal_event_entries,
178+
ADNormalEventProvenance &normal_event_manager,
179+
PerfStats &perf,
180+
const Anomalies &anomalies,
181+
const int step,
182+
const unsigned long first_event_ts,
183+
const unsigned long last_event_ts,
184+
const unsigned int anom_win_size,
185+
const ParamInterface &algo_params,
186+
const ADEvent &event_man,
187+
const ADCounter &counters,
188+
const ADMetadataParser &metadata){
189+
190+
constexpr bool do_delete = true; //passed to getOutstandingRequests/getNormalEvent: allow deletion of the manager's internal copy of events that are returned
191+
constexpr bool add_outstanding = true; //passed to getNormalEvent: if no normal event is available, put the request into the list of outstanding requests
192+
193+
PerfTimer timer,timer2;
194+
195+
//Put new normal event provenance into normal_event_manager
196+
timer.start();
197+
for(auto norm_it : anomalies.allEvents(Anomalies::EventType::Normal)){
198+
timer2.start();
199+
ADAnomalyProvenance extract_prov(*norm_it, event_man, algo_params,
200+
counters, metadata, anom_win_size,
201+
step, first_event_ts, last_event_ts);
202+
normal_event_manager.addNormalEvent(norm_it->get_pid(), norm_it->get_rid(), norm_it->get_tid(), norm_it->get_fid(), extract_prov.get_json());
203+
perf.add("ad_extract_send_prov_normalevent_update_per_event_ms", timer2.elapsed_ms());
204+
}
205+
perf.add("ad_extract_send_prov_normalevent_update_total_ms", timer.elapsed_ms());
206+
207+
//Get any outstanding normal events from previous timesteps that we couldn't previously provide
208+
timer.start();
209+
210+
normal_event_entries = normal_event_manager.getOutstandingRequests(do_delete); //overwrites any previous contents of normal_event_entries; allow deletion of internal copy of events that are returned
211+
212+
perf.add("ad_extract_send_prov_normalevent_get_outstanding_ms", timer.elapsed_ms());
213+
214+
//Gather provenance of anomalies and for each one try to obtain a normal execution
215+
timer.start(); //NOTE(review): 'timer' is not read again inside this function after this restart; the per-anomaly timings below use timer2
216+
anom_event_entries.resize(anomalies.nEvents(Anomalies::EventType::Outlier)); //one slot per outlier, filled in order by the loop below
217+
size_t i=0;
218+
for(auto anom_it : anomalies.allEvents(Anomalies::EventType::Outlier)){
219+
timer2.start();
220+
ADAnomalyProvenance extract_prov(*anom_it, event_man, algo_params,
221+
counters, metadata, anom_win_size,
222+
step, first_event_ts, last_event_ts);
223+
anom_event_entries[i++] = extract_prov.get_json();
224+
perf.add("ad_extract_send_prov_anom_data_generation_per_anom_ms", timer2.elapsed_ms());
225+
226+
//Get the associated normal event
227+
//if normal event not available put into the list of outstanding requests
228+
//if normal event is available, delete internal copy within normal_event_manager so the normal event isn't added more than once
229+
timer2.start();
230+
auto nev = normal_event_manager.getNormalEvent(anom_it->get_pid(), anom_it->get_rid(), anom_it->get_tid(), anom_it->get_fid(), add_outstanding, do_delete);
231+
if(nev.second) normal_event_entries.push_back(std::move(nev.first)); //presumably nev.second flags that a normal event was available - confirm against getNormalEvent
232+
perf.add("ad_extract_send_prov_normalevent_gather_per_anom_ms", timer2.elapsed_ms());
233+
}
234+
235+
}

src/chimbuko.cpp

Lines changed: 8 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -377,56 +377,22 @@ void Chimbuko::extractAndSendProvenance(const Anomalies &anomalies,
377377
const int step,
378378
const unsigned long first_event_ts,
379379
const unsigned long last_event_ts) const{
380-
constexpr bool do_delete = true;
381-
constexpr bool add_outstanding = true;
382-
383380
//Gather provenance data on anomalies and send to provenance database
384381
if(m_params.prov_outputpath.length() > 0
385382
#ifdef ENABLE_PROVDB
386383
|| m_provdb_client->isConnected()
387384
#endif
388385
){
389-
PerfTimer timer,timer2;
390-
391-
//Put new normal event provenance into m_normalevent_prov
392-
timer.start();
393-
for(auto norm_it : anomalies.allEvents(Anomalies::EventType::Normal)){
394-
timer2.start();
395-
ADAnomalyProvenance extract_prov(*norm_it, *m_event, *m_outlier->get_global_parameters(),
396-
*m_counter, *m_metadata_parser, m_params.anom_win_size,
397-
step, first_event_ts, last_event_ts);
398-
m_normalevent_prov->addNormalEvent(norm_it->get_pid(), norm_it->get_rid(), norm_it->get_tid(), norm_it->get_fid(), extract_prov.get_json());
399-
m_perf.add("ad_extract_send_prov_normalevent_update_per_event_ms", timer2.elapsed_ms());
400-
}
401-
m_perf.add("ad_extract_send_prov_normalevent_update_total_ms", timer.elapsed_ms());
402-
403-
//Get any outstanding normal events from previous timesteps that we couldn't previously provide
404-
timer.start();
405-
std::vector<nlohmann::json> normalevent_prov = m_normalevent_prov->getOutstandingRequests(do_delete); //allow deletion of internal copy of events that are returned
406-
m_perf.add("ad_extract_send_prov_normalevent_get_outstanding_ms", timer.elapsed_ms());
407-
408-
//Gather provenance of anomalies and for each one try to obtain a normal execution
409-
timer.start();
410-
std::vector<nlohmann::json> anomaly_prov(anomalies.nEvents(Anomalies::EventType::Outlier));
411-
size_t i=0;
412-
for(auto anom_it : anomalies.allEvents(Anomalies::EventType::Outlier)){
413-
timer2.start();
414-
ADAnomalyProvenance extract_prov(*anom_it, *m_event, *m_outlier->get_global_parameters(),
415-
*m_counter, *m_metadata_parser, m_params.anom_win_size,
416-
step, first_event_ts, last_event_ts);
417-
anomaly_prov[i++] = extract_prov.get_json();
418-
m_perf.add("ad_extract_send_prov_anom_data_generation_per_anom_ms", timer2.elapsed_ms());
419-
420-
//Get the associated normal event
421-
//if normal event not available put into the list of outstanding requests
422-
//if normal event is available, delete internal copy within m_normalevent_prov so the normal event isn't added more than once
423-
timer2.start();
424-
auto nev = m_normalevent_prov->getNormalEvent(anom_it->get_pid(), anom_it->get_rid(), anom_it->get_tid(), anom_it->get_fid(), add_outstanding, do_delete);
425-
if(nev.second) normalevent_prov.push_back(std::move(nev.first));
426-
m_perf.add("ad_extract_send_prov_normalevent_gather_per_anom_ms", timer2.elapsed_ms());
427-
}
386+
//Get the provenance data
387+
PerfTimer timer;
388+
std::vector<nlohmann::json> anomaly_prov, normalevent_prov;
389+
ADAnomalyProvenance::getProvenanceEntries(anomaly_prov, normalevent_prov, *m_normalevent_prov, m_perf,
390+
anomalies, step, first_event_ts, last_event_ts, m_params.anom_win_size,
391+
*m_outlier->get_global_parameters(), *m_event, *m_counter, *m_metadata_parser);
428392
m_perf.add("ad_extract_send_prov_provenance_data_generation_total_ms", timer.elapsed_ms());
429393

394+
395+
//Write and send provenance data
430396
if(anomaly_prov.size() > 0){
431397
timer.start();
432398
m_io->writeJSON(anomaly_prov, step, "anomalies");

0 commit comments

Comments
 (0)