Skip to content

Commit 908a737

Browse files
committed
merged with ckelly_develop
2 parents 1494de1 + f24388d commit 908a737

9 files changed

Lines changed: 628 additions & 70 deletions

File tree

app/Makefile.am

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ bpfile_replay_LDADD = $(LDADD)
3232

3333

3434
if ENABLE_PROVDB
35-
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown
35+
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown provdb_rebuild_ascii
3636

3737
provdb_admin_SOURCES = provdb_admin.cpp
3838
provdb_admin_LDADD = $(LDADD)
@@ -43,6 +43,9 @@ provdb_query_LDADD = $(LDADD)
4343
provdb_shutdown_SOURCES = provdb_shutdown.cpp
4444
provdb_shutdown_LDADD = $(LDADD)
4545

46+
provdb_rebuild_ascii_SOURCES = provdb_rebuild_ascii.cpp
47+
provdb_rebuild_ascii_LDADD = $(LDADD)
48+
4649
else
4750
echo "Provenance DB not being built"
4851
endif

app/provdb_rebuild_ascii.cpp

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
//A tool for rebuilding the unqlite databases from JSON dumps produced when Chimbuko is run without the provenance database component
2+
#include <chimbuko_config.h>
3+
#ifndef ENABLE_PROVDB
4+
#error "Provenance DB build is not enabled"
5+
#endif
6+
7+
#include<nlohmann/json.hpp>
8+
#include<chimbuko/verbose.hpp>
9+
#include<chimbuko/util/string.hpp>
10+
#include<chimbuko/util/error.hpp>
11+
#include<chimbuko/ad/ADProvenanceDBclient.hpp>
12+
#include<chimbuko/pserver/PSProvenanceDBclient.hpp>
13+
#include <sonata/Admin.hpp>
14+
#include <sonata/Provider.hpp>
15+
#include<sstream>
16+
#include<memory>
17+
#include<dirent.h>
18+
#include <sys/types.h>
19+
#include <sys/stat.h>
20+
#include <unistd.h>
21+
#include <regex>
22+
23+
using namespace chimbuko;
24+
25+
void printUsageAndExit(){
26+
std::cout << "Usage: provdb_rebuild_ascii <options> <database> <database rebuild args...>\n"
27+
<< "Where <database> is either 'global' or 'anomalies'\n"
28+
<< "'global' mode expects arguments: <global func stats filename> <global counter stats filename>\n"
29+
<< "'anomalies' mode expects arguments: <number of shards to generate> <pid dir 1> <pid dir 2>....\n"
30+
<< " where <pid dir $i> is the path of the directory $i into which is written the ascii data for pid $i, eg /path/to/0 for pid=0\n"
31+
<< std::endl;
32+
exit(0);
33+
}
34+
35+
bool isDirectory(const std::string &path) {
36+
struct stat statbuf;
37+
if (stat(path.c_str(), &statbuf) != 0)
38+
return 0;
39+
return S_ISDIR(statbuf.st_mode);
40+
}
41+
bool isFile(const std::string &path){
42+
struct stat path_stat;
43+
stat(path.c_str(), &path_stat);
44+
return S_ISREG(path_stat.st_mode);
45+
}
46+
47+
bool fileMatchesRegex(const std::string &filename, const std::string &regex){
48+
std::regex re(regex);
49+
return std::regex_search(filename, re);
50+
}
51+
52+
std::vector<std::string> listDirsMatching(const std::string &path, const std::string &regex){
53+
DIR *dir; struct dirent *diread;
54+
std::vector<std::string> files;
55+
56+
if( (dir = opendir(path.c_str()) ) != nullptr){
57+
while( (diread = readdir(dir) ) != nullptr ){
58+
std::string fname = path + "/" + diread->d_name;
59+
verboseStream << "Got filename " << fname << std::endl;
60+
if(isDirectory(fname)){
61+
verboseStream << "Path is directory" << std::endl;
62+
if(fileMatchesRegex(fname,regex)){
63+
verboseStream << "Path matches regex" << std::endl;
64+
files.push_back(fname);
65+
}else{
66+
verboseStream << "Path *doesn't* match regex" << std::endl;
67+
}
68+
}else{
69+
verboseStream << "Path is *not* a directory" << std::endl;
70+
}
71+
}
72+
closedir(dir);
73+
}
74+
return files;
75+
}
76+
77+
78+
std::vector<std::string> listFilesInDir(const std::string &path){
79+
DIR *dir; struct dirent *diread;
80+
std::vector<std::string> files;
81+
82+
if( (dir = opendir(path.c_str()) ) != nullptr){
83+
while( (diread = readdir(dir) ) != nullptr ){
84+
std::string fname = path + "/" + diread->d_name;
85+
verboseStream << "Got filename " << fname << std::endl;
86+
if(isFile(fname)){
87+
verboseStream << "Path is file" << std::endl;
88+
files.push_back(fname);
89+
}else{
90+
verboseStream << "Path is *not* file" << std::endl;
91+
}
92+
}
93+
closedir(dir);
94+
}
95+
return files;
96+
}
97+
98+
void storeJSONarrayIntoCollection(sonata::Collection &col, const nlohmann::json &json, const std::string &descr){
99+
if(! json.is_array()) throw std::runtime_error("JSON object must be an array");
100+
size_t size = json.size();
101+
102+
if(size > 0){
103+
std::vector<uint64_t> ids(size,-1);
104+
std::vector<std::string> dump(size);
105+
for(int i=0;i<size;i++){
106+
if(!json[i].is_object()) throw std::runtime_error("Array entries must be JSON objects");
107+
dump[i] = json[i].dump();
108+
}
109+
col.store_multi(dump, ids.data());
110+
progressStream << "Stored " << size << " entries into collection " << descr << std::endl;
111+
}
112+
}
113+
114+
115+
int rebuildGlobalDB(int nargs, char** args){
116+
if(nargs < 2)
117+
fatal_error("Instruction arguments expect: <global func stats filename> <global counter stats filename>");
118+
119+
std::ifstream global_func_stats_strm(args[0]);
120+
std::ifstream global_counter_stats_strm(args[1]);
121+
122+
ADProvenanceDBengine::setProtocol("na+sm", THALLIUM_SERVER_MODE);
123+
auto &engine = ADProvenanceDBengine::getEngine();
124+
125+
sonata::Provider *provider;
126+
sonata::Admin *admin;
127+
sonata::Client *client;
128+
129+
provider = new sonata::Provider(engine, 0);
130+
admin = new sonata::Admin(engine);
131+
client = new sonata::Client(engine);
132+
133+
std::string addr = (std::string)engine.self();
134+
135+
std::string glob_db_name = "provdb.global";
136+
std::string glob_db_config = stringize("{ \"path\" : \"./%s.unqlite\" }", glob_db_name.c_str());
137+
progressStream << "Creating global database" << std::endl;
138+
admin->createDatabase(addr, 0, glob_db_name, "unqlite", glob_db_config);
139+
140+
progressStream << "Opening global database" << std::endl;
141+
sonata::Database glob_db = client->open(addr, 0, glob_db_name);
142+
143+
{
144+
progressStream << "Creating collections" << std::endl;
145+
sonata::Collection glob_db_func_stats = glob_db.create("func_stats");
146+
sonata::Collection glob_db_counter_stats = glob_db.create("counter_stats");
147+
148+
progressStream << "Parsing JSON files" << std::endl;
149+
nlohmann::json global_func_stats_json, global_counter_stats_json;
150+
global_func_stats_strm >> global_func_stats_json;
151+
global_counter_stats_strm >> global_counter_stats_json;
152+
153+
progressStream << "Storing records" << std::endl;
154+
storeJSONarrayIntoCollection(glob_db_func_stats, global_func_stats_json, "global_func_stats");
155+
storeJSONarrayIntoCollection(glob_db_counter_stats, global_counter_stats_json, "global_counter_stats");
156+
}
157+
158+
delete client;
159+
delete admin;
160+
delete provider;
161+
progressStream << "Done" << std::endl;
162+
return 0;
163+
}
164+
165+
166+
struct stepData{
167+
std::string anomalies;
168+
std::string normalexecs;
169+
std::string metadata;
170+
int step;
171+
172+
stepData(): step(-1), anomalies(""), normalexecs(""), metadata(""){}
173+
};
174+
175+
std::ostream & operator<<(std::ostream &os, const stepData &s){
176+
os << "(" << s.step << ", " << s.anomalies << ", " << s.normalexecs << ", " << s.metadata << ")";
177+
return os;
178+
}
179+
180+
181+
void addFile(std::map<int, stepData> &into, const std::string &filename){
182+
std::regex r(R"((\d+)\.(anomalies|normalexecs|metadata)\.json)");
183+
std::smatch m;
184+
if(std::regex_search(filename, m,r)){
185+
int step = std::stoi(m[1]);
186+
stepData &sd = into[step];
187+
sd.step = step;
188+
189+
if(m[2] == "anomalies")
190+
sd.anomalies = filename;
191+
else if(m[2] == "normalexecs")
192+
sd.normalexecs = filename;
193+
else if(m[2] == "metadata")
194+
sd.metadata = filename;
195+
else{
196+
fatal_error("Type string failure" + filename);
197+
}
198+
}
199+
}
200+
201+
int getRank(const std::string &path){
202+
std::regex r(R"(\/(\d+)$)");
203+
std::smatch m;
204+
if(std::regex_search(path, m,r)){
205+
return std::stoi(m[1]);
206+
}else{
207+
fatal_error("Rank match failure " + path);
208+
}
209+
return -1;
210+
}
211+
212+
struct shardDB{
213+
sonata::Database shard;
214+
sonata::Collection anomalies;
215+
sonata::Collection normalexecs;
216+
sonata::Collection metadata;
217+
218+
void open(sonata::Client &client, const std::string &addr, const std::string &db_name){
219+
shard = client.open(addr, 0, db_name);
220+
anomalies = shard.create("anomalies");
221+
normalexecs = shard.create("normalexecs");
222+
metadata = shard.create("metadata");
223+
}
224+
225+
void store(const stepData &sd){
226+
sonata::Collection* cols[3] = { &anomalies, &normalexecs, &metadata };
227+
std::string descr[3] = { "anomalies", "normalexecs", "metadata" };
228+
std::string paths[3] = { sd.anomalies, sd.normalexecs, sd.metadata };
229+
230+
for(int i=0;i<3;i++){
231+
if(paths[i].size()){
232+
std::ifstream in(paths[i]);
233+
nlohmann::json json;
234+
in >> json;
235+
storeJSONarrayIntoCollection(*cols[i], json, descr[i]);
236+
}
237+
}
238+
}
239+
};
240+
241+
int rebuildProvDB(int nargs, char** args){
242+
if(nargs < 2)
243+
fatal_error("Instruction arguments expect: <number of shards to generate> <pid dir 1> <pid dir 2>....");
244+
245+
int nshards = std::stoi(args[0]);
246+
247+
ADProvenanceDBengine::setProtocol("na+sm", THALLIUM_SERVER_MODE);
248+
auto &engine = ADProvenanceDBengine::getEngine();
249+
250+
sonata::Provider *provider;
251+
sonata::Admin *admin;
252+
sonata::Client *client;
253+
254+
provider = new sonata::Provider(engine, 0);
255+
admin = new sonata::Admin(engine);
256+
client = new sonata::Client(engine);
257+
258+
std::string addr = (std::string)engine.self();
259+
260+
progressStream << "Creating database shards (nshards = " << nshards << ")" << std::endl;
261+
std::vector<std::string> db_shard_names(nshards);
262+
for(int s=0;s<nshards;s++){
263+
std::string db_name = stringize("provdb.%d",s);
264+
std::string config = stringize("{ \"path\" : \"%s/%s.unqlite\" }", ".", db_name.c_str());
265+
progressStream << "Shard " << s << ": " << db_name << " " << config << std::endl;
266+
admin->createDatabase(addr, 0, db_name, "unqlite", config);
267+
db_shard_names[s] = db_name;
268+
}
269+
270+
{
271+
progressStream << "Creating collections" << std::endl;
272+
std::vector<shardDB> shards(nshards);
273+
for(int s=0;s<nshards;s++)
274+
shards[s].open(*client, addr, db_shard_names[s]);
275+
276+
//Drill down through the directories and add the data
277+
progressStream << "Starting loop over program_idx directories" << std::endl;
278+
int npid = nargs-1;
279+
for(int pid_idx=0;pid_idx<npid;pid_idx++){
280+
std::string pid_path = args[1+pid_idx];
281+
282+
//Get rank subdirectories
283+
std::vector<std::string> rank_dirs = listDirsMatching(pid_path, R"(.*\/\d+$)");
284+
progressStream << "In path " << pid_path << " found directories corresponding to " << rank_dirs.size() << " ranks" << std::endl;
285+
286+
//Sort by rank
287+
std::sort(rank_dirs.begin(), rank_dirs.end(), [&](const std::string &a, const std::string &b){ return getRank(a) < getRank(b); });
288+
289+
for(int i=0;i<rank_dirs.size();i++) verboseStream << rank_dirs[i] << std::endl;
290+
291+
//Compile step data for each rank
292+
for(auto const &dir : rank_dirs){
293+
int rank = getRank(dir);
294+
int shard = rank % nshards; //put data into appropriate shard
295+
296+
std::vector<std::string> dir_files = listFilesInDir(dir);
297+
verboseStream << "In directory " << dir << " found files: " << std::endl;
298+
for(auto const &f : dir_files){
299+
verboseStream << f << std::endl;
300+
}
301+
302+
std::map<int, stepData> step_data;
303+
for(auto const &f : dir_files)
304+
addFile(step_data, f);
305+
306+
for(auto const &s : step_data){
307+
verboseStream << s.first << " " << s.second << std::endl;
308+
progressStream << " Rank " << rank << " Step " << s.first << std::endl;
309+
shards[shard].store(s.second);
310+
}
311+
}
312+
}
313+
314+
}
315+
delete client;
316+
delete admin;
317+
delete provider;
318+
319+
progressStream << "Done" << std::endl;
320+
return 0;
321+
}
322+
323+
324+
325+
326+
327+
int main(int argc, char** argv){
328+
if(argc < 2) printUsageAndExit();
329+
330+
//Parse environment variables
331+
if(const char* env_p = std::getenv("CHIMBUKO_VERBOSE")){
332+
std::cout << "Pserver: Enabling verbose debug output" << std::endl;
333+
enableVerboseLogging() = true;
334+
}
335+
336+
int arg_offset = 1;
337+
std::string mode = argv[arg_offset++];
338+
int nargs = argc-arg_offset;
339+
340+
if(mode == "global"){
341+
return rebuildGlobalDB(nargs, argv+arg_offset);
342+
}else if(mode == "anomalies"){
343+
return rebuildProvDB(nargs, argv+arg_offset);
344+
}else{
345+
fatal_error("Unknown mode " + mode);
346+
}
347+
return 0;
348+
}

0 commit comments

Comments
 (0)