Skip to content

Commit 6d1b6d2

Browse files
committed
Reworked how io steps are treated in the AD sim to resolve logic errors dealing with function executions that span io steps.
Now the user generates the full program trace and iterates through the IO steps afterwards.
1 parent 6493c84 commit 6d1b6d2

3 files changed

Lines changed: 108 additions & 54 deletions

File tree

sim/include/sim/ad.hpp

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include<chimbuko/ad/ADNormalEventProvenance.hpp>
66
#include<chimbuko/ad/ADEvent.hpp>
77
#include<chimbuko/ad/ADCounter.hpp>
8+
#include<map>
89

910
namespace chimbuko_sim{
1011
using namespace chimbuko;
@@ -13,32 +14,41 @@ namespace chimbuko_sim{
1314

1415
//An object that represents a rank of the AD
1516
class ADsim{
16-
std::unordered_map<unsigned long, std::list<ExecData_t> > m_all_execs; //map of thread to execs, never flushed
17-
std::unordered_map<eventID, CallListIterator_t> m_eventid_map; //map from event index to iterator ; never flushed
18-
std::list<CallListIterator_t> m_step_exec_its; //iterators to events on this io step; flushed at end of step
19-
std::unordered_map<unsigned long, std::vector<CallListIterator_t>> m_step_func_exec_its; //map of function id to iterators on this io step; flushed at end of step
17+
std::unordered_map<unsigned long, std::list<ExecData_t> > m_all_execs; /**< Map of thread to execs */
18+
std::unordered_map<eventID, CallListIterator_t> m_eventid_map; /**< Map from event index to iterator */
19+
std::map<unsigned long, std::list<CallListIterator_t> > m_step_execs; /**< Events in any given step */
20+
21+
unsigned long m_largest_step; /**< Largest step index of an inserted function execution thus far encountered */
22+
unsigned long m_program_start;
23+
unsigned long m_step_freq; /**< Frequency of IO steps */
2024

21-
bool m_step_is_open; /**< Is an io step currently open? */
22-
int m_step;
2325
int m_window_size;
2426
int m_pid;
2527
int m_rid;
26-
unsigned long m_step_start_time;
2728
ADNormalEventProvenance m_normal_events;
28-
ADCounter m_counters;
2929
ADMetadataParser m_metadata;
3030
std::unique_ptr<ADProvenanceDBclient> m_pdb_client;
3131
public:
32-
void init(int window_size, int pid, int rid);
32+
/**
33+
* @brief Initialize the AD simulator
34+
* @param window_size The number of events around an anomaly that are recorded in the provDB
35+
* @param pid The program index
36+
* @param rid The rank index
37+
* @param program_start Timestamp of program start
38+
* @param step_freq Frequency at which IO steps are to occur
39+
*/
40+
void init(int window_size, int pid, int rid, unsigned long program_start, unsigned long step_freq);
3341

3442
/**
3543
* @brief Instantiate the AD simulator
3644
* @param window_size The number of events around an anomaly that are recorded in the provDB
3745
* @param pid The program index
3846
* @param rid The rank index
47+
* @param program_start Timestamp of program start
48+
* @param step_freq Frequency at which IO steps are to occur
3949
*/
40-
ADsim(int window_size, int pid, int rid): ADsim(){
41-
init(window_size, pid, rid);
50+
ADsim(int window_size, int pid, int rid, unsigned long program_start, unsigned long step_freq): ADsim(){
51+
init(window_size, pid, rid, program_start, step_freq);
4252
}
4353
ADsim(){}
4454

@@ -101,16 +111,26 @@ namespace chimbuko_sim{
101111
void bindCPUparentGPUkernel(CallListIterator_t cpu_parent, CallListIterator_t gpu_kern);
102112

103113
/**
104-
* @brief Begin an IO step at the provided time
114+
* @brief Bind a parent and child execution
105115
*/
106-
void beginStep(const unsigned long step_start_time);
116+
static void bindParentChild(CallListIterator_t parent, CallListIterator_t child);
107117

108118
/**
109-
* @brief End the IO step at the provided time
119+
* @brief Analyze the given IO step, sending the data to the pserver/provdb
110120
*
111121
* When this is called the executions added since the start of the step will be gathered and analyzed, and the data send to the pserver/provdb
112122
*/
113-
void endStep(const unsigned long step_end_time);
123+
void step(const unsigned long step);
124+
125+
/**
126+
* @brief Get the largest step index in the trace
127+
*/
128+
unsigned long largest_step() const{ return m_largest_step; }
129+
130+
/**
131+
* @brief Get the number of executions in a given step
132+
*/
133+
size_t nStepExecs(unsigned long step) const;
114134
};
115135

116136
/**

sim/main/example1.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ int main(int argc, char **argv){
99

1010
int window_size = 5; //number of events to record around an anomaly in the provenance data
1111
int pid = 0; //program index
12-
12+
unsigned long program_start = 100;
13+
unsigned long step_freq = 1000;
14+
1315
//Setup the "AD" instances
1416
int n_ranks = 1;
1517
std::vector<ADsim> ad;
1618
for(int r=0;r<n_ranks;r++)
17-
ad.push_back(ADsim(window_size, pid, r));
19+
ad.push_back(ADsim(window_size, pid, r, program_start, step_freq));
1820

1921
//Setup some functions, providing some fake statistics to attach to their provenance data
2022
registerFunc("main", 1000, 200, 20);
@@ -31,22 +33,29 @@ int main(int argc, char **argv){
3133
ad[0].registerGPUthread(gpu_threads[g]);
3234
}
3335

36+
//Generate the program trace
3437
threadExecution thr0(0, ad[0]); //thread 0 (a cpu thread) on first ad
3538

36-
ad[0].beginStep(50);
37-
3839
thr0.enterNormal("main", 100, "tag_main"); //events can be tagged and retrieved once exit has been called, allowing for external manipulation
40+
3941
thr0.addCounter("main_counter", 1234, 120);
4042
thr0.addComm(CommType::Send, 1, 1024, 130);
4143
thr0.enterAnomaly("child1", 140, 1.99);
4244
thr0.exit(149);
43-
thr0.exit(150);
45+
46+
thr0.exit(1500);
4447

4548
auto it = thr0.getTagged("tag_main");
4649
std::cout << it << std::endl;
4750

48-
ad[0].endStep(200);
51+
//Run the simulation
52+
int nstep = ad[0].largest_step() + 1;
53+
std::cout << "Got data for " << nstep << " steps" << std::endl;
4954

55+
for(int i=0;i<nstep;i++){
56+
std::cout << "Step " << i << " contains " << ad[0].nStepExecs(i) << " execs" << std::endl;
57+
ad[0].step(i);
58+
}
5059

5160
MPI_Finalize();
5261
return 0;

sim/src/ad.cpp

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
using namespace chimbuko;
1616
using namespace chimbuko_sim;
1717

18-
void ADsim::init(int window_size, int pid, int rid){
19-
m_step = 0;
18+
void ADsim::init(int window_size, int pid, int rid, unsigned long program_start, unsigned long step_freq){
2019
m_window_size = window_size;
2120
m_pid = pid;
2221
m_rid = rid;
2322
m_pdb_client.reset(new ADProvenanceDBclient(rid));
2423
m_pdb_client->setEnableHandshake(false);
2524
m_pdb_client->connect(getProvDB().getAddr(), getProvDB().getNshards());
26-
m_counters.linkCounterMap(getCidxManager().getCounterMap());
27-
m_step_is_open = false;
25+
m_program_start = program_start;
26+
m_step_freq = step_freq;
27+
m_largest_step = 0;
2828
}
2929

3030
CallListIterator_t ADsim::addExec(const int thread,
@@ -33,17 +33,20 @@ CallListIterator_t ADsim::addExec(const int thread,
3333
unsigned long runtime,
3434
bool is_anomaly,
3535
double outlier_score){
36-
if(!m_step_is_open) fatal_error("No step is currently open");
37-
if(start + runtime < m_step_start_time) fatal_error("Function end time is not within the current step");
36+
if(start < m_program_start) fatal_error("Exec start is before program start");
3837

3938
auto it = funcIdxMap().find(func_name);
4039
if(it == funcIdxMap().end()) assert(0);
4140
unsigned long func_id = it->second;
4241

43-
static long event_idx_s = 0;
44-
long event_idx = event_idx_s++;
42+
unsigned long exit = start + runtime;
43+
unsigned long step = ( exit - m_program_start ) / m_step_freq;
44+
45+
long event_idx = m_step_execs[step].size();
4546

46-
eventID id(m_rid, m_step, event_idx);
47+
m_largest_step = std::max(m_largest_step, step);
48+
49+
eventID id(m_rid, step, event_idx);
4750

4851
ExecData_t exec(id, m_pid, m_rid, thread, func_id, func_name, start, start+runtime);
4952
exec.set_label( is_anomaly ? -1 : 1 );
@@ -53,9 +56,7 @@ CallListIterator_t ADsim::addExec(const int thread,
5356
CallListIterator_t exec_it = m_all_execs[thread].insert(m_all_execs[thread].end(), exec);
5457
m_eventid_map[id] = exec_it;
5558

56-
m_step_exec_its.push_back(exec_it);
57-
m_step_func_exec_its[func_id].push_back(exec_it);
58-
59+
m_step_execs[step].push_back(exec_it);
5960
return exec_it;
6061
}
6162

@@ -69,7 +70,6 @@ void ADsim::attachCounter(const std::string &counter_name,
6970
assert(ts <= to->get_exit());
7071

7172
CounterData_t cdata(m_pid, m_rid, to->get_tid(), cid, counter_name, value, ts);
72-
m_counters.addCounter(cdata);
7373
to->add_counter(cdata);
7474
}
7575

@@ -110,25 +110,48 @@ void ADsim::bindCPUparentGPUkernel(CallListIterator_t cpu_parent, CallListIterat
110110
cpu_parent->set_GPU_correlationID_partner(gpu_kern->get_id());
111111
}
112112

113-
void ADsim::beginStep(const unsigned long step_start_time){
114-
m_step_start_time = step_start_time;
115-
m_step_is_open = true;
113+
void ADsim::bindParentChild(CallListIterator_t parent, CallListIterator_t child){
114+
parent->inc_n_children();
115+
parent->update_exclusive(child->get_runtime());
116+
//Tell the child who it's parent is
117+
child->set_parent(parent->get_id());
116118
}
117119

118-
void ADsim::endStep(const unsigned long step_end_time){
120+
121+
void ADsim::step(const unsigned long step){
122+
unsigned long step_start_time = m_program_start + step * m_step_freq;
123+
unsigned long step_end_time = step_start_time + m_step_freq;
124+
125+
//Collect events in the timestep
126+
auto mse = m_step_execs.find(step);
127+
if(mse == m_step_execs.end()){
128+
std::cout << "Step " << step << " contains no executions" << std::endl;
129+
return;
130+
}
131+
std::list<CallListIterator_t> const& this_step_execs = mse->second;
132+
119133
//Collect outliers and 1 normal event/func into Anomalies object
134+
//and collect the map of function index to execs
120135
Anomalies anom;
136+
std::unordered_map<unsigned long, std::vector<CallListIterator_t> > this_step_func_execs;
137+
ADCounter counters;
138+
counters.linkCounterMap(getCidxManager().getCounterMap());
121139

122140
int nanom=0, nnorm=0;
123-
for(const auto &exec_it : m_step_exec_its){
124-
if(exec_it->get_exit() > step_end_time) fatal_error("Event " + exec_it->get_json(true, true).dump(4) + " has an exit time later than the end of the current step, " + std::to_string(step_end_time) );
125-
141+
for(const auto &exec_it : this_step_execs){
126142
if(exec_it->get_label() == -1){ anom.insert(exec_it, Anomalies::EventType::Outlier); nanom++; }
127143
else if(anom.nFuncEvents(exec_it->get_fid(), Anomalies::EventType::Normal) == 0){ anom.insert(exec_it, Anomalies::EventType::Normal); nnorm++; }
144+
145+
this_step_func_execs[exec_it->get_fid()].push_back(exec_it);
146+
147+
const std::deque<CounterData_t>& ect = exec_it->get_counters();
148+
for(const CounterData_t &c : ect)
149+
counters.addCounter(c);
128150
}
129151

130-
std::cout << "Rank " << m_rid << " " << nanom << " anomalies and " << nnorm << " normal events" << std::endl;
131-
std::cout << "TEST " << anom.nEvents(Anomalies::EventType::Outlier) << " " << anom.nEvents(Anomalies::EventType::Normal) << std::endl;
152+
//TODO: Normal events need to be processed correctly
153+
154+
std::cout << "Step " << step << " rank " << m_rid << " " << nanom << " anomalies and " << nnorm << " normal events" << std::endl;
132155

133156
//Extract provenance data and send to the provDB
134157
if(nanom > 0){
@@ -138,8 +161,8 @@ void ADsim::endStep(const unsigned long step_end_time){
138161
eventIDmap evmap(m_all_execs, m_eventid_map);
139162

140163
ADAnomalyProvenance::getProvenanceEntries(anomalous_events, normal_events, m_normal_events,
141-
anom, m_step, m_step_start_time, step_end_time, m_window_size,
142-
globalParams(), evmap, m_counters, m_metadata);
164+
anom, step, step_start_time, step_end_time, m_window_size,
165+
globalParams(), evmap, counters, m_metadata);
143166
//Send to provdb
144167
if(anomalous_events.size() > 0)
145168
m_pdb_client->sendMultipleData(anomalous_events, ProvenanceDataType::AnomalyData);
@@ -150,20 +173,22 @@ void ADsim::endStep(const unsigned long step_end_time){
150173

151174
//Update the pserver
152175
{
153-
ADLocalCounterStatistics count_stats(m_pid, m_step, nullptr);
154-
count_stats.gatherStatistics(m_counters.getCountersByIndex());
176+
ADLocalCounterStatistics count_stats(m_pid, step, nullptr);
177+
count_stats.gatherStatistics(counters.getCountersByIndex());
155178
getPserver().addCounterData(count_stats);
156179

157-
ADLocalFuncStatistics prof_stats(m_pid, m_rid, m_step);
158-
prof_stats.gatherStatistics(&m_step_func_exec_its); //takes map of fid -> vector of ExecData iterators
180+
ADLocalFuncStatistics prof_stats(m_pid, m_rid, step);
181+
prof_stats.gatherStatistics(&this_step_func_execs); //takes map of fid -> vector of ExecData iterators
159182
prof_stats.gatherAnomalies(anom);
160183
getPserver().addAnomalyData(prof_stats);
161184
}
185+
}
162186

163-
m_step_exec_its.clear();
164-
m_step_func_exec_its.clear();
165-
delete m_counters.flushCounters();
166187

167-
m_step++;
168-
m_step_is_open = false;
188+
size_t ADsim::nStepExecs(unsigned long step) const{
189+
auto mse = m_step_execs.find(step);
190+
if(mse == m_step_execs.end()){
191+
return 0;
192+
}else return mse->second.size();
169193
}
194+

0 commit comments

Comments
 (0)