Skip to content

Commit 261d639

Browse files
committed
Fixed another race condition in ADThreadNetClient for blocking actions
Moved functionality for sending json data via curl from PSstatSender to a separate class for reuse purposes
1 parent 24b409f commit 261d639

4 files changed

Lines changed: 13 additions & 71 deletions

File tree

include/Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
nobase_include_HEADERS = chimbuko/chimbuko.hpp chimbuko/ad/FuncAnomalyMetrics.hpp chimbuko/ad/ADOutlier.hpp chimbuko/ad/ADLocalAnomalyMetrics.hpp chimbuko/ad/ADNetClient.hpp chimbuko/ad/FuncStats.hpp chimbuko/ad/ADAnomalyProvenance.hpp chimbuko/ad/ADParser.hpp chimbuko/ad/ADProvenanceDBengine.hpp chimbuko/ad/ADNormalEventProvenance.hpp chimbuko/ad/ADLocalFuncStatistics.hpp chimbuko/ad/ADMetadataParser.hpp chimbuko/ad/AnomalyData.hpp chimbuko/ad/ADDefine.hpp chimbuko/ad/ADglobalFunctionIndexMap.hpp chimbuko/ad/ADProvenanceDBclient.hpp chimbuko/ad/ADcombinedPSdata.hpp chimbuko/ad/ADEvent.hpp chimbuko/ad/ADCounter.hpp chimbuko/ad/ADLocalCounterStatistics.hpp chimbuko/ad/ExecData.hpp chimbuko/ad/utils.hpp chimbuko/ad/ADio.hpp chimbuko/provdb/setup.hpp chimbuko/verbose.hpp chimbuko/pserver/PSProvenanceDBclient.hpp chimbuko/pserver/GlobalAnomalyStats.hpp chimbuko/pserver/AggregateFuncAnomalyMetrics.hpp chimbuko/pserver/PSglobalFunctionIndexMap.hpp chimbuko/pserver/AggregateFuncStats.hpp chimbuko/pserver/AggregateAnomalyData.hpp chimbuko/pserver/NetPayloadRecvCombinedADdata.hpp chimbuko/pserver/PSstatSender.hpp chimbuko/pserver/GlobalAnomalyMetrics.hpp chimbuko/pserver/GlobalCounterStats.hpp chimbuko/param/sstd_param.hpp chimbuko/param/copod_param.hpp chimbuko/param/hbos_param.hpp chimbuko/AD.hpp chimbuko/message.hpp chimbuko/net.hpp chimbuko/pserver.hpp chimbuko/provdb.hpp chimbuko/param.hpp chimbuko/net/zmqme_net.hpp chimbuko/net/mpi_net.hpp chimbuko/net/zmq_net.hpp chimbuko/net/local_net.hpp chimbuko/util/RunStats.hpp chimbuko/util/ADIOS2parseUtils.hpp chimbuko/util/environment.hpp chimbuko/util/time.hpp chimbuko/util/map.hpp chimbuko/util/error.hpp chimbuko/util/string.hpp chimbuko/util/DispatchQueue.hpp chimbuko/util/PerfStats.hpp chimbuko/util/RunMetric.hpp chimbuko/util/memutils.hpp chimbuko/util/Anomalies.hpp chimbuko/util/barrier.hpp chimbuko/util/threadPool.hpp chimbuko/util/hash.hpp chimbuko/util/mtQueue.hpp chimbuko/util/serialize.hpp chimbuko/util/commandLineParser.hpp
1+
nobase_include_HEADERS = chimbuko/chimbuko.hpp chimbuko/ad/FuncAnomalyMetrics.hpp chimbuko/ad/ADOutlier.hpp chimbuko/ad/ADLocalAnomalyMetrics.hpp chimbuko/ad/ADNetClient.hpp chimbuko/ad/FuncStats.hpp chimbuko/ad/ADAnomalyProvenance.hpp chimbuko/ad/ADParser.hpp chimbuko/ad/ADProvenanceDBengine.hpp chimbuko/ad/ADNormalEventProvenance.hpp chimbuko/ad/ADLocalFuncStatistics.hpp chimbuko/ad/ADMetadataParser.hpp chimbuko/ad/AnomalyData.hpp chimbuko/ad/ADDefine.hpp chimbuko/ad/ADglobalFunctionIndexMap.hpp chimbuko/ad/ADProvenanceDBclient.hpp chimbuko/ad/ADcombinedPSdata.hpp chimbuko/ad/ADEvent.hpp chimbuko/ad/ADCounter.hpp chimbuko/ad/ADLocalCounterStatistics.hpp chimbuko/ad/ExecData.hpp chimbuko/ad/utils.hpp chimbuko/ad/ADio.hpp chimbuko/provdb/setup.hpp chimbuko/verbose.hpp chimbuko/pserver/PSProvenanceDBclient.hpp chimbuko/pserver/GlobalAnomalyStats.hpp chimbuko/pserver/AggregateFuncAnomalyMetrics.hpp chimbuko/pserver/PSglobalFunctionIndexMap.hpp chimbuko/pserver/AggregateFuncStats.hpp chimbuko/pserver/AggregateAnomalyData.hpp chimbuko/pserver/NetPayloadRecvCombinedADdata.hpp chimbuko/pserver/PSstatSender.hpp chimbuko/pserver/GlobalAnomalyMetrics.hpp chimbuko/pserver/GlobalCounterStats.hpp chimbuko/param/sstd_param.hpp chimbuko/param/copod_param.hpp chimbuko/param/hbos_param.hpp chimbuko/AD.hpp chimbuko/message.hpp chimbuko/net.hpp chimbuko/pserver.hpp chimbuko/provdb.hpp chimbuko/param.hpp chimbuko/net/zmqme_net.hpp chimbuko/net/mpi_net.hpp chimbuko/net/zmq_net.hpp chimbuko/net/local_net.hpp chimbuko/util/RunStats.hpp chimbuko/util/ADIOS2parseUtils.hpp chimbuko/util/environment.hpp chimbuko/util/time.hpp chimbuko/util/map.hpp chimbuko/util/error.hpp chimbuko/util/string.hpp chimbuko/util/DispatchQueue.hpp chimbuko/util/PerfStats.hpp chimbuko/util/curlJsonSender.hpp chimbuko/util/RunMetric.hpp chimbuko/util/memutils.hpp chimbuko/util/Anomalies.hpp chimbuko/util/barrier.hpp chimbuko/util/threadPool.hpp chimbuko/util/hash.hpp chimbuko/util/mtQueue.hpp chimbuko/util/serialize.hpp chimbuko/util/commandLineParser.hpp

