Skip to content

Commit b9b9221

Browse files
committed
cherrypick for 2.15
1 parent 092f170 commit b9b9221

1 file changed

Lines changed: 7 additions & 5 deletions

File tree

sdks/python/apache_beam/runners/portability/fn_api_runner.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,7 +1344,7 @@ def start_worker(self):
13441344
# TODO: credentials
13451345
'--network=host',
13461346
self._container_image,
1347-
'--id=%s' % uuid.uuid4(),
1347+
'--id=%s' % self.worker_id,
13481348
'--logging_endpoint=%s' % self.logging_api_service_descriptor().url,
13491349
'--control_endpoint=%s' % self.control_address,
13501350
'--artifact_endpoint=%s' % self.control_address,
@@ -1359,6 +1359,8 @@ def start_worker(self):
13591359
'{{.State.Status}}',
13601360
self._container_id]).strip()
13611361
if status == 'running':
1362+
logging.info('Docker container is running. container_id = %s, '
1363+
'worker_id = %s', self._container_id, self.worker_id)
13621364
break
13631365
elif status in ('dead', 'exited'):
13641366
subprocess.call([
@@ -1367,11 +1369,11 @@ def start_worker(self):
13671369
'logs',
13681370
self._container_id])
13691371
raise RuntimeError('SDK failed to start.')
1370-
time.sleep(1)
1372+
time.sleep(1)
13711373

13721374
def stop_worker(self):
1373-
with SUBPROCESS_LOCK:
1374-
if self._container_id:
1375+
if self._container_id:
1376+
with SUBPROCESS_LOCK:
13751377
subprocess.call([
13761378
'docker',
13771379
'kill',
@@ -1391,7 +1393,6 @@ def get_worker_handlers(self, environment_id, num_workers):
13911393
# Any environment will do, pick one arbitrarily.
13921394
environment_id = next(iter(self._environments.keys()))
13931395
environment = self._environments[environment_id]
1394-
# assume it's using grpc if environment is not EMBEDDED_PYTHON.
13951396
max_total_workers = num_workers * len(self._environments)
13961397

13971398
# assume all environments except EMBEDDED_PYTHON use gRPC.
@@ -1410,6 +1411,7 @@ def get_worker_handlers(self, environment_id, num_workers):
14101411
raise RuntimeError('gRPC servers are running with %s threads, we cannot '
14111412
'attach %s workers.' % (self._grpc_server.max_workers,
14121413
max_total_workers))
1414+
14131415
worker_handler_list = self._cached_handlers[environment_id]
14141416
if len(worker_handler_list) < num_workers:
14151417
for _ in range(len(worker_handler_list), num_workers):

0 commit comments

Comments
 (0)