@@ -1134,11 +1134,12 @@ class GrpcServer(object):
11341134
11351135 _DEFAULT_SHUTDOWN_TIMEOUT_SECS = 5
11361136
1137- def __init__ (self , state , provision_info ):
1137+ def __init__ (self , state , provision_info , max_workers ):
11381138 self .state = state
11391139 self .provision_info = provision_info
1140+ self .max_workers = max_workers
11401141 self .control_server = grpc .server (
1141- futures .ThreadPoolExecutor (max_workers = 10 ))
1142+ futures .ThreadPoolExecutor (max_workers = self . max_workers ))
11421143 self .control_port = self .control_server .add_insecure_port ('[::]:0' )
11431144 self .control_address = 'localhost:%s' % self .control_port
11441145
@@ -1148,12 +1149,12 @@ def __init__(self, state, provision_info):
11481149 no_max_message_sizes = [("grpc.max_receive_message_length" , - 1 ),
11491150 ("grpc.max_send_message_length" , - 1 )]
11501151 self .data_server = grpc .server (
1151- futures .ThreadPoolExecutor (max_workers = 10 ),
1152+ futures .ThreadPoolExecutor (max_workers = self . max_workers ),
11521153 options = no_max_message_sizes )
11531154 self .data_port = self .data_server .add_insecure_port ('[::]:0' )
11541155
11551156 self .state_server = grpc .server (
1156- futures .ThreadPoolExecutor (max_workers = 10 ),
1157+ futures .ThreadPoolExecutor (max_workers = self . max_workers ),
11571158 options = no_max_message_sizes )
11581159 self .state_port = self .state_server .add_insecure_port ('[::]:0' )
11591160
@@ -1297,6 +1298,11 @@ def stop_worker(self):
12971298 self .worker_thread .join ()
12981299
12991300
1301+ # The subprocess module is not threadsafe on Python 2.7. Use this lock to
1302+ # prevent concurrent calls to Popen().
1303+ SUBPROCESS_LOCK = threading .Lock ()
1304+
1305+
13001306@WorkerHandler .register_environment (python_urns .SUBPROCESS_SDK , bytes )
13011307class SubprocessSdkWorkerHandler (GrpcWorkerHandler ):
13021308 def __init__ (self , worker_command_line , state , provision_info , grpc_server ):
@@ -1326,48 +1332,52 @@ def __init__(self, payload, state, provision_info, grpc_server):
13261332 self ._container_id = None
13271333
13281334 def start_worker (self ):
1329- try :
1330- subprocess .check_call (['docker' , 'pull' , self ._container_image ])
1331- except Exception :
1332- logging .info ('Unable to pull image %s' % self ._container_image )
1333- self ._container_id = subprocess .check_output (
1334- ['docker' ,
1335- 'run' ,
1336- '-d' ,
1337- # TODO: credentials
1338- '--network=host' ,
1339- self ._container_image ,
1340- '--id=%s' % uuid .uuid4 (),
1341- '--logging_endpoint=%s' % self .logging_api_service_descriptor ().url ,
1342- '--control_endpoint=%s' % self .control_address ,
1343- '--artifact_endpoint=%s' % self .control_address ,
1344- '--provision_endpoint=%s' % self .control_address ,
1345- ]).strip ()
1346- while True :
1347- logging .info ('Waiting for docker to start up...' )
1348- status = subprocess .check_output ([
1349- 'docker' ,
1350- 'inspect' ,
1351- '-f' ,
1352- '{{.State.Status}}' ,
1353- self ._container_id ]).strip ()
1354- if status == 'running' :
1355- break
1356- elif status in ('dead' , 'exited' ):
1357- subprocess .call ([
1335+ with SUBPROCESS_LOCK :
1336+ try :
1337+ subprocess .check_call (['docker' , 'pull' , self ._container_image ])
1338+ except Exception :
1339+ logging .info ('Unable to pull image %s' % self ._container_image )
1340+ self ._container_id = subprocess .check_output (
1341+ ['docker' ,
1342+ 'run' ,
1343+ '-d' ,
1344+ # TODO: credentials
1345+ '--network=host' ,
1346+ self ._container_image ,
1347+ '--id=%s' % self .worker_id ,
1348+ '--logging_endpoint=%s' % self .logging_api_service_descriptor ().url ,
1349+ '--control_endpoint=%s' % self .control_address ,
1350+ '--artifact_endpoint=%s' % self .control_address ,
1351+ '--provision_endpoint=%s' % self .control_address ,
1352+ ]).strip ()
1353+ while True :
1354+ logging .info ('Waiting for docker to start up...' )
1355+ status = subprocess .check_output ([
13581356 'docker' ,
1359- 'container' ,
1360- 'logs' ,
1361- self ._container_id ])
1362- raise RuntimeError ('SDK failed to start.' )
1357+ 'inspect' ,
1358+ '-f' ,
1359+ '{{.State.Status}}' ,
1360+ self ._container_id ]).strip ()
1361+ if status == 'running' :
1362+ logging .info ('Docker container is running. container_id = %s, '
1363+ 'worker_id = %s' , self ._container_id , self .worker_id )
1364+ break
1365+ elif status in ('dead' , 'exited' ):
1366+ subprocess .call ([
1367+ 'docker' ,
1368+ 'container' ,
1369+ 'logs' ,
1370+ self ._container_id ])
1371+ raise RuntimeError ('SDK failed to start.' )
13631372 time .sleep (1 )
13641373
13651374 def stop_worker (self ):
13661375 if self ._container_id :
1367- subprocess .call ([
1368- 'docker' ,
1369- 'kill' ,
1370- self ._container_id ])
1376+ with SUBPROCESS_LOCK :
1377+ subprocess .call ([
1378+ 'docker' ,
1379+ 'kill' ,
1380+ self ._container_id ])
13711381
13721382
13731383class WorkerHandlerManager (object ):
@@ -1383,10 +1393,25 @@ def get_worker_handlers(self, environment_id, num_workers):
13831393 # Any environment will do, pick one arbitrarily.
13841394 environment_id = next (iter (self ._environments .keys ()))
13851395 environment = self ._environments [environment_id ]
1386- # assume it's using grpc if environment is not EMBEDDED_PYTHON.
1387- if environment .urn != python_urns .EMBEDDED_PYTHON and \
1388- self ._grpc_server is None :
1389- self ._grpc_server = GrpcServer (self ._state , self ._job_provision_info )
1396+ max_total_workers = num_workers * len (self ._environments )
1397+
1398+ # assume all environments except EMBEDDED_PYTHON use gRPC.
1399+ if environment .urn == python_urns .EMBEDDED_PYTHON :
1400+ pass # no need for a gRPC server
1401+ elif self ._grpc_server is None :
1402+ self ._grpc_server = GrpcServer (self ._state , self ._job_provision_info ,
1403+ max_total_workers )
1404+ elif max_total_workers > self ._grpc_server .max_workers :
1405+ # Each gRPC server runs with a fixed number of threads (
1406+ # max_total_workers), which is defined by the first call to
1407+ # get_worker_handlers(). The assumption here is that each worker holds a
1408+ # connection to a gRPC server. If a stage tries to add more workers
1409+ # than max_total_workers, some workers cannot connect to gRPC and the
1410+ # pipeline will hang, hence raise an error here.
1411+ raise RuntimeError ('gRPC servers are running with %s threads, we cannot '
1412+ 'attach %s workers.' % (self ._grpc_server .max_workers ,
1413+ max_total_workers ))
1414+
13901415 worker_handler_list = self ._cached_handlers [environment_id ]
13911416 if len (worker_handler_list ) < num_workers :
13921417 for _ in range (len (worker_handler_list ), num_workers ):
0 commit comments