Skip to content

Commit 6da4407

Browse files
committed
Added "provdb_commit" main program that remotely requests the provDB to commit its database to disk at some regular frequency
provdb changes: Added RPCs to register connection/disconnection of the commit client. Added an RPC that returns the status of connected clients. Setting the db_commit_freq cmdline option to 0 now disables the periodic commit called within provdb_admin. Modified the launch script to disable the periodic commit inside provdb_admin and use provdb_commit instead. This appears to work around the provdb hang.
1 parent 6e81ced commit 6da4407

5 files changed

Lines changed: 179 additions & 8 deletions

File tree

app/Makefile.am

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ bpfile_replay_LDADD = $(LDADD)
3535

3636

3737
if ENABLE_PROVDB
38-
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown provdb_rebuild_ascii provdb_dump_state
38+
bin_PROGRAMS += provdb_admin provdb_query provdb_shutdown provdb_rebuild_ascii provdb_dump_state provdb_commit
3939

4040
provdb_admin_SOURCES = provdb_admin.cpp
4141
provdb_admin_LDADD = $(LDADD)
@@ -49,6 +49,10 @@ provdb_shutdown_LDADD = $(LDADD)
4949
provdb_rebuild_ascii_SOURCES = provdb_rebuild_ascii.cpp
5050
provdb_rebuild_ascii_LDADD = $(LDADD)
5151

52+
provdb_commit_SOURCES = provdb_commit.cpp
53+
provdb_commit_LDADD = $(LDADD)
54+
55+
5256
else
5357
echo "Provenance DB not being built"
5458
endif

app/provdb_admin.cpp

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
#include <spdlog/spdlog.h>
2222
#include "chimbuko/verbose.hpp"
23-
23+
#include <thallium/serialization/stl/string.hpp>
2424

2525
namespace tl = thallium;
2626

@@ -39,6 +39,9 @@ bool a_client_has_connected = false; //has an AD rank previously connected?
3939
bool pserver_connected = false; //is the pserver connected
4040
bool pserver_has_connected = false; //did the pserver ever connect
4141