src/Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/3rdparty @PS_FLAGS@
22

33
lib_LTLIBRARIES = libchimbuko.la
4-
libchimbuko_la_SOURCES = chimbuko.cpp param.cpp util/ADIOS2parseUtils.cpp util/RunStats.cpp util/DispatchQueue.cpp util/Anomalies.cpp util/map.cpp util/string.cpp util/error.cpp util/memutils.cpp util/time.cpp util/PerfStats.cpp util/environment.cpp ad/ADOutlier.cpp ad/ADParser.cpp ad/ADio.cpp ad/utils.cpp ad/ADEvent.cpp ad/AnomalyData.cpp ad/FuncStats.cpp ad/ExecData.cpp ad/ADCounter.cpp ad/ADAnomalyProvenance.cpp ad/ADNetClient.cpp ad/ADLocalFuncStatistics.cpp ad/ADLocalCounterStatistics.cpp ad/ADProvenanceDBengine.cpp ad/ADProvenanceDBclient.cpp ad/ADMetadataParser.cpp ad/ADglobalFunctionIndexMap.cpp ad/ADNormalEventProvenance.cpp ad/ADcombinedPSdata.cpp ad/FuncAnomalyMetrics.cpp ad/ADLocalAnomalyMetrics.cpp pserver/PSstatSender.cpp pserver/PSglobalFunctionIndexMap.cpp pserver/PSProvenanceDBclient.cpp pserver/AggregateAnomalyData.cpp pserver/AggregateFuncStats.cpp pserver/NetPayloadRecvCombinedADdata.cpp pserver/AggregateFuncAnomalyMetrics.cpp pserver/GlobalAnomalyStats.cpp pserver/GlobalCounterStats.cpp pserver/GlobalAnomalyMetrics.cpp message.cpp net/zmq_net.cpp net/mpi_net.cpp net/zmqme_net.cpp net/local_net.cpp net.cpp param/hbos_param.cpp param/copod_param.cpp param/sstd_param.cpp
4+
libchimbuko_la_SOURCES = chimbuko.cpp param.cpp util/ADIOS2parseUtils.cpp util/RunStats.cpp util/DispatchQueue.cpp util/Anomalies.cpp util/map.cpp util/string.cpp util/error.cpp util/memutils.cpp util/time.cpp util/PerfStats.cpp util/environment.cpp util/curlJsonSender.cpp ad/ADOutlier.cpp ad/ADParser.cpp ad/ADio.cpp ad/utils.cpp ad/ADEvent.cpp ad/AnomalyData.cpp ad/FuncStats.cpp ad/ExecData.cpp ad/ADCounter.cpp ad/ADAnomalyProvenance.cpp ad/ADNetClient.cpp ad/ADLocalFuncStatistics.cpp ad/ADLocalCounterStatistics.cpp ad/ADProvenanceDBengine.cpp ad/ADProvenanceDBclient.cpp ad/ADMetadataParser.cpp ad/ADglobalFunctionIndexMap.cpp ad/ADNormalEventProvenance.cpp ad/ADcombinedPSdata.cpp ad/FuncAnomalyMetrics.cpp ad/ADLocalAnomalyMetrics.cpp pserver/PSstatSender.cpp pserver/PSglobalFunctionIndexMap.cpp pserver/PSProvenanceDBclient.cpp pserver/AggregateAnomalyData.cpp pserver/AggregateFuncStats.cpp pserver/NetPayloadRecvCombinedADdata.cpp pserver/AggregateFuncAnomalyMetrics.cpp pserver/GlobalAnomalyStats.cpp pserver/GlobalCounterStats.cpp pserver/GlobalAnomalyMetrics.cpp message.cpp net/zmq_net.cpp net/mpi_net.cpp net/zmqme_net.cpp net/local_net.cpp net.cpp param/hbos_param.cpp param/copod_param.cpp param/sstd_param.cpp
55
libchimbuko_la_LDFLAGS = -version-info 3:0:0

