Skip to content

Commit 092f170

Browse files
committed
cherrypick for 2.15
1 parent 031cb09 commit 092f170

2 files changed

Lines changed: 74 additions & 50 deletions

File tree

sdks/python/apache_beam/runners/portability/fn_api_runner.py

Lines changed: 69 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,11 +1134,12 @@ class GrpcServer(object):
11341134

11351135
_DEFAULT_SHUTDOWN_TIMEOUT_SECS = 5
11361136

1137-
def __init__(self, state, provision_info):
1137+
def __init__(self, state, provision_info, max_workers):
11381138
self.state = state
11391139
self.provision_info = provision_info
1140+
self.max_workers = max_workers
11401141
self.control_server = grpc.server(
1141-
futures.ThreadPoolExecutor(max_workers=10))
1142+
futures.ThreadPoolExecutor(max_workers=self.max_workers))
11421143
self.control_port = self.control_server.add_insecure_port('[::]:0')
11431144
self.control_address = 'localhost:%s' % self.control_port
11441145

@@ -1148,12 +1149,12 @@ def __init__(self, state, provision_info):
11481149
no_max_message_sizes = [("grpc.max_receive_message_length", -1),
11491150
("grpc.max_send_message_length", -1)]
11501151
self.data_server = grpc.server(
1151-
futures.ThreadPoolExecutor(max_workers=10),
1152+
futures.ThreadPoolExecutor(max_workers=self.max_workers),
11521153
options=no_max_message_sizes)
11531154
self.data_port = self.data_server.add_insecure_port('[::]:0')
11541155

11551156
self.state_server = grpc.server(
1156-
futures.ThreadPoolExecutor(max_workers=10),
1157+
futures.ThreadPoolExecutor(max_workers=self.max_workers),
11571158
options=no_max_message_sizes)
11581159
self.state_port = self.state_server.add_insecure_port('[::]:0')
11591160

@@ -1297,6 +1298,11 @@ def stop_worker(self):
12971298
self.worker_thread.join()
12981299

12991300

1301+
# The subprocess module is not threadsafe on Python 2.7. Use this lock to
1302+
# prevent concurrent calls to Popen().
1303+
SUBPROCESS_LOCK = threading.Lock()
1304+
1305+
13001306
@WorkerHandler.register_environment(python_urns.SUBPROCESS_SDK, bytes)
13011307
class SubprocessSdkWorkerHandler(GrpcWorkerHandler):
13021308
def __init__(self, worker_command_line, state, provision_info, grpc_server):
@@ -1326,48 +1332,50 @@ def __init__(self, payload, state, provision_info, grpc_server):
13261332
self._container_id = None
13271333

13281334
def start_worker(self):
1329-
try:
1330-
subprocess.check_call(['docker', 'pull', self._container_image])
1331-
except Exception:
1332-
logging.info('Unable to pull image %s' % self._container_image)
1333-
self._container_id = subprocess.check_output(
1334-
['docker',
1335-
'run',
1336-
'-d',
1337-
# TODO: credentials
1338-
'--network=host',
1339-
self._container_image,
1340-
'--id=%s' % uuid.uuid4(),
1341-
'--logging_endpoint=%s' % self.logging_api_service_descriptor().url,
1342-
'--control_endpoint=%s' % self.control_address,
1343-
'--artifact_endpoint=%s' % self.control_address,
1344-
'--provision_endpoint=%s' % self.control_address,
1345-
]).strip()
1346-
while True:
1347-
logging.info('Waiting for docker to start up...')
1348-
status = subprocess.check_output([
1349-
'docker',
1350-
'inspect',
1351-
'-f',
1352-
'{{.State.Status}}',
1353-
self._container_id]).strip()
1354-
if status == 'running':
1355-
break
1356-
elif status in ('dead', 'exited'):
1357-
subprocess.call([
1335+
with SUBPROCESS_LOCK:
1336+
try:
1337+
subprocess.check_call(['docker', 'pull', self._container_image])
1338+
except Exception:
1339+
logging.info('Unable to pull image %s' % self._container_image)
1340+
self._container_id = subprocess.check_output(
1341+
['docker',
1342+
'run',
1343+
'-d',
1344+
# TODO: credentials
1345+
'--network=host',
1346+
self._container_image,
1347+
'--id=%s' % uuid.uuid4(),
1348+
'--logging_endpoint=%s' % self.logging_api_service_descriptor().url,
1349+
'--control_endpoint=%s' % self.control_address,
1350+
'--artifact_endpoint=%s' % self.control_address,
1351+
'--provision_endpoint=%s' % self.control_address,
1352+
]).strip()
1353+
while True:
1354+
logging.info('Waiting for docker to start up...')
1355+
status = subprocess.check_output([
13581356
'docker',
1359-
'container',
1360-
'logs',
1361-
self._container_id])
1362-
raise RuntimeError('SDK failed to start.')
1363-
time.sleep(1)
1357+
'inspect',
1358+
'-f',
1359+
'{{.State.Status}}',
1360+
self._container_id]).strip()
1361+
if status == 'running':
1362+
break
1363+
elif status in ('dead', 'exited'):
1364+
subprocess.call([
1365+
'docker',
1366+
'container',
1367+
'logs',
1368+
self._container_id])
1369+
raise RuntimeError('SDK failed to start.')
1370+
time.sleep(1)
13641371

