Skip to content

Commit 07669b0

Browse files
author
Nathan Lee
committed
Refactoring
1 parent 7ef45a3 commit 07669b0

4 files changed

Lines changed: 52 additions & 64 deletions

File tree

pyrunner/core/engine.py

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ def initiate(self, **kwargs):
7272
self.start_time = time.time()
7373
wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] >= 1 else 0
7474
last_save = 0
75-
ab_code = 0
7675

7776
if not self.register: raise RuntimeError('NodeRegister has not been initialized!')
7877

@@ -147,7 +146,6 @@ def initiate(self, **kwargs):
147146
print('\nKeyboard Interrupt Received')
148147
print('\nCancelling Execution')
149148
self._abort_all_workers()
150-
self.save_state_func(False, True)
151149
return -1
152150

153151
# App lifecycle - SUCCESS
@@ -164,7 +162,7 @@ def initiate(self, **kwargs):
164162
self._on_destroy_func()
165163

166164
if not kwargs.get('silent'):
167-
self._print_final_state(ab_code)
165+
self._print_final_state()
168166

169167
if not self.config['test_mode'] and self.save_state_func:
170168
self.save_state_func()
@@ -177,6 +175,8 @@ def _abort_all_workers(self):
177175
self.register.running_nodes.remove(node)
178176
self.register.aborted_nodes.add(node)
179177
self.register.set_children_defaulted(node)
178+
self.save_state_func(False, True)
179+
self._print_final_state(True)
180180

181181
def _print_current_state(self):
182182
elapsed = time.time() - self.start_time
@@ -205,39 +205,42 @@ def _print_current_state(self):
205205

206206
return
207207

208-
def _print_final_state(self, ab_code=0):
208+
def _print_final_state(self, aborted=False):
209209
print('\nCompleted in {:0.2f} seconds\n'.format(time.time() - self.start_time))
210210

211-
if ab_code > 0:
211+
if aborted:
212212
print('Final Status: ABORTED\n')
213+
print('Aborted Processes:\n')
214+
215+
for n in self.register.aborted_nodes:
216+
self._print_node_info(n, self.config['dump_logs'])
217+
213218
elif len(self.register.failed_nodes) + len(self.register.defaulted_nodes):
214219
print('Final Status: FAILURE\n')
215220
print('Failed Processes:\n')
216221

217222
for n in self.register.failed_nodes:
218-
print('ID: {}'.format(n.id))
219-
print('Name: {}'.format(n.name))
220-
print('Module: {}'.format(n.module))
221-
print('Worker: {}'.format(n.worker))
222-
print('Arguments: {}'.format(n.arguments))
223-
print('Log File: {}\n'.format(n.logfile))
224-
225-
if self.config['dump_logs']:
226-
print('DUMPING FAILURE LOGS\n')
227-
228-
for n in self.register.failed_nodes:
229-
print('############################################################################')
230-
print('# ID: {}'.format(n.id))
231-
print('# Name: {}'.format(n.name))
232-
print('# Module: {}'.format(n.module))
233-
print('# Worker: {}'.format(n.worker))
234-
print('# Arguments: {}'.format(n.arguments))
235-
print('# Log File: {}'.format(n.logfile))
236-
with open(n.logfile, 'r') as f:
237-
for line in f:
238-
print(line, end='')
223+
self._print_node_info(n, self.config['dump_logs'])
239224

240225
else:
241226
print('Final Status: SUCCESS\n')
242227

243-
return
228+
return
229+
230+
def _print_node_info(self, n, dump_logs=False):
231+
if dump_logs:
232+
print('############################################################################')
233+
234+
print('# ID: {}'.format(n.id))
235+
print('# Name: {}'.format(n.name))
236+
print('# Module: {}'.format(n.module))
237+
print('# Worker: {}'.format(n.worker))
238+
print('# Arguments: {}'.format(n.arguments))
239+
print('# Log File: {}'.format(n.logfile))
240+
241+
if dump_logs:
242+
with open(n.logfile, 'r') as f:
243+
for line in f:
244+
print(line, end='')
245+
246+
print('')

