Skip to content

Commit 7ef45a3

Browse files
author
Nathan Lee
committed
Patched performance issue caused by multiprocessing.Value in Node being initialized at PyRunner startup, instead of at start of Node execution.
1 parent 01788d7 commit 7ef45a3

2 files changed

Lines changed: 24 additions & 14 deletions

File tree

pyrunner/core/node.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ def __init__(self, id=-1, name=None):
5151
self._start_time = 0
5252
self._end_time = 0
5353
self._timeout = float('inf')
54-
self._retcode = multiprocessing.Value('i', 0)
54+
self._retcode = 0
55+
self._proc_retcode = None
5556
self._thread = None
5657
self._context = None
5758
self._module = None
@@ -94,7 +95,8 @@ def execute(self):
9495
if self._must_wait and (time.time() - self._wait_start) < self._retry_wait_time:
9596
return
9697

97-
self._retcode.value = 0
98+
self._proc_retcode = multiprocessing.Value('i', 0)
99+
self._proc_retcode.value = 0
98100
self._must_wait = False
99101
self._attempts += 1
100102

@@ -106,7 +108,7 @@ def execute(self):
106108

107109
# Check if provided worker actually extends the Worker class.
108110
if issubclass(worker_class, Worker):
109-
worker = worker_class(self.context, self._retcode, self.logfile, self.argv)
111+
worker = worker_class(self.context, self._proc_retcode, self.logfile, self.argv)
110112
# If it does not extend the Worker class, initialize a reverse-Worker in which the
111113
# worker extends the provided class.
112114
else:
@@ -146,8 +148,8 @@ def poll(self, wait=False):
146148
# causing the thread to block until it's job is complete.
147149
self._thread.join()
148150
self._end_time = time.time()
149-
self._thread = None
150-
if self.retcode > 0:
151+
self.retcode = self._proc_retcode.value
152+
if self._proc_retcode.value > 0:
151153
if self._attempts < self.max_attempts:
152154
logger = lg.FileLogger(self.logfile)
153155
logger.open(False)
@@ -156,15 +158,16 @@ def poll(self, wait=False):
156158
self._wait_start = time.time()
157159
logger.restart_message(self._attempts)
158160
logger.close()
159-
self._retcode.value = -1
161+
self.retcode = -1
162+
self.cleanup()
160163
elif (time.time() - self._start_time) >= self._timeout:
161164
self._thread.terminate()
162165
running = False
163166
logger = lg.FileLogger(self.logfile)
164167
logger.open(False)
165168
logger.error('Worker runtime has exceeded the set maximum/timeout of {} seconds.'.format(self._timeout))
166169
logger.close()
167-
self._retcode.value = 906
170+
self.retcode = 906
168171

169172
return self.retcode if (not running or wait) else None
170173

@@ -178,9 +181,15 @@ def terminate(self):
178181
logger.open(False)
179182
logger._system_("Keyboard Interrupt (SIGINT) received. Terminating all Worker and exiting.")
180183
logger.close()
181-
self._retcode.value = 907
184+
self.cleanup()
185+
self.retcode = 907
182186
return
183187

188+
def cleanup(self):
189+
self._thread = None
190+
self._proc_retcode = None
191+
self._context = None
192+
184193

185194
# ########################## MISC ########################## #
186195

@@ -269,12 +278,12 @@ def name(self, value):
269278

270279
@property
271280
def retcode(self):
272-
return self._retcode.value
281+
return self._retcode
273282
@retcode.setter
274283
def retcode(self, value):
275284
if int(value) < -2:
276285
raise ValueError('retcode must be -2 or greater - received: {}'.format(value))
277-
self._retcode.value = int(value)
286+
self._retcode = int(value)
278287
return self
279288

280289
@property

pyrunner/worker/abstract.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __init__(self, context, retcode, logfile, argv):
3636
self.context = context
3737
self._retcode = retcode
3838
self.logfile = logfile
39-
self.logger = lg.FileLogger(logfile).open()
39+
self.logger = None
4040
self.argv = argv
4141
return
4242

@@ -57,9 +57,9 @@ def protected_run(self):
5757
methods, if defined.
5858
"""
5959

60-
if self.logfile:
61-
sys.stdout = open(self.logfile, 'a')
62-
sys.stderr = open(self.logfile, 'a')
60+
self.logger = lg.FileLogger(self.logfile).open()
61+
sys.stdout = self.logger.logfile_handle
62+
sys.stderr = self.logger.logfile_handle
6363

6464
# RUN
6565
try:
@@ -107,6 +107,7 @@ def protected_run(self):
107107
if sys.stdout: sys.stdout.close()
108108
if sys.stderr: sys.stderr.close()
109109
self.logger.close()
110+
self.logger = None
110111

111112
return
112113

0 commit comments

Comments
 (0)