-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
109 lines (74 loc) · 3.54 KB
/
server.py
File metadata and controls
109 lines (74 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import sched
import threading
import time
import requests
from requests.compat import urljoin
from flask import Flask, request, abort, Response, jsonify
from flask_cors import CORS
from models import DownloadSpec, DownloadRequest
from sources_manager import SourcesManager
# How long a download request id stays routable before its registry entry is
# scheduled for removal.
REQUEST_KEEPING_TIMEOUT_SEC = 60 * 60 * 12 # 12 hours
app = Flask(__name__)
# Disable strict trailing-slash handling on the URL map, so routes are meant
# to match with or without a trailing slash.
app.url_map.strict_slashes = False
# Apply flask-cors to the whole application.
CORS(app)
# In-memory registry: download request id -> (agent_host, agent_port) of the
# agent that accepted the request. Written by the /download handler and read
# when forwarding follow-up calls; it is not persisted anywhere in this file.
request_id_to_agent = {}
# Shared scheduler used to expire entries of request_id_to_agent.
scheduler = sched.scheduler(time.time, time.sleep)
# Resolves a download spec to an agent key and an agent key to its host/port.
sources_manager = SourcesManager()
def _build_url(host, port, path, protocol='http'):
return urljoin(f'{protocol}://{host}:{port}', path)
def _build_new_response_from_agent_response(agent_response):
    """Convert an agent's ``requests`` response into a Flask ``Response``.

    The body and status code are copied as-is. Headers are copied except for
    those that only describe the agent-to-proxy connection or the encoding of
    the body on that connection.

    Args:
        agent_response: The response object returned by ``requests`` for the
            call made to the agent.

    Returns:
        A Flask ``Response`` carrying the agent's body, status and the
        remaining headers.
    """
    # ``agent_response.content`` is the body after ``requests`` has undone any
    # gzip/deflate and chunked transfer coding, so relaying the agent's
    # Content-Encoding / Transfer-Encoding would mislabel the payload.
    # Connection-level headers belong to the agent hop only, and
    # Content-Length is left for the new response object to set for the
    # body it actually sends.
    non_forwardable = frozenset((
        'connection',
        'keep-alive',
        'transfer-encoding',
        'content-encoding',
        'content-length',
    ))
    forwarded_headers = {
        name: value
        for name, value in agent_response.headers.items()
        if name.lower() not in non_forwardable
    }
    return Response(response=agent_response.content,
                    status=agent_response.status_code,
                    headers=forwarded_headers)
def _forward_request_by_id(request_id):
    """Forward the current Flask request to the agent that owns ``request_id``.

    The agent is looked up in ``request_id_to_agent``; the incoming request's
    method, path, headers and raw body are replayed against it.

    Args:
        request_id: Identifier of a previously submitted download request.

    Returns:
        A Flask ``Response`` built from the agent's reply.

    Raises:
        werkzeug.exceptions.NotFound: via ``abort(404)`` when the id is not
            (or no longer) registered.
    """
    # One lookup instead of "membership test, then index": entries are removed
    # from another thread when they expire, so the id could vanish between the
    # two steps and surface as a KeyError (HTTP 500) instead of a 404.
    agent = request_id_to_agent.get(request_id)
    if agent is None:
        abort(404)
    agent_host, agent_port = agent
    agent_url = _build_url(agent_host, agent_port, request.path)
    agent_response = requests.request(request.method, agent_url, headers=request.headers, data=request.data)
    return _build_new_response_from_agent_response(agent_response)
def _delete_request(request_id):
    """Remove ``request_id`` from the request-to-agent registry.

    Removing an id that is not registered is a no-op, so a deletion that was
    scheduled more than once for the same id does not raise ``KeyError`` in
    the scheduler thread.

    Args:
        request_id: Identifier of the download request to forget.
    """
    request_id_to_agent.pop(request_id, None)
def _schedule_request_deletion(request_id):
    """Queue the removal of ``request_id`` and run the shared scheduler.

    The removal is queued ``REQUEST_KEEPING_TIMEOUT_SEC`` seconds ahead.
    ``scheduler.run()`` blocks the calling thread until the scheduler's queue
    is empty, so this is intended to be called from a background thread.

    Args:
        request_id: Identifier of the download request to expire.
    """
    scheduler.enter(
        delay=REQUEST_KEEPING_TIMEOUT_SEC,
        priority=1,
        action=_delete_request,
        argument=(request_id,),
    )
    scheduler.run()
def _get_agent_host_port_by_spec(spec: DownloadSpec):
    """Resolve the host and port of the agent that can serve ``spec``.

    Args:
        spec: The download spec taken from the incoming request.

    Returns:
        Whatever ``sources_manager.get_host_port_for_agent`` returns for the
        matching agent key; callers unpack it as ``(host, port)``.

    Raises:
        werkzeug.exceptions.BadRequest: via ``abort(400)`` when ``spec`` is
            falsy, or when the agent-key lookup raises ``ValueError``.
    """
    # Validate the input up front so the try block only wraps the one call
    # whose ValueError is translated into a 400.
    if not spec:
        abort(400, 'Download spec must be specified.')
    try:
        agent_key = sources_manager.get_agent_key_for_spec(spec)
    except ValueError:
        abort(400, 'No agent can satisfy the given spec. Head over to /sources to list supported sources.')
    return sources_manager.get_host_port_for_agent(agent_key)
@app.route('/search', methods=['POST'])
def search():
    """Proxy a search request to the agent that serves the requested spec.

    The JSON body is parsed into a ``DownloadSpec`` only to pick the agent;
    the original method, path, headers and raw body are then replayed
    against that agent and its reply is returned to the caller.
    """
    requested_spec = DownloadSpec.from_dict(request.json)
    host, port = _get_agent_host_port_by_spec(requested_spec)
    upstream_reply = requests.request(
        request.method,
        _build_url(host, port, request.path),
        headers=request.headers,
        data=request.data,
    )
    return _build_new_response_from_agent_response(upstream_reply)
@app.route('/download', methods=['POST'])
def submit_download_request():
    """Submit a download request to the matching agent and remember its owner.

    The JSON body is parsed into a ``DownloadRequest`` to pick the agent by
    its spec, then the original request is replayed against that agent. When
    the agent accepts it, the id from the agent's reply is registered in
    ``request_id_to_agent`` so follow-up calls can be routed, and a background
    thread is started to expire the entry.

    Returns:
        A Flask ``Response`` built from the agent's reply, whether it is a
        success or an error.
    """
    dl_req = DownloadRequest.from_dict(request.json)
    agent_host, agent_port = _get_agent_host_port_by_spec(dl_req.spec)
    agent_url = _build_url(agent_host, agent_port, request.path)
    agent_response = requests.request(request.method, agent_url, headers=request.headers, data=request.data)

    # Only a successful reply is expected to describe a created request. An
    # error reply is relayed untouched instead of being parsed, which would
    # otherwise turn the agent's own error into an unrelated 500 here.
    if agent_response.ok:
        returned_dl_req = DownloadRequest.from_dict(agent_response.json())
        request_id_to_agent[returned_dl_req.id] = (agent_host, agent_port)

        # Daemon thread: the expiry job sleeps for the whole retention period
        # and must not keep the interpreter alive on shutdown. The registry
        # is in-memory, so nothing is lost by dropping pending expirations.
        job_thread = threading.Thread(target=_schedule_request_deletion,
                                      args=(returned_dl_req.id,),
                                      daemon=True)
        job_thread.start()

    return _build_new_response_from_agent_response(agent_response)
@app.route('/download/<string:request_id>', methods=['GET'])
def get_download_request(request_id):
    """Relay a download-request lookup to the agent registered for the id."""
    agent_reply = _forward_request_by_id(request_id)
    return agent_reply
@app.route('/download/<string:request_id>/files', methods=['GET'])
def get_download_files(request_id):
    """Relay a files request to the owning agent.

    The agent's reply is returned with ``Access-Control-Expose-Headers`` set
    to ``Content-Disposition``, which lets cross-origin browser code read
    that header.
    """
    files_reply = _forward_request_by_id(request_id)
    files_reply.headers['Access-Control-Expose-Headers'] = 'Content-Disposition'
    return files_reply
@app.route('/sources', methods=['GET'])
def get_supported_sources():
    """Return the sources reported by the sources manager as JSON."""
    supported = sources_manager.get_supported_sources()
    return jsonify(supported)
if __name__ == '__main__':
    # Direct-execution entry point: serve on port 8080, bound to all
    # network interfaces.
    app.run(port=8080, host='0.0.0.0')