42+
bool committer_connected = false; //is the committer connected?
43+
bool committer_has_connected = false; //did the committer ever connect?
44+
4245
//Allows a client to register with the provider
4346
void client_hello(const tl::request& req, const int rank) {
4447
std::lock_guard<tl::mutex> lock(*mtx);
@@ -67,6 +70,33 @@ void pserver_goodbye(const tl::request& req) {
6770
progressStream << "ProvDB Admin: Pserver has said goodbye" << std::endl;
6871
}
6972

73+
//Allows the committer to register with the provider
74+
void committer_hello(const tl::request& req) {
75+
std::lock_guard<tl::mutex> lock(*mtx);
76+
committer_connected = true;
77+
committer_has_connected = true;
78+
progressStream << "ProvDB Admin: Committer has said hello" << std::endl;
79+
}
80+
//Allows the committer to deregister from the provider
81+
void committer_goodbye(const tl::request& req) {
82+
std::lock_guard<tl::mutex> lock(*mtx);
83+
committer_connected = false;
84+
progressStream << "ProvDB Admin: Committer has said goodbye" << std::endl;
85+
}
86+
87+
//Get the connection status as a string of format
88+
//"<current clients> <a client has connected (0/1)> <pserver connected (0/1)> <pserver has connected (0/1)> <committer connected (0/1)> <commiter has connected (0/1)>"
89+
void connection_status(const tl::request& req) {
90+
std::lock_guard<tl::mutex> lock(*mtx);
91+
std::stringstream ss;
92+
ss << connected.size() << " " << int(a_client_has_connected) << " "
93+
<< int(pserver_connected) << " " << int(pserver_has_connected) << " "
94+
<< int(committer_connected) << " " << int(committer_has_connected);
95+
req.respond(ss.str());
96+
}
97+
98+
99+
70100
bool cmd_shutdown = false; //true if a client has requested that the server shut down
71101

72102
void client_stop_rpc(const tl::request& req) {
@@ -120,7 +150,7 @@ int main(int argc, char** argv) {
120150
addOptionalCommandLineArg(parser, nshards, "Specify the number of database shards (default 1)");
121151
addOptionalCommandLineArg(parser, nthreads, "Specify the number of RPC handler threads (default 1)");
122152
addOptionalCommandLineArg(parser, db_type, "Specify the Sonata database type (default \"unqlite\")");
123-
addOptionalCommandLineArg(parser, db_commit_freq, "Specify the frequency at which the database flushes to disk in ms (default 10000)");
153+
addOptionalCommandLineArg(parser, db_commit_freq, "Specify the frequency at which the database flushes to disk in ms (default 10000). 0 disables the flush until the end.");
124154
addOptionalCommandLineArg(parser, db_write_dir, "Specify the directory in which the database shards will be written (default \".\")");
125155

126156
if(argc-1 < parser.nMandatoryArgs() || (argc == 2 && std::string(argv[1]) == "-help")){
@@ -140,8 +170,13 @@ int main(int argc, char** argv) {
140170

141171
progressStream << "ProvDB Admin: initializing thallium with address: " << eng_opt << std::endl;
142172

173+
//Disable loopback so that RPC calls from this process are forced to go through the net interface
174+
hg_init_info info;
175+
memset(&info, 0, sizeof(info));
176+
info.no_loopback = HG_TRUE;
177+
143178
//Initialize provider engine
144-
tl::engine engine(eng_opt, THALLIUM_SERVER_MODE, true, args.nthreads);
179+
tl::engine engine(eng_opt, THALLIUM_SERVER_MODE, true, args.nthreads, &info);
145180
margo_id = engine.get_margo_instance();
146181

147182
#ifdef _PERF_METRIC
@@ -163,8 +198,12 @@ int main(int argc, char** argv) {
163198
engine.define("client_goodbye",client_goodbye).disable_response();
164199
engine.define("pserver_hello",pserver_hello).disable_response();
165200
engine.define("pserver_goodbye",pserver_goodbye).disable_response();
201+
engine.define("committer_hello",committer_hello).disable_response();
202+
engine.define("committer_goodbye",committer_goodbye).disable_response();
166203
engine.define("stop_server",client_stop_rpc).disable_response();
167204
engine.define("margo_dump", margo_dump_rpc).disable_response();
205+
engine.define("connection_status", connection_status);
206+
168207

169208
std::string addr = (std::string)engine.self(); //ip and port of admin
170209

@@ -230,7 +269,7 @@ int main(int argc, char** argv) {
230269
tl::thread::sleep(engine, 1000); //Thallium engine sleeps but listens for rpc requests
231270

232271
unsigned long commit_timer_ms = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - commit_timer_start).count();
233-
if(commit_timer_ms >= args.db_commit_freq){
272+
if(args.db_commit_freq > 0 && commit_timer_ms >= args.db_commit_freq){
234273
verboseStream << "ProvDB Admin: committing database to disk" << std::endl;
235274
for(int s=0;s<args.nshards;s++){
236275
progressStream << "ProvDB Admin: committing shard " << s << std::endl;
@@ -248,7 +287,8 @@ int main(int argc, char** argv) {
248287
if(
249288
(args.autoshutdown || cmd_shutdown) &&
250289
( a_client_has_connected && connected.size() == 0 ) &&
251-
( !pserver_has_connected || (pserver_has_connected && !pserver_connected) )
290+
( !pserver_has_connected || (pserver_has_connected && !pserver_connected) ) &&
291+
( !committer_has_connected || (committer_has_connected && !committer_connected) )
252292
){
253293
progressStream << "ProvDB Admin: detected all clients disconnected, shutting down" << std::endl;
254294
break;

app/provdb_commit.cpp

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#include<atomic>
2+
#include<chrono>
3+
#include<thread>
4+
#include<cassert>
5+
#include<chimbuko/ad/ADProvenanceDBengine.hpp>
6+
#include<chimbuko/verbose.hpp>
7+
#include<chimbuko/util/string.hpp>
8+
#include <thallium/serialization/stl/string.hpp>
9+
10+
using namespace chimbuko;
11+
12+
//Flag that triggers breakout of the main thread's commit loop.
//It is written from the SIGTERM handler and read by the main loop, so it is a lock-free atomic rather than a plain bool.
std::atomic<bool> stop_wait_loop{false};
static_assert(ATOMIC_BOOL_LOCK_FREE == 2, "stop_wait_loop must be lock-free to be safely written from a signal handler");

//SIGTERM handler: requests that the commit loop stop.
//  signum: the signal number delivered (unused).
//The handler deliberately does nothing but set the flag; stream output is not async-signal-safe
//and could deadlock or corrupt output if the signal interrupts another write.
void termSignalHandler( int signum ) {
  (void)signum;
  stop_wait_loop.store(true);
}
18+
19+
//Query the provDB admin for its connection status and decide whether the committer should exit.
//  server: endpoint on which the "connection_status" RPC is invoked
//  connection_status: remote procedure whose string reply starts with
//     "<clients connected> <a client has connected (0/1)> <pserver connected (0/1)> <pserver has connected (0/1)>"
//     (any further fields in the reply are ignored here)
//Returns true if a client has connected at some point and none remain connected, and the pserver
//either never connected or is no longer connected.
//Returns false if the reply cannot be parsed, so a malformed reply never causes the committer to exit.
bool exit_check(thallium::endpoint &server,
                thallium::remote_procedure &connection_status){
  std::string conn = connection_status.on(server)();

  int connected = -1;
  int a_client_has_connected = -1;
  int pserver_connected = -1;
  int pserver_has_connected = -1;

  std::istringstream is(conn);
  is >> connected >> a_client_has_connected >> pserver_connected >> pserver_has_connected;
  if(is.fail()){
    //Checked at runtime rather than with assert(): an assert disappears under NDEBUG, after which a
    //truncated reply would leave zeroed/sentinel values that can be misread as "everything disconnected"
    progressStream << "ProvDB Commit: unable to parse connection status \"" << conn << "\", not exiting" << std::endl;
    return false;
  }

  progressStream << "ProvDB Commit determined status: clients connected=" << connected << " (a client has connected ? " << a_client_has_connected << ") "
                 << "pserver connected=" << pserver_connected << " (pserver has connected ? " << pserver_has_connected << ")" << std::endl;

  const bool all_clients_gone = a_client_has_connected && connected == 0;
  const bool pserver_gone = !pserver_has_connected || !pserver_connected;
  return all_clients_gone && pserver_gone;
}
36+
37+
38+
//provdb_commit: connects to a running provDB admin and periodically asks it to commit its databases to disk.
//Usage: provdb_commit <server address> <nshards> <freq (ms)>
//  server address: address of the provDB admin
//  nshards: number of database shards ("provdb.0" ... "provdb.<nshards-1>"); must be non-negative
//  freq (ms): minimum time between commits in milliseconds; must be non-negative
//The loop wakes once per second, checks the admin's connection status on every 10th wake-up and exits when
//exit_check reports that all clients have disconnected, or when stop_wait_loop is set by the SIGTERM handler.
//Returns 1 on a usage error, 0 otherwise.
int main(int argc, char** argv){
#ifdef ENABLE_PROVDB
  if(argc != 4){
    std::cout << "Usage: provdb_commit <server address e.g. ofi+tcp;ofi_rxm://172.17.0.4:5000> <nshards> <freq (ms)>" << std::endl;
    return 1;
  }
  std::string addr = argv[1];
  int nshards = std::stoi(argv[2]);
  int freq_ms = std::stoi(argv[3]);

  //Reject negative values up front: a negative shard count cannot size the database array, and a negative
  //frequency previously wrapped to a huge unsigned value in the timer comparison so that no commit ever happened
  if(nshards < 0 || freq_ms < 0){
    std::cout << "provdb_commit: <nshards> and <freq (ms)> must be non-negative" << std::endl;
    return 1;
  }
  const std::chrono::milliseconds commit_period(freq_ms);

  //Make sure the engine uses the protocol implied by the server address
  std::string protocol = ADProvenanceDBengine::getProtocolFromAddress(addr);
  if(ADProvenanceDBengine::getProtocol().first != protocol){
    int mode = ADProvenanceDBengine::getProtocol().second;
    verboseStream << "DB client reinitializing engine with protocol \"" << protocol << "\"" << std::endl;
    ADProvenanceDBengine::finalize();
    ADProvenanceDBengine::setProtocol(protocol,mode);
  }

  thallium::engine &eng = ADProvenanceDBengine::getEngine();

  thallium::endpoint server = eng.lookup(addr);

  thallium::remote_procedure client_hello(eng.define("committer_hello").disable_response());
  thallium::remote_procedure client_goodbye(eng.define("committer_goodbye").disable_response());
  thallium::remote_procedure connection_status(eng.define("connection_status"));
  client_hello.on(server)();

  {
    sonata::Client client = sonata::Client(eng);
    {
      std::vector<sonata::Database> databases(nshards);
      for(int i=0;i<nshards;i++){
        std::string db_name = stringize("provdb.%d", i);
        databases[i] = client.open(addr, 0, db_name);
      }
      sonata::Database glob_db = client.open(addr, 0, "provdb.global");

      //A monotonic clock is used for the interval timer so that wall-clock adjustments cannot distort it
      using Clock = std::chrono::steady_clock;
      Clock::time_point commit_timer_start = Clock::now();

      signal(SIGTERM, termSignalHandler);
      progressStream << "ProvDB Commit: starting commit loop" << std::endl;

      int iter = 0;
      while(!stop_wait_loop) { //stop wait loop will be set by SIGTERM handler
        thallium::thread::sleep(eng, 1000); //Thallium engine sleeps but listens for rpc requests

        //Check every 10 seconds for connection status
        if(iter % 10 == 0 &&
           exit_check(server, connection_status ) ){
          progressStream << "ProvDB Commit: no clients connected, exiting" << std::endl;
          break;
        }

        //Compare typed durations to avoid a signed/unsigned comparison between the elapsed time and the period
        if(Clock::now() - commit_timer_start >= commit_period){
          verboseStream << "ProvDB Commit: committing database to disk" << std::endl;
          for(int s=0;s<nshards;s++){
            progressStream << "ProvDB Commit: committing shard " << s << std::endl;
            databases[s].commit();
          }
          progressStream << "ProvDB Commit: committing global db" << std::endl;
          glob_db.commit();
          commit_timer_start = Clock::now();
        }
        ++iter;
      }//while(!stop_wait_loop)

      //Report a requested stop from normal (non-signal) context
      if(stop_wait_loop){
        progressStream << "ProvDB Commit: stop requested, leaving commit loop" << std::endl;
      }
    } //db scope
  }//client scope

  progressStream << "ProvDB Commit sending goodbye handshake" << std::endl;
  client_goodbye.on(server)();
  client_hello.deregister();
  client_goodbye.deregister();
  connection_status.deregister();
#endif

  progressStream << "ProvDB Commit done" << std::endl;
  return 0;
}

scripts/launch/run_services.sh

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ if (( ${use_provdb} == 1 )); then
110110
#Enable better error reporting from Mercury
111111
export HG_LOG_SUBSYS=hg export HG_LOG_LEVEL=error
112112

113-
provdb_admin "${provdb_addr}" ${provdb_extra_args} -engine ${provdb_engine} -nshards ${provdb_nshards} -nthreads ${provdb_nthreads} -db_write_dir ${provdb_writedir} 2>&1 | tee ${log_dir}/provdb.log &
113+
provdb_admin "${provdb_addr}" ${provdb_extra_args} -engine ${provdb_engine} -nshards ${provdb_nshards} -nthreads ${provdb_nthreads} -db_write_dir ${provdb_writedir} -db_commit_freq 0 2>&1 | tee ${log_dir}/provdb.log &
114114
provdb_pid=$!
115115

116116
start_time=$SECONDS
@@ -135,6 +135,16 @@ else
135135
ps_extra_args+=" -prov_outputpath ${provdb_writedir}"
136136
fi
137137

138+
#Committer
#When the provenance DB is in use, start provdb_commit in the background against the address the
#provider wrote to ${provdb_dir}/provider.address, requesting a commit every 10000 ms.
#Output is copied to ${log_dir}/committer.log.
if (( ${use_provdb} == 1 )); then
    echo "==========================================="
    echo "Instantiating committer"
    echo "==========================================="
    #Paths are quoted so that directories containing whitespace are not word-split
    prov_add=$(cat "${provdb_dir}/provider.address")
    provdb_commit "${prov_add}" ${provdb_nshards} 10000 2>&1 | tee "${log_dir}/committer.log" &
    #NOTE(review): presumably gives the committer time to connect before later services start - confirm
    sleep 3
fi
147+
138148
#Visualization
139149
if (( ${use_viz} == 1 )); then
140150
if (( ${use_pserver} != 1 )); then

src/ad/ADParser.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ int ADParser::beginStep(bool verbose) {
8383
}else if(status == adios2::StepStatus::NotReady){
8484
double elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start).count();
8585
if(elapsed > m_beginstep_timeout * 1000){
86-
recoverable_error("ADParser::beginStep : ADIOS2::BeginStep timed out waiting for next step to be ready\n");
86+
recoverable_error("ADParser::beginStep : ADIOS2::BeginStep timed out after " + std::to_string(elapsed) + " ms waiting for next step to be ready\n");
8787
m_status = false;
8888
m_current_step = -1;
8989
break;

0 commit comments

Comments
 (0)