Skip to content

Commit acc470f

Browse files
committed
Imported improvements from provdb_multiprovider_committer_shutdown_test branch:
ProvDB now supports Sonata bypass functionality for faster stores (default: on). ProvDB now also supports the Sonata aggregator backend (this backend is currently broken, so default: off).
1 parent 7a65dff commit acc470f

1 file changed

Lines changed: 37 additions & 11 deletions

File tree

app/provdb_admin.cpp

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,14 @@ struct ProvdbArgs{
141141
bool db_in_mem; //database is in-memory not written to disk, for testing
142142
std::string db_base_config;
143143
std::string db_margo_config;
144+
145+
bool db_use_aggregator; /**< Use the aggregator backend*/
146+
int db_batch_size; /**< Batch size for "aggregator" backend*/
147+
148+
bool db_bypass_unqlite;
144149

145-
ProvdbArgs(): engine("ofi+tcp"), autoshutdown(true), server_instance(0), ninstances(1), nshards(1), db_type("unqlite"), db_commit_freq(10000), db_write_dir("."), db_in_mem(false), db_base_config(""), db_margo_config(""){}
150+
ProvdbArgs(): engine("ofi+tcp"), autoshutdown(true), server_instance(0), ninstances(1), nshards(1), db_type("unqlite"), db_commit_freq(10000), db_write_dir("."), db_in_mem(false), db_base_config(""), db_margo_config(""),
151+
db_use_aggregator(false), db_batch_size(64), db_bypass_unqlite(true){}
146152
};
147153

148154

@@ -170,6 +176,9 @@ int main(int argc, char** argv) {
170176
addOptionalCommandLineArg(parser, db_in_mem, "Use an in-memory database rather than writing to disk (*unqlite backend only*) (default false)");
171177
addOptionalCommandLineArg(parser, db_base_config, "Provide the *absolute path* to a JSON file to use as the base configuration of the Sonata databases. The database path will be appended automatically (default \"\" - not used)");
172178
addOptionalCommandLineArg(parser, db_margo_config, "Provide the *absolute path* to a JSON file containing the Margo configuration (default \"\" - not used)");
179+
addOptionalCommandLineArg(parser, db_use_aggregator, "Use the \"aggregator\" backend layer (default false)");
180+
addOptionalCommandLineArg(parser, db_batch_size, "Provide the batch size for the \"aggregator\" backend if in use (default 64)");
181+
addOptionalCommandLineArg(parser, db_bypass_unqlite, "Use Sonata's bypass method for faster unqlite stores (default true)");
173182
addOptionalCommandLineArgMultiValue(parser, server_instance, "Provide the index of the server instance and the total number of instances (if using more than 1) in the format \"$instance $ninstances\" (default \"0 1\")", server_instance, ninstances);
174183

175184
if(argc-1 < parser.nMandatoryArgs() || (argc == 2 && std::string(argv[1]) == "-help")){
@@ -205,12 +214,22 @@ int main(int argc, char** argv) {
205214
return 0;
206215
}
207216

208-
//Get Sonata config
217+
//Determine the backend
218+
std::string sonata_backend = args.db_use_aggregator ? "aggregator" : args.db_type;
219+
220+
//Get base Sonata backend configuration
209221
nlohmann::json base_config;
210222
if(args.db_base_config.size() > 0){
211223
std::ifstream in(args.db_base_config);
212224
if(in.fail()){ throw std::runtime_error("Failed to read db config file " + args.db_base_config); }
213225
base_config = nlohmann::json::parse(in);
226+
}else if(args.db_use_aggregator){
227+
base_config["batch_size"] = args.db_batch_size;
228+
base_config["backend"] = args.db_type; //backend of aggregator backend is still configurable!
229+
base_config["config"]["mutex"] = "global"; //NOTE(review): with only 1 ES per DB a mutex should not be needed (this was previously "none"), yet "global" is set here - confirm intent
230+
}else{
231+
base_config["mutex"] = "global"; //NOTE(review): with only 1 ES per DB a mutex should not be needed (this was previously "none"), yet "global" is set here - confirm intent
232+
if(args.db_bypass_unqlite) base_config["bypass"] = true;
214233
}
215234

216235
PSprogressStream << "initializing thallium with address: " << eng_opt << std::endl;
@@ -328,28 +347,35 @@ int main(int argc, char** argv) {
328347
{ //Scope in which admin object is active
329348
sonata::Admin admin(engine);
330349

350+
//Setup global database
331351
std::string glob_db_name = setup.getGlobalDBname();
332352
if(instance_do_global_db){
353+
std::string path = args.db_in_mem ? ":mem:" : stringize("%s/%s.unqlite", args.db_write_dir.c_str(), glob_db_name.c_str());
354+
333355
nlohmann::json glob_db_config = base_config;
334-
glob_db_config["path"] = args.db_in_mem ? ":mem:" : stringize("%s/%s.unqlite", args.db_write_dir.c_str(), glob_db_name.c_str());
335-
glob_db_config["mutex"] = "none"; //as we have only 1 ES per DB we don't need a mutex
356+
if(args.db_use_aggregator) glob_db_config["config"]["path"] = path;
357+
else glob_db_config["path"] = path;
336358

337-
PSprogressStream << "creating global data database: " << glob_db_name << " " << glob_db_config.dump() << " " << args.db_type << std::endl;
338-
admin.createDatabase(addr, setup.getGlobalDBproviderIndex(), glob_db_name, args.db_type, glob_db_config.dump());
359+
PSprogressStream << "creating global data database: " << glob_db_name << " " << glob_db_config.dump() << " " << sonata_backend << std::endl;
360+
admin.createDatabase(addr, setup.getGlobalDBproviderIndex(), glob_db_name, sonata_backend, glob_db_config.dump());
339361
}
340362

363+
//Setup database shards
341364
PSprogressStream << "creating " << nshard_instance << " database shards with index offset " << instance_shard_offset << std::endl;
342365

343366
std::vector<std::string> db_shard_names(nshard_instance);
344367
for(int s=0;s<nshard_instance;s++){
345368
int shard = instance_shard_offset + s;
346-
db_shard_names[s] = setup.getShardDBname(shard);
369+
db_shard_names[s] = setup.getShardDBname(shard);
370+
371+
std::string path = args.db_in_mem ? ":mem:" : stringize("%s/%s.unqlite", args.db_write_dir.c_str(), db_shard_names[s].c_str());
372+
347373
nlohmann::json config = base_config;
348-
config["path"] = args.db_in_mem ? ":mem:" : stringize("%s/%s.unqlite", args.db_write_dir.c_str(), db_shard_names[s].c_str());
349-
config["mutex"] = "none";
374+
if(args.db_use_aggregator) config["config"]["path"] = path;
375+
else config["path"] = path;
350376

351-
PSprogressStream << "shard " << shard << ": " << db_shard_names[s] << " " << config.dump() << " " << args.db_type << std::endl;
352-
admin.createDatabase(addr, shard_provider_indices[s], db_shard_names[s], args.db_type, config.dump());
377+
PSprogressStream << "shard " << shard << ": " << db_shard_names[s] << " " << config.dump() << " " << sonata_backend << std::endl;
378+
admin.createDatabase(addr, shard_provider_indices[s], db_shard_names[s], sonata_backend, config.dump());
353379
}
354380

355381
//Create the collections

0 commit comments

Comments
 (0)