src/ad/ADNetClient.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,12 @@ void ADThreadNetClient::run(bool local){
429429
while(nwork > 0){
430430
ClientAction* work_item = getWorkItem();
431431
//Ask if shutdown is to be done *before* calling perform as blocking actions unlock the parent thread in perform which destroys the ClientAction object making the pointer invalid!
432-
shutdown = shutdown || work_item->shutdown_worker();
432+
shutdown = shutdown || work_item->shutdown_worker();
433+
//Ask if need to delete before perform for same reasons as above
434+
bool do_delete = work_item->do_delete();
433435
work_item->perform(*client);
434436

435-
if(work_item->do_delete()) delete work_item;
437+
if(do_delete) delete work_item;
436438
nwork = getNwork();
437439
}
438440
if(shutdown){

src/pserver/PSstatSender.cpp

Lines changed: 7 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#include <chimbuko/pserver/PSstatSender.hpp>
2-
#include <curl/curl.h>
2+
#include <chimbuko/util/curlJsonSender.hpp>
33
#include <chimbuko/verbose.hpp>
44
#include <sstream>
55
#include <fstream>
6+
67
using namespace chimbuko;
78

89

@@ -17,29 +18,6 @@ PSstatSender::~PSstatSender()
1718
for(auto &payload : m_payloads) delete payload;
1819
}
1920

20-
21-
// holder for curl fetch
22-
struct curl_fetch_str {
23-
std::string m_payload;
24-
bool m_do_fetch;
25-
curl_fetch_str(bool do_fetch) : m_do_fetch(do_fetch)
26-
{
27-
28-
}
29-
};
30-
31-
static size_t _curl_writefunc(char *ptr, size_t size, size_t nmemb, void* userp)
32-
{
33-
// std::cout << "curl_writefunc: " << std::string(ptr) << std::endl;
34-
struct curl_fetch_str *p = (struct curl_fetch_str *) userp;
35-
36-
if (ptr && p->m_do_fetch) {
37-
p->m_payload = std::string(ptr);
38-
}
39-
40-
return size * nmemb;
41-
}
42-
4321
static void send_stat(std::string url, std::string stat_save_dir,
4422
std::atomic_bool& bStop,
4523
const std::vector<PSstatSenderPayloadBase*> &payloads,
@@ -49,29 +27,7 @@ static void send_stat(std::string url, std::string stat_save_dir,
4927
bool write_curl = url.size() > 0;
5028
bool write_disk = stat_save_dir.size() > 0;
5129

52-
CURL* curl = nullptr;
53-
struct curl_slist * headers = nullptr;
54-
CURLcode res;
55-
long httpCode(0);
56-
57-
//Initialize
58-
if(write_curl){
59-
curl_global_init(CURL_GLOBAL_ALL);
60-
61-
curl = curl_easy_init();
62-
if (curl == nullptr) throw std::runtime_error("Failed to initialize curl easy handler");
63-
64-
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
65-
// Dont bother trying IPv6, which would increase DNS resolution time
66-
curl_easy_setopt(curl, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4);
67-
// Don't wait forever, time out after 10 seconds
68-
//curl_easy_setopt(curl, CURLOPT_TIMEOUT, 10);
69-
// Follow HTTP redirects if necessary
70-
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
71-
// header
72-
headers = curl_slist_append(headers, "Content-Type: application/json");
73-
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
74-
}
30+
curlJsonSender *curl_sender = write_curl ? new curlJsonSender(url) : nullptr;
7531

7632
size_t iter = 0;
7733

@@ -100,22 +56,11 @@ static void send_stat(std::string url, std::string stat_save_dir,
10056
of << packet;
10157
}
10258
if(write_curl){ //Send data via curl
103-
curl_fetch_str fetch(do_fetch); //request callback if
104-
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, packet.c_str());
105-
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, packet.size());
106-
curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
107-
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &_curl_writefunc);
108-
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void*)&fetch);
109-
110-
res = curl_easy_perform(curl);
111-
if (res != CURLE_OK){
112-
std::cerr << "curl_easy_perform() failed: " << curl_easy_strerror(res) << std::endl;
113-
}
114-
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
115-
59+
std::string response;
60+
curl_sender->send(packet, do_fetch ? &response : nullptr);
11661
if(do_fetch){
11762
for(auto payload : payloads){
118-
if(payload->do_fetch()) payload->process_callback(packet, fetch.m_payload);
63+
if(payload->do_fetch()) payload->process_callback(packet, response);
11964
}
12065
}
12166
}
@@ -124,12 +69,7 @@ static void send_stat(std::string url, std::string stat_save_dir,
12469
++iter;
12570
}
12671

127-
128-
if (curl){
129-
if (headers) curl_slist_free_all(headers);
130-
curl_easy_cleanup(curl);
131-
curl_global_cleanup();
132-
}
72+
if(curl_sender) delete curl_sender;
13373
}catch(const std::exception &exc){
13474
std::cerr << "PSstatSender caught exception:" << exc.what() << std::endl;
13575
bad = true;

0 commit comments

Comments
 (0)