13651372
def stop_worker(self):
1366-
if self._container_id:
1367-
subprocess.call([
1368-
'docker',
1369-
'kill',
1370-
self._container_id])
1373+
with SUBPROCESS_LOCK:
1374+
if self._container_id:
1375+
subprocess.call([
1376+
'docker',
1377+
'kill',
1378+
self._container_id])
13711379

13721380

13731381
class WorkerHandlerManager(object):
@@ -1384,9 +1392,24 @@ def get_worker_handlers(self, environment_id, num_workers):
13841392
environment_id = next(iter(self._environments.keys()))
13851393
environment = self._environments[environment_id]
13861394
# assume it's using grpc if environment is not EMBEDDED_PYTHON.
1387-
if environment.urn != python_urns.EMBEDDED_PYTHON and \
1388-
self._grpc_server is None:
1389-
self._grpc_server = GrpcServer(self._state, self._job_provision_info)
1395+
max_total_workers = num_workers * len(self._environments)
1396+
1397+
# assume all environments except EMBEDDED_PYTHON use gRPC.
1398+
if environment.urn == python_urns.EMBEDDED_PYTHON:
1399+
pass # no need for a gRPC server
1400+
elif self._grpc_server is None:
1401+
self._grpc_server = GrpcServer(self._state, self._job_provision_info,
1402+
max_total_workers)
1403+
elif max_total_workers > self._grpc_server.max_workers:
1404+
# Each gRPC server runs with a fixed number of threads (
1405+
# max_total_workers), which is defined by the first call to
1406+
# get_worker_handlers(). The assumption here is that each worker holds a
1407+
# connection to a gRPC server. If a stage tries to add more workers
1408+
# than max_total_workers, some workers cannot connect to gRPC and the
1409+
# pipeline will hang, hence we raise an error here.
1410+
raise RuntimeError('gRPC servers are running with %s threads, we cannot '
1411+
'attach %s workers.' % (self._grpc_server.max_workers,
1412+
max_total_workers))
13901413
worker_handler_list = self._cached_handlers[environment_id]
13911414
if len(worker_handler_list) < num_workers:
13921415
for _ in range(len(worker_handler_list), num_workers):

sdks/python/apache_beam/runners/portability/local_job_service.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,11 @@ def run(self):
200200
if self._worker_id:
201201
env_dict['WORKER_ID'] = self._worker_id
202202

203-
p = subprocess.Popen(
204-
self._worker_command_line,
205-
shell=True,
206-
env=env_dict)
203+
with fn_api_runner.SUBPROCESS_LOCK:
204+
p = subprocess.Popen(
205+
self._worker_command_line,
206+
shell=True,
207+
env=env_dict)
207208
try:
208209
p.wait()
209210
if p.returncode:

0 commit comments

Comments
 (0)