Skip to content

Commit 673d8e4

Browse files
committed
Added an application to reconstruct the unqlite provDB from ASCII output produced when the provDB component is deactivated
1 parent 9f629f3 commit 673d8e4

2 files changed

Lines changed: 349 additions & 1 deletion

File tree

app/Makefile.am

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ bpfile_replay_LDADD = $(LDADD)
3232

3333

3434
if ENABLE_PROVDB
35-
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown
35+
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown provdb_rebuild_ascii
3636

3737
provdb_admin_SOURCES = provdb_admin.cpp
3838
provdb_admin_LDADD = $(LDADD)
@@ -43,6 +43,9 @@ provdb_query_LDADD = $(LDADD)
4343
provdb_shutdown_SOURCES = provdb_shutdown.cpp
4444
provdb_shutdown_LDADD = $(LDADD)
4545

46+
provdb_rebuild_ascii_SOURCES = provdb_rebuild_ascii.cpp
47+
provdb_rebuild_ascii_LDADD = $(LDADD)
48+
4649
else
4750
echo "Provenance DB not being built"
4851
endif

app/provdb_rebuild_ascii.cpp

Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
//A tool for rebuilding the unqlite databases from JSON dumps produced when Chimbuko is run without the provenance database component
2+
#include <chimbuko_config.h>
3+
#ifndef ENABLE_PROVDB
4+
#error "Provenance DB build is not enabled"
5+
#endif
6+
7+
#include<nlohmann/json.hpp>
8+
#include<chimbuko/verbose.hpp>
9+
#include<chimbuko/util/string.hpp>
10+
#include<chimbuko/util/error.hpp>
11+
#include<chimbuko/ad/ADProvenanceDBclient.hpp>
12+
#include<chimbuko/pserver/PSProvenanceDBclient.hpp>
13+
#include <sonata/Admin.hpp>
14+
#include <sonata/Provider.hpp>
15+
#include<sstream>
16+
#include<memory>
17+
#include<dirent.h>
18+
#include <sys/types.h>
19+
#include <sys/stat.h>
20+
#include <unistd.h>
21+
#include <regex>
22+
23+
using namespace chimbuko;
24+
25+
void printUsageAndExit(){
26+
std::cout << "Usage: provdb_rebuild_ascii <options> <database> <database rebuild args...>\n"
27+
<< "Where <database> is either 'global' or 'anomalies'\n"
28+
<< "'global' mode expects arguments: <global func stats filename> <global counter stats filename>\n"
29+
<< "'anomalies' mode expects arguments: <number of shards to generate> <pid dir 1> <pid dir 2>....\n"
30+
<< " where <pid dir $i> is the path of the directory $i into which is written the ascii data for pid $i, eg /path/to/0 for pid=0\n"
31+
<< std::endl;
32+
exit(0);
33+
}
34+
35+
bool isDirectory(const std::string &path) {
36+
struct stat statbuf;
37+
if (stat(path.c_str(), &statbuf) != 0)
38+
return 0;
39+
return S_ISDIR(statbuf.st_mode);
40+
}
41+
bool isFile(const std::string &path){
42+
struct stat path_stat;
43+
stat(path.c_str(), &path_stat);
44+
return S_ISREG(path_stat.st_mode);
45+
}
46+
47+
bool fileMatchesRegex(const std::string &filename, const std::string &regex){
48+
std::regex re(regex);
49+
return std::regex_search(filename, re);
50+
}
51+
52+
std::vector<std::string> listDirsMatching(const std::string &path, const std::string &regex){
53+
DIR *dir; struct dirent *diread;
54+
std::vector<std::string> files;
55+
56+
if( (dir = opendir(path.c_str()) ) != nullptr){
57+
while( (diread = readdir(dir) ) != nullptr ){
58+
std::string fname = path + "/" + diread->d_name;
59+
verboseStream << "Got filename " << fname << std::endl;
60+
if(isDirectory(fname)){
61+
verboseStream << "Path is directory" << std::endl;
62+
if(fileMatchesRegex(fname,regex)){
63+
verboseStream << "Path matches regex" << std::endl;
64+
files.push_back(fname);
65+
}else{
66+
verboseStream << "Path *doesn't* match regex" << std::endl;
67+
}
68+
}else{
69+
verboseStream << "Path is *not* a directory" << std::endl;
70+
}
71+
}
72+
closedir(dir);
73+
}
74+
return files;
75+
}
76+
77+
78+
std::vector<std::string> listFilesInDir(const std::string &path){
79+
DIR *dir; struct dirent *diread;
80+
std::vector<std::string> files;
81+
82+
if( (dir = opendir(path.c_str()) ) != nullptr){
83+
while( (diread = readdir(dir) ) != nullptr ){
84+
std::string fname = path + "/" + diread->d_name;
85+
verboseStream << "Got filename " << fname << std::endl;
86+
if(isFile(fname)){
87+
verboseStream << "Path is file" << std::endl;
88+
files.push_back(fname);
89+
}else{
90+
verboseStream << "Path is *not* file" << std::endl;
91+
}
92+
}
93+
closedir(dir);
94+
}
95+
return files;
96+
}
97+
98+
void storeJSONarrayIntoCollection(sonata::Collection &col, const nlohmann::json &json, const std::string &descr){
99+
if(! json.is_array()) throw std::runtime_error("JSON object must be an array");
100+
size_t size = json.size();
101+
102+
if(size > 0){
103+
std::vector<uint64_t> ids(size,-1);
104+
std::vector<std::string> dump(size);
105+
for(int i=0;i<size;i++){
106+
if(!json[i].is_object()) throw std::runtime_error("Array entries must be JSON objects");
107+
dump[i] = json[i].dump();
108+
}
109+
col.store_multi(dump, ids.data());
110+
progressStream << "Stored " << size << " entries into collection " << descr << std::endl;
111+
}
112+
}
113+
114+
115+
int rebuildGlobalDB(int nargs, char** args){
116+
if(nargs < 2)
117+
fatal_error("Instruction arguments expect: <global func stats filename> <global counter stats filename>");
118+
119+
std::ifstream global_func_stats_strm(args[0]);
120+
std::ifstream global_counter_stats_strm(args[1]);
121+
122+
ADProvenanceDBengine::setProtocol("na+sm", THALLIUM_SERVER_MODE);
123+
auto &engine = ADProvenanceDBengine::getEngine();
124+
125+
sonata::Provider *provider;
126+
sonata::Admin *admin;
127+
sonata::Client *client;
128+
129+
provider = new sonata::Provider(engine, 0);
130+
admin = new sonata::Admin(engine);
131+
client = new sonata::Client(engine);
132+
133+
std::string addr = (std::string)engine.self();
134+
135+
std::string glob_db_name = "provdb.global";
136+
std::string glob_db_config = stringize("{ \"path\" : \"./%s.unqlite\" }", glob_db_name.c_str());
137+
progressStream << "Creating global database" << std::endl;
138+
admin->createDatabase(addr, 0, glob_db_name, "unqlite", glob_db_config);
139+
140+
progressStream << "Opening global database" << std::endl;
141+
sonata::Database glob_db = client->open(addr, 0, glob_db_name);
142+
143+
{
144+
progressStream << "Creating collections" << std::endl;
145+
sonata::Collection glob_db_func_stats = glob_db.create("func_stats");
146+
sonata::Collection glob_db_counter_stats = glob_db.create("counter_stats");
147+
148+
progressStream << "Parsing JSON files" << std::endl;
149+
nlohmann::json global_func_stats_json, global_counter_stats_json;
150+
global_func_stats_strm >> global_func_stats_json;
151+
global_counter_stats_strm >> global_counter_stats_json;
152+
153+
progressStream << "Storing records" << std::endl;
154+
storeJSONarrayIntoCollection(glob_db_func_stats, global_func_stats_json, "global_func_stats");
155+
storeJSONarrayIntoCollection(glob_db_counter_stats, global_counter_stats_json, "global_counter_stats");
156+
}
157+
158+
delete client;
159+
delete admin;
160+
delete provider;
161+
progressStream << "Done" << std::endl;
162+
}
163+
164+
165+
struct stepData{
166+
std::string anomalies;
167+
std::string normalexecs;
168+
std::string metadata;
169+
int step;
170+
171+
stepData(): step(-1), anomalies(""), normalexecs(""), metadata(""){}
172+
};
173+
174+
std::ostream & operator<<(std::ostream &os, const stepData &s){
175+
os << "(" << s.step << ", " << s.anomalies << ", " << s.normalexecs << ", " << s.metadata << ")";
176+
return os;
177+
}
178+
179+
180+
void addFile(std::map<int, stepData> &into, const std::string &filename){
181+
std::regex r(R"((\d+)\.(anomalies|normalexecs|metadata)\.json)");
182+
std::smatch m;
183+
if(std::regex_search(filename, m,r)){
184+
int step = std::stoi(m[1]);
185+
stepData &sd = into[step];
186+
sd.step = step;
187+
188+
if(m[2] == "anomalies")
189+
sd.anomalies = filename;
190+
else if(m[2] == "normalexecs")
191+
sd.normalexecs = filename;
192+
else if(m[2] == "metadata")
193+
sd.metadata = filename;
194+
else{
195+
fatal_error("Type string failure" + filename);
196+
}
197+
}
198+
}
199+
200+
int getRank(const std::string &path){
201+
std::regex r(R"(\/(\d+)$)");
202+
std::smatch m;
203+
if(std::regex_search(path, m,r)){
204+
return std::stoi(m[1]);
205+
}else{
206+
fatal_error("Rank match failure " + path);
207+
}
208+
}
209+
210+
struct shardDB{
211+
sonata::Database shard;
212+
sonata::Collection anomalies;
213+
sonata::Collection normalexecs;
214+
sonata::Collection metadata;
215+
216+
void open(sonata::Client &client, const std::string &addr, const std::string &db_name){
217+
shard = client.open(addr, 0, db_name);
218+
anomalies = shard.create("anomalies");
219+
normalexecs = shard.create("normalexecs");
220+
metadata = shard.create("metadata");
221+
}
222+
223+
void store(const stepData &sd){
224+
sonata::Collection* cols[3] = { &anomalies, &normalexecs, &metadata };
225+
std::string descr[3] = { "anomalies", "normalexecs", "metadata" };
226+
std::string paths[3] = { sd.anomalies, sd.normalexecs, sd.metadata };
227+
228+
for(int i=0;i<3;i++){
229+
if(paths[i].size()){
230+
std::ifstream in(paths[i]);
231+
nlohmann::json json;
232+
in >> json;
233+
storeJSONarrayIntoCollection(*cols[i], json, descr[i]);
234+
}
235+
}
236+
}
237+
};
238+
239+
int rebuildProvDB(int nargs, char** args){
240+
if(nargs < 2)
241+
fatal_error("Instruction arguments expect: <number of shards to generate> <pid dir 1> <pid dir 2>....");
242+
243+
int nshards = std::stoi(args[0]);
244+
245+
ADProvenanceDBengine::setProtocol("na+sm", THALLIUM_SERVER_MODE);
246+
auto &engine = ADProvenanceDBengine::getEngine();
247+
248+
sonata::Provider *provider;
249+
sonata::Admin *admin;
250+
sonata::Client *client;
251+
252+
provider = new sonata::Provider(engine, 0);
253+
admin = new sonata::Admin(engine);
254+
client = new sonata::Client(engine);
255+
256+
std::string addr = (std::string)engine.self();
257+
258+
progressStream << "Creating database shards (nshards = " << nshards << ")" << std::endl;
259+
std::vector<std::string> db_shard_names(nshards);
260+
for(int s=0;s<nshards;s++){
261+
std::string db_name = stringize("provdb.%d",s);
262+
std::string config = stringize("{ \"path\" : \"%s/%s.unqlite\" }", ".", db_name.c_str());
263+
progressStream << "Shard " << s << ": " << db_name << " " << config << std::endl;
264+
admin->createDatabase(addr, 0, db_name, "unqlite", config);
265+
db_shard_names[s] = db_name;
266+
}
267+
268+
{
269+
progressStream << "Creating collections" << std::endl;
270+
std::vector<shardDB> shards(nshards);
271+
for(int s=0;s<nshards;s++)
272+
shards[s].open(*client, addr, db_shard_names[s]);
273+
274+
//Drill down through the directories and add the data
275+
progressStream << "Starting loop over program_idx directories" << std::endl;
276+
int npid = nargs-1;
277+
for(int pid_idx=0;pid_idx<npid;pid_idx++){
278+
std::string pid_path = args[1+pid_idx];
279+
280+
//Get rank subdirectories
281+
std::vector<std::string> rank_dirs = listDirsMatching(pid_path, R"(.*\/\d+$)");
282+
progressStream << "In path " << pid_path << " found directories corresponding to " << rank_dirs.size() << " ranks" << std::endl;
283+
284+
//Sort by rank
285+
std::sort(rank_dirs.begin(), rank_dirs.end(), [&](const std::string &a, const std::string &b){ return getRank(a) < getRank(b); });
286+
287+
for(int i=0;i<rank_dirs.size();i++) verboseStream << rank_dirs[i] << std::endl;
288+
289+
//Compile step data for each rank
290+
for(auto const &dir : rank_dirs){
291+
int rank = getRank(dir);
292+
int shard = rank % nshards; //put data into appropriate shard
293+
294+
std::vector<std::string> dir_files = listFilesInDir(dir);
295+
verboseStream << "In directory " << dir << " found files: " << std::endl;
296+
for(auto const &f : dir_files){
297+
verboseStream << f << std::endl;
298+
}
299+
300+
std::map<int, stepData> step_data;
301+
for(auto const &f : dir_files)
302+
addFile(step_data, f);
303+
304+
for(auto const &s : step_data){
305+
verboseStream << s.first << " " << s.second << std::endl;
306+
progressStream << " Rank " << rank << " Step " << s.first << std::endl;
307+
shards[shard].store(s.second);
308+
}
309+
}
310+
}
311+
312+
}
313+
delete client;
314+
delete admin;
315+
delete provider;
316+
317+
progressStream << "Done" << std::endl;
318+
}
319+
320+
321+
322+
323+
324+
int main(int argc, char** argv){
325+
if(argc < 2) printUsageAndExit();
326+
327+
//Parse environment variables
328+
if(const char* env_p = std::getenv("CHIMBUKO_VERBOSE")){
329+
std::cout << "Pserver: Enabling verbose debug output" << std::endl;
330+
enableVerboseLogging() = true;
331+
}
332+
333+
int arg_offset = 1;
334+
std::string mode = argv[arg_offset++];
335+
int nargs = argc-arg_offset;
336+
337+
if(mode == "global"){
338+
return rebuildGlobalDB(nargs, argv+arg_offset);
339+
}else if(mode == "anomalies"){
340+
return rebuildProvDB(nargs, argv+arg_offset);
341+
}else{
342+
fatal_error("Unknown mode " + mode);
343+
}
344+
return 0;
345+
}

0 commit comments

Comments
 (0)