Skip to content

Commit cff8175

Browse files
author
Nathan Lee
committed
Fixed bug due to strange interaction between multiprocessing.Value and Workers using requests module.
1 parent 07669b0 commit cff8175

5 files changed

Lines changed: 39 additions & 46 deletions

File tree

pyrunner/core/engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def initiate(self, **kwargs):
119119
if p.id >= 0 and p not in self.register.completed_nodes.union(self.register.norun_nodes):
120120
runnable = False
121121
break
122-
if runnable:
122+
if runnable and node.is_runnable():
123123
self.register.pending_nodes.remove(node)
124124
node.context = self.context
125125
node.execute()

pyrunner/core/node.py

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,22 @@ def __init__(self, id=-1, name=None):
4343
self.name = name
4444

4545
self._id = int(id)
46+
47+
# Num attempts/restart management
4648
self._attempts = 0
4749
self._max_attempts = 1
4850
self._retry_wait_time = 0
4951
self._wait_until = 0
52+
5053
self._start_time = 0
5154
self._end_time = 0
5255
self._timeout = float('inf')
53-
self._retcode = 0
54-
self._proc_retcode = None
5556
self._proc = None
5657
self._context = None
58+
5759
self._module = None
5860
self._worker = None
61+
self._worker_instance = None
5962

6063
self._parent_nodes = set()
6164
self._child_nodes = set()
@@ -71,6 +74,9 @@ def __ne__(self, other):
7174
def __lt__(self, other):
7275
return self._id < other._id
7376

77+
def is_runnable(self):
78+
return time.time() >= self._wait_until
79+
7480
def execute(self):
7581
"""
7682
Spawns a new process via the `run` method of defined Worker class.
@@ -82,29 +88,22 @@ def execute(self):
8288
logfile handle, and task-level arguments.
8389
"""
8490
# Return early if retry triggered and wait time has not yet fully elapsed
85-
if time.time() < self._wait_until:
91+
if not self.is_runnable():
8692
return
8793

88-
self._proc_retcode = multiprocessing.Value('i', 0)
89-
self._proc_retcode.value = 0
9094
self._attempts += 1
9195

9296
if not self._start_time:
9397
self._start_time = time.time()
9498

9599
try:
96-
worker_class = getattr(importlib.import_module(self.module), self.worker)
97-
98100
# Check if provided worker actually extends the Worker class.
99-
if issubclass(worker_class, Worker):
100-
worker = worker_class(self.context, self._proc_retcode, self.logfile, self.argv)
101-
# If it does not extend the Worker class, initialize a reverse-Worker in which the
102-
# worker extends the provided class.
103-
else:
101+
if not issubclass(self.worker_class, Worker):
104102
raise TypeError('{}.{} is not an extension of pyrunner.Worker'.format(self.module, self.worker))
105103

106104
# Launch the "run" method of the provided Worker under a new process.
107-
self._proc = multiprocessing.Process(target=worker.protected_run, daemon=False)
105+
self._worker_instance = self.worker_class(self.context, self.logfile, self.argv)
106+
self._proc = multiprocessing.Process(target=self._worker_instance.protected_run, daemon=False)
108107
self._proc.start()
109108
except Exception as e:
110109
logger = lg.FileLogger(self.logfile)
@@ -127,25 +126,24 @@ def poll(self, wait=False):
127126
Integer return code if process has exited, otherwise `None`.
128127
"""
129128
if not self._proc:
130-
self.retcode = 905
131-
return self.retcode
129+
return 905
132130

133131
running = self._proc.is_alive()
132+
retcode = 0
134133

135134
if not running or wait:
136135
# Note that if wait is True, then the join() method is invoked immediately,
137136
# causing the thread to block until its job is complete.
138137
self._proc.join()
139138
self._end_time = time.time()
140-
self.retcode = self._proc_retcode.value
141-
if self.retcode > 0 and (self._attempts < self.max_attempts):
139+
retcode = self._worker_instance.retcode
140+
if retcode > 0 and (self._attempts < self.max_attempts):
142141
logger = lg.FileLogger(self.logfile)
143142
logger.open(False)
144-
logger.info('Waiting {} seconds before retrying...'.format(self._retry_wait_time))
145143
self._wait_until = time.time() + self._retry_wait_time
146-
logger.restart_message(self._attempts)
147-
logger.close()
148-
self.retcode = -1
144+
logger.restart_message(self._attempts, 'Waiting {} seconds before retrying...'.format(self._retry_wait_time))
145+
logger.close(False)
146+
retcode = -1
149147
self.cleanup()
150148
elif (time.time() - self._start_time) >= self._timeout:
151149
self._proc.terminate()
@@ -155,9 +153,9 @@ def poll(self, wait=False):
155153
logger.error('Worker runtime has exceeded the set maximum/timeout of {} seconds.'.format(self._timeout))
156154
logger.close()
157155
self.cleanup()
158-
self.retcode = 906
156+
retcode = 906
159157

160-
return self.retcode if (not running or wait) else None
158+
return retcode if (not running or wait) else None
161159

162160
def terminate(self):
163161
"""
@@ -167,15 +165,13 @@ def terminate(self):
167165
self._proc.terminate()
168166
logger = lg.FileLogger(self.logfile)
169167
logger.open(False)
170-
logger._system_("Keyboard Interrupt (SIGINT) received. Terminating all Worker and exiting.")
168+
logger._system_("Keyboard Interrupt (SIGINT) received. Terminating Worker and exiting.")
171169
logger.close()
172-
self.retcode = 907
173170
self.cleanup()
174-
return
171+
return 907
175172

