Commit 7b5eea6

Merge pull request #10 from Comcast/v5

V5

2 parents: bc6d688 + d40deb3

35 files changed: 935 additions & 440 deletions
.gitignore

Lines changed: 1 addition & 0 deletions

```diff
@@ -6,3 +6,4 @@ dist
 .eggs
 *.egg-info
 MANIFEST
+build
```

README.md

Lines changed: 5 additions & 1 deletion

```diff
@@ -1,4 +1,8 @@
 # python-batch-runner
+[![Documentation Status](https://readthedocs.org/projects/python-batch-runner/badge/?version=latest)](https://python-batch-runner.readthedocs.io/en/latest/?badge=latest)
+
+For more complete documentation, please see: https://python-batch-runner.readthedocs.io/
+
 python-batch-runner is a microframework to assist with building small to medium scale batch applications without needing to build the scaffolding from scratch.
 
 It provides the following with zero or minimal setup required:
@@ -92,4 +96,4 @@ Upon above setup, the following three files will be generated:
 Please read the CONTRIBUTING file for more details.
 
 ## License
-python-batch-runner is released under the Apache 2.0 License. Please read the LICENSE file for more details.
+python-batch-runner is released under the Apache 2.0 License. Please read the LICENSE file for more details.
```

docs/css/overrides.css

Whitespace-only changes.

docs/design/config.md

Lines changed: 38 additions & 0 deletions

# Config

Centralized configuration system. The Config class emulates container objects, so key-value pairs can be accessed much as they are in dictionaries.

```python
In [1]: from pyrunner import PyRunner

In [2]: pr = PyRunner()

In [3]: pr.config['app_name'] = 'MyNewApp'

In [4]: pr.config['app_name']
Out[4]: 'MyNewApp'
```

Config attributes are typed, and each may draw its value from multiple sources. The value of a given attribute is taken from the first non-None source, checked in the following order:

1. Manually assigned value
2. Environment variable
3. Hard-coded default

```python
In [1]: pr.config['app_name']
Out[1]: 'PyrunnerApp_5aa9a44c-a252-4505-89f9-b2e741bc1262'

In [2]: os.environ['APP_NAME'] = 'MyNewApp'

In [3]: pr.config['app_name']
Out[3]: 'MyNewApp'

In [4]: pr.config['app_name'] = 'HelloWorldApp'

In [5]: pr.config['app_name']
Out[5]: 'HelloWorldApp'

In [6]: del pr.config['app_name']

In [7]: pr.config['app_name']
Out[7]: 'MyNewApp'
```
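The first-non-None resolution order described above can be sketched roughly as follows. This is a minimal illustration of the idea only, not PyRunner's actual implementation; the `ConfigAttribute` class and its parameters are hypothetical:

```python
import os

class ConfigAttribute:
    """One typed config attribute resolved from several sources.

    Resolution order: manually assigned value, then an environment
    variable, then a hard-coded default.
    """
    def __init__(self, env_var, default, type_=str):
        self.env_var = env_var
        self.default = default
        self.type_ = type_
        self.assigned = None

    @property
    def value(self):
        # Return the first source that is not None, cast to the attribute's type.
        for candidate in (self.assigned, os.environ.get(self.env_var), self.default):
            if candidate is not None:
                return self.type_(candidate)
        return None

attr = ConfigAttribute('APP_NAME', default='DefaultApp')
print(attr.value)            # default (assuming APP_NAME is not already set)
os.environ['APP_NAME'] = 'MyNewApp'
print(attr.value)            # environment variable wins over the default
attr.assigned = 'HelloWorldApp'
print(attr.value)            # manual assignment wins over everything
attr.assigned = None         # emulates `del pr.config[...]`
print(attr.value)            # falls back to the environment variable
```

Deleting a manually assigned value therefore does not clear the attribute; it simply re-exposes the next source in the chain, which matches the IPython session above.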

docs/design/index.md

Lines changed: 2 additions & 0 deletions

# Design Overview

![overview](../img/pyrunner_app_code.svg)

docs/draw.io/pyrunner_flow_diagram

Lines changed: 1 addition & 0 deletions
<mxfile host="Electron" modified="2020-02-23T15:43:49.231Z" agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/12.6.5 Chrome/80.0.3987.86 Electron/8.0.0 Safari/537.36" version="12.6.5" etag="WK6qA54_3CSCaVWJjH_s" type="device"><diagram id="bdc2347d-bb37-9d5b-0eaa-ac196fc5e478" name="Page-1">7V1bc6M6Ev41rtp9SErc8WOO15k9VTOZ1OTU7s6+TBFbtrUHIxaw4+yvXwmEDUjYJAbExZ6a2AghpO7+WlKr1Zpos+3hS+D4m294Cd2JCpaHifa3iaqqwDTJF015Zym2aSQp6wAtkzRwSnhB/4NJopKm7tAShiwtSYowdiPk5xMX2PPgIsqlOUGA3/LZVthd5hJ8Zw1z1aAJLwvHhVy2f6JltElSbSOT++8QrTfpm3XA7rw6iz/XAd557H0TVVvFn+T21knLYvnDjbPEb5kkbT7RZgHGUfJre5hBlxI3T7bHkrvHegfQi6o8oDG+hNF72na4JKRglx72yNdvcYMgfQSQq020dclPhfyEBxT9iybfG+zqZ3rHi4L3zC16+ZMV8B8YRe+M6c4uwiQJB9EGr7HnuF8x9lkZK+xFLJuisusZdnEQV1QD8Yek841mdAjxLliwZqlMjJxgDVkuLUmiDc48xgj1BeItJJUmGQLoOhHa52XDYSK2PuY7kZn8YJQWU53VZe+4O5jKielGrIk5dpj/3eH0xl0YE+OBZFBU/3C6SX6t6fdL0l5SOd//5Qd4hUgprGRSp6TwJCvH9jyP3zYogi++E1PvjaA8z/cwCvCfUMSLqjxzXLT2SNqC8AwGNCNy3UxGhpozzN3DIIKHs4xjd7VU+zB1dKcoBkt5O6FbSTG5ySDbBNdzWzPHgjGNx5gNZIFMawhk84NPKEluz76RR8FX5FHEPQd4jyj/VPAPJwhLUZckvwZpyl/QirbTe/9r5xDZCPKA1SbyrLEgz+CRZ8oCnsEB75kggoFkAcMwRk1IIfQYd1AjkHu7TbEnIGtI7qXIdu2CzB59xijuBxjPdLXAM0vR82Uk6GKPZQfVF0tSp/mSkuZxJcWMPbaoEq9NDmy/7ZBLu6H5AS52EcIe+R3P00YBNGvaItB0wNH0QzjrQi+SmxBN24KameeaphS4UQKQyyWRSXol0H4CapZgQFmKKEUKopZOuDm+vgZ4pQaPk0rk0WVaAnTpwCgXisrTZB5eQxir2TzwFGmzpGMFBz9EFpFdkUV2m9MlAj64LvJDeFmROKGfGEFX6ECZ1NG+WgUFXW2mVtOMNtEEykSroatW9YbkXIosp/oiJ8zyzJmjMSIL6a7Lontam4wW4YckQ9MiiqXktYhhtKhFBPQdqKQLpgeKKk3SUzE5Sbo2eEk/LhzIkPSU1xmKC5YxBk/xNkcoo1kmUgTrROnEUoJu4ReKBOOZgUk634u2Ken2aCRd75Sk65ykC2ZFQ5N0RaakT0cj6YIVSEXaEqR2pWG+53S3pNF9NNZEId1taZqdX3MXLAEPTbPrBc2utzk/4hdeBbp+6BTX7BYpzq+/CcaRA6O4LZPggkUKQb86MIqrQCLJtXKnxSXa08JZi09uiS5cRbyT4tHRME0ocb0CP6AfwJCQy0lcRYr+iRlHxldBWlyrfOqVFe1QFZif6MphnX764IyMAxCRNxU8wTe+PGVize7oH/r/giOoqE111aPu8gBpDhWjDzUJfCdDPmt2f38v+HuO1aKiP1lxtRovriTzFdJymaJDkJ1PNEdiyztCuA8DriQr+OMNX8LhRWf1K9uidRmH6ihwaN6AKB2ImwBe7BKbhqJOXldB4uWwThsFFI0bEiUj8ZG8R
zYQjS4Dsdtj046IY9fLa67Hrw+I1BYuGYgmeV2FjucGxBsQhwvEF3SQjUOLvE6fqLMKw6MbFvsorAPXQvVhEe6hJxuNNnmddUNij+Wx6+X1oFecx4t8kpE4vSHxhsSxI/EJXV5NbBqICrghcbiiOnAdVN8qxuWx6QlxylBdIvoj1wnjwdytMquo7JBRxSWn4C8WwQNNPxcHw4w/lIqFmIAG/TcR+H1R9y60cNwHdmOLlku3zCct735Mm3WVV/AHgtXYhWA1JuDDaWhA533LjluSrnIu00udy8rk93KIPFscIo8LPXSVYH1QhJibeVZ6WFJPBecYaOgoOIIoR+pU4JSo1uCUqPNOiU+Y9wPtnZ9/nI/VWBA4VxTfkrl4Z53+dXUiyenf4p3+f8LwLGNO9JqfUivwKcOlDGOOt45ciu91k09plKKaY1s9BIHznsng00BTYabkQsAq1Si4F6chmMsiXBUfsBWjICRJFT4b1SqlVUaKHjzaou+Bv3GoBn/CcSzuR06ygg3evu7CEm15rodfmq+mYX5ABs5HuuJjB6uLRU19tq7fF3ptVRTL0dR45WvXoHyN87vCxYBe7IL9kTh9iWmTKtIsaA1pe2XT2mRg8buHIuRE1Gs+O8CJ6cFB4yOhFXOyTC+enYhIuZfoO9Dl6IuFeAmK1mL0RYPXXA31f8qktP87Dlq61P+l1G0WSuL+SrenOZkwCjOXksiOH+1HjcJWNPN8L1rMfqfW24sa50MaNKOmO7cptjUZK0T9LM6O64v5afBj7Gcc52DzWrDC1JrzEjnRjtc9g4iwy0Uhak/Fm5Wmnh0Y4ZyDYj3629TlYEvRGwNX2swMd3/sPA9569usozjrUFucc6SxgXpt3akGNF0ANGlRS0zeQPsFejBI5hz3i8h1MYXGcE9NKFE9bXQ11vkzEz5rOesBBkweA5Y8DPBxNf6N/HiaTUXf8ZYcFEYx7BJ2AI1hQRBcs7/Bkk1B5G+z9pMOKgs4H1XjBcZS/YQjtEILFogBzLcOcscg2nqroq33RLTP9RPVxF6wCDKtfRJRWex5Y933+JiaRK/PXOh4uyvtqP0Qd0Ow6N+cuPNTrDlVNrWS+dU2aOitK8hcnErZC1jbVKpAfstuk/y8sm9qibZnS+mWYNRpSoufZ/Ha6YKLw0hWEoRskjZ2svjJwcPz86/5t4ffv07ojpnoZjBK9Zwm6maashilYeoax0/f1JwtWomTBR+bt6a37zDUSTUnYpOtSuOTwC4OQ1I3aqZ/dJ31eLWcVnTGmYrOe2xMzfEG2vbVXCfxo3ZJzfGjBAl+kZ3sjQRssuXxSeX4VFzjAPMDCqkbCECC2KR/wK0/VlWoaIZcVfgZh5eO671qGOrSUeu2wD2FnbU+/OVChUNAq6esT8+vkvdyglMNASLDsrQDbG3edPOSDJVrlXhbfdXMq3qGQmewNKC91GsaFxdPQVeOKa0gge/Ib9azLCo+tQ+F8fYO3E9BGhe+2b0pVlq3oxhd2pxSfOLs5pTT02mF8GoVwmu9yKbtWTV6ZnwSiZ80o8aUN2pwZxLE3h3FUb7j+7/8AK+SCQFz+ADk2T2i3OLz09sPwXrEzoOclURpdc/SlF/1ulmDY8II3A2n0g7rUgAPydsGmlJG1b+BtDKeeLNjskXz0SHaUKACE9/pxGiS3atWzJfsYRitmiwumRltGlCm54+G7KVGrIYtq1PY4k8V+wG3eE/HGrF9cUQ+plabnklKes5W9ngxgUllYMeLaWphjmW2eBytAnjvxyGoFAWovE6RdxhnCqPh758REv6IbBmk501Rd40qlU7bBdXi3k3TbFPZnA9d8TkYXHKNlgIDTTBd0NpytSaXAabht072Mzqs/0bmADTH/wE=</diagram></mxfile>

docs/example.md

Lines changed: 157 additions & 0 deletions

Traditionally, batch processes/applications are commonly built as top-down scripts.

It is not uncommon to see even long-running batch processes implemented as long top-down Shell/Python scripts, or as a series of individual scripts executed in a specific order.

In the event of a failure, such processes often need to be resumed from the specific point of failure rather than restarted from the beginning. To support this, state management must either be implemented by hand or delegated to a larger orchestration tool.
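A hand-rolled version of such state management often looks something like the sketch below. This is purely illustrative (the checkpoint path and step ids are made up); it is the kind of scaffolding PyRunner is meant to replace:

```python
import json
import os

CHECKPOINT = '/tmp/my_batch.checkpoint'  # hypothetical checkpoint file

def load_done():
    # Read the set of step ids that completed in a prior run, if any.
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT) as f:
            return set(json.load(f))
    return set()

def run_step(step_id, fn, done):
    # Skip steps that already succeeded; persist progress after each success.
    if step_id in done:
        return
    fn()
    done.add(step_id)
    with open(CHECKPOINT, 'w') as f:
        json.dump(sorted(done), f)

done = load_done()
run_step('download_1', lambda: print('downloading file 1'), done)
run_step('download_2', lambda: print('downloading file 2'), done)
```

On a rerun after a crash, completed steps are skipped, but every script accumulates its own copy of this bookkeeping, and it still does not handle retries, logging, or parallelism.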
# Sample Python Script

Let's take a very simple (and very much contrived) example:
```python
import urllib.request
import pandas as pd
from datetime import datetime

def emit_slack_message(message):
    # Assume implementation
    pass

def load_to_mysql(df):
    # Assume implementation
    pass

def write_as_parquet(df, path):
    # Assume implementation
    pass

# Broadcast job start notification
print('Starting example batch process')
run_date_str = datetime.now().strftime('%Y-%m-%d')
emit_slack_message('Starting daily data download for {}'.format(run_date_str))

# Download data file 1
print('Initiating File Download 1')
url1 = 'https://some_file_url_here_1'
urllib.request.urlretrieve(url1, '/landing/data1.csv')

# Download data file 2
print('Initiating File Download 2')
url2 = 'https://some_file_url_here_2'
urllib.request.urlretrieve(url2, '/landing/data2.csv')

# Load file 1 to MySQL
print('Loading MySQL table')
df1 = pd.read_csv('/landing/data1.csv')
load_to_mysql(df1)

# Write file 2 as parquet
print('Writing Parquet file')
df2 = pd.read_csv('/landing/data2.csv')
write_as_parquet(df2, '/data/extract_{}.parquet'.format(run_date_str))

# Broadcast job end notification
print('Completed example batch process')
emit_slack_message('Successfully completed daily data download for {}'.format(run_date_str))
```
As with any script, all steps execute serially, and upon failure a restart will run every step again from the top.

# Worker Classes

We can instead convert this into a PyRunner application with minor changes by separating each logical step into Worker classes:
```python
# <app_root_dir>/python/workers.py

import urllib.request
import pandas as pd
from datetime import datetime
from pyrunner import Worker

def emit_slack_message(message):
    # Assume implementation
    pass

def load_to_mysql(df):
    # Assume implementation
    pass

def write_as_parquet(df, path):
    # Assume implementation
    pass

# Broadcast job start notification
class Start(Worker):
    def run(self):
        # The print function also works, but we can take advantage of
        # advanced features with the provided logger.
        self.logger.info('Starting example batch process')

        run_date_str = datetime.now().strftime('%Y-%m-%d')
        emit_slack_message('Starting daily data download for {}'.format(run_date_str))

        # self.context is a special thread-safe shared dictionary,
        # which can be read or modified from any Worker.
        self.context['run_date_str'] = run_date_str

# Download data file 1
class DownloadFile1(Worker):
    def run(self):
        self.logger.info('Initiating File Download 1')
        url = 'https://some_file_url_here_1'
        urllib.request.urlretrieve(url, '/landing/data1.csv')

# Download data file 2
class DownloadFile2(Worker):
    def run(self):
        self.logger.info('Initiating File Download 2')
        url = 'https://some_file_url_here_2'
        urllib.request.urlretrieve(url, '/landing/data2.csv')

# Load file 1 to MySQL
class LoadMySQL(Worker):
    def run(self):
        self.logger.info('Loading MySQL table')
        df = pd.read_csv('/landing/data1.csv')
        load_to_mysql(df)

# Write file 2 as parquet
class WriteParquet(Worker):
    def run(self):
        self.logger.info('Writing Parquet file')

        # Once again, we access the shared dictionary, this time to
        # read the value originally set by the Start Worker.
        run_date_str = self.context['run_date_str']
        df = pd.read_csv('/landing/data2.csv')
        write_as_parquet(df, '/data/extract_{}.parquet'.format(run_date_str))

# Broadcast job end notification
class End(Worker):
    def run(self):
        self.logger.info('Completed example batch process')
        run_date_str = self.context['run_date_str']
        emit_slack_message('Successfully completed daily data download for {}'.format(run_date_str))
```
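The comments above describe `self.context` as a thread-safe shared dictionary. Conceptually, that is a dict whose reads and writes are guarded by a lock; a minimal sketch of such a structure (an illustration of the idea only, not PyRunner's actual implementation):

```python
import threading

class SharedContext:
    """A dict-like store whose reads and writes are guarded by a lock,
    so concurrently running Workers can safely share state."""
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def __setitem__(self, key, value):
        with self._lock:
            self._data[key] = value

    def __getitem__(self, key):
        with self._lock:
            return self._data[key]

ctx = SharedContext()
ctx['run_date_str'] = '2020-02-23'  # set in one Worker...
print(ctx['run_date_str'])          # ...read in another
```

This is what lets the `Start` Worker publish `run_date_str` and the `WriteParquet` and `End` Workers read it, even when Workers run in parallel.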
# Task List File

To specify the order of execution of the Workers above, we express it in a `.lst` file (to be placed at `<app_root_dir>/config/my_sample_app.lst`) like:

```bash
#PYTHON
#ID|PARENT_IDS|MAX_ATTEMPTS|RETRY_WAIT_TIME|PROCESS_NAME |MODULE_NAME|WORKER_NAME |ARGUMENTS|LOGFILE
1 |-1 |1 |0 |Start Job Notification|workers |Start | |$ENV{APP_LOG_DIR}/start.log
2 |1 |1 |0 |Download File 1 |workers |DownloadFile1| |$ENV{APP_LOG_DIR}/download_1.log
3 |1 |1 |0 |Download File 2 |workers |DownloadFile2| |$ENV{APP_LOG_DIR}/download_2.log
4 |2 |1 |0 |Load MySQL Table |workers |LoadMySQL | |$ENV{APP_LOG_DIR}/load_mysql.log
5 |3 |1 |0 |Write Parquet File |workers |WriteParquet | |$ENV{APP_LOG_DIR}/write_parquet.log
6 |4,5 |1 |0 |End Job Notification |workers |End | |$ENV{APP_LOG_DIR}/end.log
```

A visualization of the above `.lst` file:

![sample_flow](../img/sample_flow.svg)

With the above two pieces, we now have the necessary code to run the original script as a PyRunner application.
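The pipe-delimited format above is straightforward to parse into task records with dependency lists. A rough sketch of such a reader (illustrative only, not PyRunner's actual loader; the field names mirror the header comment in the `.lst` file):

```python
def parse_lst(text):
    """Parse a PyRunner-style .lst task file into a list of task dicts."""
    fields = ['id', 'parent_ids', 'max_attempts', 'retry_wait_time',
              'process_name', 'module_name', 'worker_name', 'arguments', 'logfile']
    tasks = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # skip the #PYTHON mode line and the header comment
        parts = [p.strip() for p in line.split('|')]
        task = dict(zip(fields, parts))
        task['id'] = int(task['id'])
        # A parent id of -1 means the task has no dependencies.
        task['parent_ids'] = [int(p) for p in task['parent_ids'].split(',') if int(p) > 0]
        tasks.append(task)
    return tasks

sample = """#PYTHON
1 |-1 |1|0|Start|workers|Start||/logs/start.log
2 |1  |1|0|Next |workers|Next ||/logs/next.log"""

for t in parse_lst(sample):
    print(t['id'], t['parent_ids'], t['worker_name'])
```

A task becomes runnable once every id in its `parent_ids` has completed, which is exactly the dependency graph the visualization above depicts.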

docs/img/execution_graph.png

116 KB

docs/img/project_structure.png

14.7 KB