pyrunner/core/node.py

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,13 @@ def __init__(self, id=-1, name=None):
4646
self._attempts = 0
4747
self._max_attempts = 1
4848
self._retry_wait_time = 0
49-
self._must_wait = False
50-
self._wait_start = 0
49+
self._wait_until = 0
5150
self._start_time = 0
5251
self._end_time = 0
5352
self._timeout = float('inf')
5453
self._retcode = 0
5554
self._proc_retcode = None
56-
self._thread = None
55+
self._proc = None
5756
self._context = None
5857
self._module = None
5958
self._worker = None
@@ -81,23 +80,13 @@ def execute(self):
8180
8281
Workers are given references to the shared Context, main-proc <-> child-proc return code value,
8382
logfile handle, and task-level arguments.
84-
85-
The Node supports two variations of Workers:
86-
1. Implicit: To support older Worker strategy, a generic Class can be provided, which is then
87-
extended/subclassed via the Node's `generate_worker` method. This simple generates
88-
a class that extends the generic class, wrapping the expected `run` method with all
89-
standard Worker implementations.
90-
2. Explicit: The preferred way of accepting Worker implementations. An extension of the Worker
91-
abstract class is provided to the Node such that the provided Worker implementation
92-
can be directly initialized and executed.
9383
"""
9484
# Return early if retry triggered and wait time has not yet fully elapsed
95-
if self._must_wait and (time.time() - self._wait_start) < self._retry_wait_time:
85+
if time.time() < self._wait_until:
9686
return
9787

9888
self._proc_retcode = multiprocessing.Value('i', 0)
9989
self._proc_retcode.value = 0
100-
self._must_wait = False
10190
self._attempts += 1
10291

10392
if not self._start_time:
@@ -115,8 +104,8 @@ def execute(self):
115104
raise TypeError('{}.{} is not an extension of pyrunner.Worker'.format(self.module, self.worker))
116105

117106
# Launch the "run" method of the provided Worker under a new process.
118-
self._thread = multiprocessing.Process(target=worker.protected_run, daemon=False)
119-
self._thread.start()
107+
self._proc = multiprocessing.Process(target=worker.protected_run, daemon=False)
108+
self._proc.start()
120109
except Exception as e:
121110
logger = lg.FileLogger(self.logfile)
122111
logger.open()
@@ -137,36 +126,35 @@ def poll(self, wait=False):
137126
Returns:
138127
Integer return code if process has exited, otherwise `None`.
139128
"""
140-
if not self._thread:
129+
if not self._proc:
141130
self.retcode = 905
142131
return self.retcode
143132

144-
running = self._thread.is_alive()
133+
running = self._proc.is_alive()
145134

146135
if not running or wait:
147136
# Note that if wait is True, then the join() method is invoked immediately,
148137
# causing the thread to block until it's job is complete.
149-
self._thread.join()
138+
self._proc.join()
150139
self._end_time = time.time()
151140
self.retcode = self._proc_retcode.value
152-
if self._proc_retcode.value > 0:
153-
if self._attempts < self.max_attempts:
154-
logger = lg.FileLogger(self.logfile)
155-
logger.open(False)
156-
logger.info('Waiting {} seconds before retrying...'.format(self._retry_wait_time))
157-
self._must_wait = True
158-
self._wait_start = time.time()
159-
logger.restart_message(self._attempts)
160-
logger.close()
161-
self.retcode = -1
141+
if self.retcode > 0 and (self._attempts < self.max_attempts):
142+
logger = lg.FileLogger(self.logfile)
143+
logger.open(False)
144+
logger.info('Waiting {} seconds before retrying...'.format(self._retry_wait_time))
145+
self._wait_until = time.time() + self._retry_wait_time
146+
logger.restart_message(self._attempts)
147+
logger.close()
148+
self.retcode = -1
162149
self.cleanup()
163150
elif (time.time() - self._start_time) >= self._timeout:
164-
self._thread.terminate()
151+
self._proc.terminate()
165152
running = False
166153
logger = lg.FileLogger(self.logfile)
167154
logger.open(False)
168155
logger.error('Worker runtime has exceeded the set maximum/timeout of {} seconds.'.format(self._timeout))
169156
logger.close()
157+
self.cleanup()
170158
self.retcode = 906
171159

172160
return self.retcode if (not running or wait) else None
@@ -175,18 +163,18 @@ def terminate(self):
175163
"""
176164
Immediately terminates the Worker, if running.
177165
"""
178-
if self._thread.is_alive():
179-
self._thread.terminate()
166+
if self._proc.is_alive():
167+
self._proc.terminate()
180168
logger = lg.FileLogger(self.logfile)
181169
logger.open(False)
182170
logger._system_("Keyboard Interrupt (SIGINT) received. Terminating all Worker and exiting.")
183171
logger.close()
184-
self.cleanup()
185172
self.retcode = 907
173+
self.cleanup()
186174
return
187175

188176
def cleanup(self):
189-
self._thread = None
177+
self._proc = None
190178
self._proc_retcode = None
191179
self._context = None
192180

pyrunner/logger/file.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ def close(self):
6969
self.logfile_handle.write("# LOG END - {}\n".format(datetime.now()))
7070
self.logfile_handle.write("############################################################################\n\n")
7171
self.logfile_handle.close()
72-
self.logfile_handle = None
7372
return
7473

7574
def get_file_handle(self):

pyrunner/worker/abstract.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ def protected_run(self):
104104
self.logger.error(traceback.format_exc())
105105
self.retcode = 904
106106

107-
if sys.stdout: sys.stdout.close()
108-
if sys.stderr: sys.stderr.close()
109107
self.logger.close()
110108
self.logger = None
111109

0 commit comments

Comments
 (0)