Skip to content

Commit 45c31e6

Browse files
committed
add parallel execution of pipeline using Celery
1 parent 3d8dd92 commit 45c31e6

4 files changed

Lines changed: 39 additions & 29 deletions

File tree

backend/main.py

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,14 @@
1+
import shortuuid
12
from fastapi import FastAPI, UploadFile, File, Form
23
from fastapi.middleware.cors import CORSMiddleware
3-
import shortuuid
44

5-
from backend.file_utils.files import reset_dirs
6-
from backend.utils.docker_runner import run_pipeline
7-
from backend.utils.data_uploader import handle_upload
8-
from backend.utils.methods_handler import Methods
9-
from backend.utils.results_sender import send_folder_by_email
105
from backend.config.paths import (
11-
RESULTS_DIR,
126
DATASET_FILE,
137
METHODS_TS_FILE,
14-
get_thread_filepath,
15-
get_tmp_thread_files
168
)
9+
from backend.utils.data_uploader import handle_upload
10+
from backend.utils.methods_handler import Methods
11+
from backend.utils.task import process_and_cleanup_task
1712

1813
dataset_methods = Methods(data_filepath=DATASET_FILE, output_filepath=METHODS_TS_FILE)
1914

@@ -38,22 +33,13 @@ async def handle_submit(
3833

3934
handle_upload(task_uid, file, methods, dataset_methods)
4035

41-
run_pipeline(task_uid)
42-
43-
send_folder_by_email(
44-
email,
45-
get_thread_filepath(task_uid, RESULTS_DIR),
46-
experiment,
47-
file.filename,
48-
)
49-
50-
tmp_thread_dirs = get_tmp_thread_files(task_uid)
51-
reset_dirs(tmp_thread_dirs)
36+
process_and_cleanup_task.delay(task_uid, email, experiment, file.filename)
5237

5338
return {
39+
"task_uid": task_uid,
5440
"experiment": experiment,
5541
"filename": file.filename,
5642
"email": email,
5743
"methods": methods,
58-
"message": "Data successfully uploaded",
44+
"message": "Data uploaded, processing started",
5945
}

backend/utils/data_uploader.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77

88

99
def handle_upload(
10-
uid,
11-
file,
12-
methods: str,
13-
dataset_methods: Methods,
10+
uid,
11+
file,
12+
methods: str,
13+
dataset_methods: Methods,
1414
) -> None:
1515
selection_methods = json.loads(methods)
1616

backend/utils/results_sender.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ def require_env(name: str) -> str:
1515

1616

1717
def send_folder_by_email(
18-
to_email: str,
19-
folder_path: str,
20-
experiment_name: str,
21-
model_name: str,
18+
to_email: str,
19+
folder_path: str,
20+
experiment_name: str,
21+
model_name: str,
2222
):
2323
load_dotenv()
2424
from_email = require_env("EMAIL")

backend/utils/task.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
import os

from celery import Celery

from backend.config.paths import RESULTS_DIR, get_thread_filepath, get_tmp_thread_files
from backend.file_utils.files import reset_dirs
from backend.utils.docker_runner import run_pipeline
from backend.utils.results_sender import send_folder_by_email

# Broker / result-backend URLs are overridable via environment variables so
# deployments need not assume a local Redis; the defaults preserve the
# original hard-coded behavior.
_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379")
_BACKEND_URL = os.environ.get("CELERY_RESULT_BACKEND", _BROKER_URL)

celery_app = Celery("tasks", broker=_BROKER_URL, backend=_BACKEND_URL)


@celery_app.task
def process_and_cleanup_task(task_uid: str, email: str, experiment: str, filename: str) -> None:
    """Run the processing pipeline for *task_uid*, email the results folder,
    then remove the task's temporary files.

    Executed asynchronously by a Celery worker (queued via ``.delay`` from the
    upload endpoint).

    Args:
        task_uid: Unique id identifying this task's working directories.
        email: Recipient address for the results.
        experiment: Experiment name forwarded into the results email.
        filename: Original uploaded file name forwarded into the results email.
    """
    try:
        run_pipeline(task_uid)

        send_folder_by_email(
            email,
            get_thread_filepath(task_uid, RESULTS_DIR),
            experiment,
            filename,
        )
    finally:
        # Always reclaim the per-task temp directories, even when the pipeline
        # or the email step raises — otherwise every failed task leaks disk
        # space. NOTE(review): if failed runs should be kept on disk for
        # debugging, move this cleanup back out of the finally block.
        tmp_thread_dirs = get_tmp_thread_files(task_uid)
        reset_dirs(tmp_thread_dirs)

0 commit comments

Comments
 (0)