66#endif
77#include " chimbuko/util/commandLineParser.hpp"
88#include " chimbuko/util/string.hpp"
9+ #include " chimbuko/util/time.hpp"
910#include < iostream>
1011#include < sonata/Admin.hpp>
1112#include < sonata/Provider.hpp>
1617#include < iostream>
1718#include < csignal>
1819#include < cassert>
20+ #include < nlohmann/json.hpp>
1921
2022#include < spdlog/spdlog.h>
2123#include " chimbuko/verbose.hpp"
22-
24+ # include < thallium/serialization/stl/string.hpp >
2325
2426namespace tl = thallium;
2527
@@ -38,6 +40,9 @@ bool a_client_has_connected = false; //has an AD rank previously connected?
3840bool pserver_connected = false ; // is the pserver connected
3941bool pserver_has_connected = false ; // did the pserver ever connect
4042
43+ bool committer_connected = false ; // is the committer connected?
44+ bool committer_has_connected = false ; // did the committer ever connect?
45+
4146// Allows a client to register with the provider
4247void client_hello (const tl::request& req, const int rank) {
4348 std::lock_guard<tl::mutex> lock (*mtx);
@@ -66,6 +71,33 @@ void pserver_goodbye(const tl::request& req) {
6671 progressStream << " ProvDB Admin: Pserver has said goodbye" << std::endl;
6772}
6873
74+ // Allows the committer to register with the provider
75+ void committer_hello (const tl::request& req) {
76+ std::lock_guard<tl::mutex> lock (*mtx);
77+ committer_connected = true ;
78+ committer_has_connected = true ;
79+ progressStream << " ProvDB Admin: Committer has said hello" << std::endl;
80+ }
81+ // Allows the committer to deregister from the provider
82+ void committer_goodbye (const tl::request& req) {
83+ std::lock_guard<tl::mutex> lock (*mtx);
84+ committer_connected = false ;
85+ progressStream << " ProvDB Admin: Committer has said goodbye" << std::endl;
86+ }
87+
88+ // Get the connection status as a string of format
89+ // "<current clients> <a client has connected (0/1)> <pserver connected (0/1)> <pserver has connected (0/1)> <committer connected (0/1)> <commiter has connected (0/1)>"
90+ void connection_status (const tl::request& req) {
91+ std::lock_guard<tl::mutex> lock (*mtx);
92+ std::stringstream ss;
93+ ss << connected.size () << " " << int (a_client_has_connected) << " "
94+ << int (pserver_connected) << " " << int (pserver_has_connected) << " "
95+ << int (committer_connected) << " " << int (committer_has_connected);
96+ req.respond (ss.str ());
97+ }
98+
99+
100+
69101bool cmd_shutdown = false ; // true if a client has requested that the server shut down
70102
71103void client_stop_rpc (const tl::request& req) {
@@ -74,6 +106,18 @@ void client_stop_rpc(const tl::request& req) {
74106 progressStream << " ProvDB Admin: Received shutdown request from client" << std::endl;
75107}
76108
109+ margo_instance_id margo_id;
110+
111+ void margo_dump (const std::string &stub){
112+ std::string fn = stub + " ." + getDateTimeFileExt ();
113+ progressStream << " ProvDB Admin: margo dump to " << fn << std::endl;
114+ margo_state_dump (margo_id, fn.c_str (), 0 , nullptr );
115+ }
116+
117+ void margo_dump_rpc (const tl::request& req) {
118+ const static std::string stub (" margo_dump" );
119+ margo_dump (stub);
120+ }
77121
78122
79123
@@ -87,8 +131,11 @@ struct ProvdbArgs{
87131 std::string db_type;
88132 unsigned long db_commit_freq;
89133 std::string db_write_dir;
90-
91- ProvdbArgs (): engine(" ofi+tcp" ), autoshutdown(true ), nshards(1 ), db_type(" unqlite" ), nthreads(1 ), db_commit_freq(10000 ), db_write_dir(" ." ){}
134+ bool db_in_mem; // database is in-memory not written to disk, for testing
135+ std::string db_base_config;
136+ std::string db_margo_config;
137+
138+ ProvdbArgs (): engine(" ofi+tcp" ), autoshutdown(true ), nshards(1 ), db_type(" unqlite" ), nthreads(1 ), db_commit_freq(10000 ), db_write_dir(" ." ), db_in_mem(false ), db_base_config(" " ), db_margo_config(" " ){}
92139};
93140
94141
@@ -112,8 +159,11 @@ int main(int argc, char** argv) {
112159 addOptionalCommandLineArg (parser, nshards, " Specify the number of database shards (default 1)" );
113160 addOptionalCommandLineArg (parser, nthreads, " Specify the number of RPC handler threads (default 1)" );
114161 addOptionalCommandLineArg (parser, db_type, " Specify the Sonata database type (default \" unqlite\" )" );
115- addOptionalCommandLineArg (parser, db_commit_freq, " Specify the frequency at which the database flushes to disk in ms (default 10000)" );
162+ addOptionalCommandLineArg (parser, db_commit_freq, " Specify the frequency at which the database flushes to disk in ms (default 10000). 0 disables the flush until the end. " );
116163 addOptionalCommandLineArg (parser, db_write_dir, " Specify the directory in which the database shards will be written (default \" .\" )" );
164+ addOptionalCommandLineArg (parser, db_in_mem, " Use an in-memory database rather than writing to disk (*unqlite backend only*) (default false)" );
165+ addOptionalCommandLineArg (parser, db_base_config, " Provide the *absolute path* to a JSON file to use as the base configuration of the Sonata databases. The database path will be appended automatically (default \"\" - not used)" );
166+ addOptionalCommandLineArg (parser, db_margo_config, " Provide the *absolute path* to a JSON file containing the Margo configuration (default \"\" - not used)" );
117167
118168 if (argc-1 < parser.nMandatoryArgs () || (argc == 2 && std::string (argv[1 ]) == " -help" )){
119169 parser.help (std::cout);
@@ -124,16 +174,38 @@ int main(int argc, char** argv) {
124174 parser.parseCmdLineArgs (args, argc, argv);
125175
126176 if (args.nshards < 1 ) throw std::runtime_error (" Must have at least 1 database shard" );
177+ if (args.db_in_mem && args.db_type != " unqlite" ) throw std::runtime_error (" -db_in_mem option not valid for backends other than unqlite" );
178+
179+ if (args.db_in_mem ){ progressStream << " Using in-memory database" << std::endl; }
127180
128181 std::string eng_opt = args.engine ;
129182 if (args.ip .size () > 0 ){
130183 eng_opt += std::string (" ://" ) + args.ip ;
131184 }
132185
186+ // Get Sonata config
187+ nlohmann::json base_config;
188+ if (args.db_base_config .size () > 0 ){
189+ std::ifstream in (args.db_base_config );
190+ if (in.fail ()){ throw std::runtime_error (" Failed to read db config file " + args.db_base_config ); }
191+ base_config = nlohmann::json::parse (in);
192+ }
193+
133194 progressStream << " ProvDB Admin: initializing thallium with address: " << eng_opt << std::endl;
134195
196+ // Get Margo config
197+ std::string margo_config = stringize (" { \" rpc_thread_count\" : %d, \" use_progress_thread\" : true }" , args.nthreads );
198+ if (args.db_margo_config .size () > 0 ){
199+ std::ifstream in (args.db_margo_config );
200+ if (in.fail ()){ throw std::runtime_error (" Failed to read Margo config file " + args.db_margo_config ); }
201+ auto cfg = nlohmann::json::parse (in);
202+ margo_config = cfg.dump ();
203+ }
204+ progressStream << " ProvDB Admin: Margo configuration " << margo_config << std::endl;
205+
135206 // Initialize provider engine
136- tl::engine engine (eng_opt, THALLIUM_SERVER_MODE, true , args.nthreads );
207+ tl::engine engine (eng_opt, THALLIUM_SERVER_MODE, margo_config);
208+ margo_id = engine.get_margo_instance ();
137209
138210#ifdef _PERF_METRIC
139211 // Get Margo to output profiling information
@@ -154,7 +226,12 @@ int main(int argc, char** argv) {
154226 engine.define (" client_goodbye" ,client_goodbye).disable_response ();
155227 engine.define (" pserver_hello" ,pserver_hello).disable_response ();
156228 engine.define (" pserver_goodbye" ,pserver_goodbye).disable_response ();
229+ engine.define (" committer_hello" ,committer_hello).disable_response ();
230+ engine.define (" committer_goodbye" ,committer_goodbye).disable_response ();
157231 engine.define (" stop_server" ,client_stop_rpc).disable_response ();
232+ engine.define (" margo_dump" , margo_dump_rpc).disable_response ();
233+ engine.define (" connection_status" , connection_status);
234+
158235
159236 std::string addr = (std::string)engine.self (); // ip and port of admin
160237
@@ -167,19 +244,25 @@ int main(int argc, char** argv) {
167244
168245 { // Scope in which admin object is active
169246 sonata::Admin admin (engine);
170- progressStream << " ProvDB Admin: creating global data database" << std::endl;
171247 std::string glob_db_name = " provdb.global" ;
172- std::string glob_db_config = stringize (" { \" path\" : \" %s/%s.unqlite\" }" , args.db_write_dir .c_str (), glob_db_name.c_str ());
173- admin.createDatabase (addr, 0 , glob_db_name, args.db_type , glob_db_config);
174248
249+ nlohmann::json glob_db_config = base_config;
250+ glob_db_config[" path" ] = args.db_in_mem ? " :mem:" : stringize (" %s/%s.unqlite" , args.db_write_dir .c_str (), glob_db_name.c_str ());
251+
252+ progressStream << " ProvDB Admin: creating global data database: " << glob_db_name << " " << glob_db_config.dump () << " " << args.db_type << std::endl;
253+ admin.createDatabase (addr, 0 , glob_db_name, args.db_type , glob_db_config.dump ());
254+
175255 progressStream << " ProvDB Admin: creating " << args.nshards << " database shards" << std::endl;
176256
177257 std::vector<std::string> db_shard_names (args.nshards );
178258 for (int s=0 ;s<args.nshards ;s++){
179259 std::string db_name = stringize (" provdb.%d" ,s);
180- std::string config = stringize (" { \" path\" : \" %s/%s.unqlite\" }" , args.db_write_dir .c_str (), db_name.c_str ());
181- progressStream << " ProvDB Admin: Shard " << s << " : " << db_name << " " << config << " " << args.db_type << std::endl;
182- admin.createDatabase (addr, 0 , db_name, args.db_type , config);
260+
261+ nlohmann::json config = base_config;
262+ config[" path" ] = args.db_in_mem ? " :mem:" : stringize (" %s/%s.unqlite" , args.db_write_dir .c_str (), db_name.c_str ());
263+
264+ progressStream << " ProvDB Admin: Shard " << s << " : " << db_name << " " << config.dump () << " " << args.db_type << std::endl;
265+ admin.createDatabase (addr, 0 , db_name, args.db_type , config.dump ());
183266 db_shard_names[s] = db_name;
184267 }
185268
@@ -213,18 +296,20 @@ int main(int argc, char** argv) {
213296 typedef std::chrono::high_resolution_clock Clock;
214297 Clock::time_point commit_timer_start = Clock::now ();
215298
216-
217299 // Spin quietly until SIGTERM sent
218300 signal (SIGTERM, termSignalHandler);
219301 progressStream << " ProvDB Admin: main thread waiting for completion" << std::endl;
220302 while (!stop_wait_loop) { // stop wait loop will be set by SIGTERM handler
221303 tl::thread::sleep (engine, 1000 ); // Thallium engine sleeps but listens for rpc requests
222304
223305 unsigned long commit_timer_ms = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now () - commit_timer_start).count ();
224- if (commit_timer_ms >= args.db_commit_freq ){
306+ if (args. db_commit_freq > 0 && commit_timer_ms >= args.db_commit_freq ){
225307 verboseStream << " ProvDB Admin: committing database to disk" << std::endl;
226- for (int s=0 ;s<args.nshards ;s++)
308+ for (int s=0 ;s<args.nshards ;s++){
309+ progressStream << " ProvDB Admin: committing shard " << s << std::endl;
227310 db[s].commit ();
311+ }
312+ progressStream << " ProvDB Admin: committing global db" << std::endl;
228313 glob_db.commit ();
229314 commit_timer_start = Clock::now ();
230315 }
@@ -236,9 +321,11 @@ int main(int argc, char** argv) {
236321 if (
237322 (args.autoshutdown || cmd_shutdown) &&
238323 ( a_client_has_connected && connected.size () == 0 ) &&
239- ( !pserver_has_connected || (pserver_has_connected && !pserver_connected) )
324+ ( !pserver_has_connected || (pserver_has_connected && !pserver_connected) ) &&
325+ ( !committer_has_connected || (committer_has_connected && !committer_connected) )
240326 ){
241327 progressStream << " ProvDB Admin: detected all clients disconnected, shutting down" << std::endl;
328+ margo_dump (" margo_dump_all_client_disconnected" );
242329 break ;
243330 }
244331 }
@@ -251,9 +338,11 @@ int main(int argc, char** argv) {
251338 }
252339
253340 progressStream << " ProvDB Admin: ending admin scope" << std::endl;
341+ margo_dump (" margo_dump_end_admin_scope" );
254342 }// admin scope
255343
256344 progressStream << " ProvDB Admin: ending provider scope" << std::endl;
345+ margo_dump (" margo_dump_end_provide_scope" );
257346 }// provider scope
258347
259348 progressStream << " ProvDB Admin: shutting down server engine" << std::endl;
0 commit comments