176173
def cleanup(self):
177174
self._proc = None
178-
self._proc_retcode = None
179175
self._context = None
180176

181177

@@ -264,16 +260,6 @@ def name(self, value):
264260
self._name = str(value).strip()
265261
return self
266262

267-
@property
268-
def retcode(self):
269-
return self._retcode
270-
@retcode.setter
271-
def retcode(self, value):
272-
if int(value) < -2:
273-
raise ValueError('retcode must be -2 or greater - received: {}'.format(value))
274-
self._retcode = int(value)
275-
return self
276-
277263
@property
278264
def context(self):
279265
return getattr(self, '_context', None)

pyrunner/logger/abstract.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def _system_(self, text):
6767
self._emit_('SYSTEM', text)
6868

6969
@abstractmethod
70-
def restart_message(self, restart_count):
70+
def restart_message(self, restart_count, extra_text):
7171
"""
7272
Write a RESTART attempt indication message.
7373
"""

pyrunner/logger/file.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,27 @@ def _emit_(self, level, text):
5050
self.logfile_handle.flush()
5151
return
5252

53-
def restart_message(self, restart_count):
53+
def restart_message(self, restart_count, extra_text=None):
5454
"""
5555
Write a RESTART attempt indication message.
5656
"""
5757
self.logfile_handle.write("\n############################################################################\n")
5858
self.logfile_handle.write("# RESTART ATTEMPT {} - {}\n".format(restart_count, datetime.now()))
59+
if extra_text:
60+
self.logfile_handle.write("# {}\n".format(extra_text))
5961
self.logfile_handle.write("############################################################################\n\n")
6062
self.logfile_handle.flush()
6163
return
6264

63-
def close(self):
65+
def close(self, close_message=True):
6466
"""
6567
Close stream for target log file.
6668
"""
6769
if self.logfile_handle:
68-
self.logfile_handle.write("\n############################################################################\n")
69-
self.logfile_handle.write("# LOG END - {}\n".format(datetime.now()))
70-
self.logfile_handle.write("############################################################################\n\n")
70+
if close_message:
71+
self.logfile_handle.write("\n############################################################################\n")
72+
self.logfile_handle.write("# LOG END - {}\n".format(datetime.now()))
73+
self.logfile_handle.write("############################################################################\n\n")
7174
self.logfile_handle.close()
7275
return
7376

pyrunner/worker/abstract.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# SPDX-License-Identifier: Apache-2.0
1616

1717
import traceback, sys
18+
import multiprocessing.sharedctypes
1819

1920
import pyrunner.logger.file as lg
2021

@@ -32,14 +33,17 @@ class Worker(ABC):
3233
- on_exit()
3334
"""
3435

35-
def __init__(self, context, retcode, logfile, argv):
36+
def __init__(self, context, logfile, argv):
3637
self.context = context
37-
self._retcode = retcode
38+
self._retcode = multiprocessing.sharedctypes.Value('i', 0)
3839
self.logfile = logfile
3940
self.logger = None
4041
self.argv = argv
4142
return
4243

44+
def cleanup(self):
45+
self._retcode = None
46+
4347
# The _retcode is a multiprocessing.sharedctypes.Value (shared between processes) and requires special handling.
4448
@property
4549
def retcode(self):

0 commit comments

Comments
 (0)