11import type { Worker } from 'node:worker_threads' ;
22import type { Subscription } from 'rxjs' ;
3- import type { WorkerFactory , WorkerTask } from './types.js' ;
3+ import type {
4+ TaskCallback ,
5+ WorkerFactory ,
6+ WorkerResultInternal ,
7+ WorkerTask ,
8+ WorkerTaskInput ,
9+ } from './types.js' ;
410import { Subject } from 'rxjs' ;
11+ import { WorkerFunction , WorkerResult } from './types.js' ;
12+ import * as errors from './errors.js' ;
513
614const taskInfoSymbol = Symbol ( 'Task Info Symbol' ) ;
7- type TaskCallback = ( result : any | undefined , error : Error | undefined ) => void ;
815type PoolStatus = 'idle' | 'working' | 'queued' ;
916
1017class WorkerPool {
@@ -23,7 +30,7 @@ class WorkerPool {
2330 public poolStatus : PoolStatus = 'idle' ;
2431
2532 constructor ( workerNum : number , workerFactory : WorkerFactory ) {
26- if ( workerNum < 1 ) throw Error ( 'TMP IMP must be at least 1 worker' ) ;
33+ if ( workerNum < 1 ) throw new errors . ErrorWorkerPoolInvalidWorkers ( ) ;
2734 this . workerFactory = workerFactory ;
2835 for ( let i = 0 ; i < workerNum ; i ++ ) {
2936 this . addWorker ( ) ;
@@ -50,9 +57,9 @@ class WorkerPool {
5057
5158 protected addWorker ( ) {
5259 const worker = this . workerFactory ( ) ;
53- const messageHandler = ( result : { result ?: unknown ; error ?: Error } ) => {
60+ const messageHandler = ( result : WorkerResultInternal ) => {
5461 if ( result . error != null ) worker [ taskInfoSymbol ] ( undefined , result . error ) ;
55- else worker [ taskInfoSymbol ] ( result . result ) ;
62+ else worker [ taskInfoSymbol ] ( result . data , undefined ) ;
5663 worker [ taskInfoSymbol ] = undefined ;
5764 this . freeWorkers . push ( worker ) ;
5865 this . $workerFreed . next ( ) ;
@@ -83,7 +90,7 @@ class WorkerPool {
8390 this . $workerFreed . next ( ) ;
8491 }
8592
86- public runTask ( task : any , callback : any ) {
93+ public runTask ( task : WorkerTaskInput , callback : TaskCallback ) {
8794 if ( this . terminating ) throw Error ( 'TMP IMP terminating' ) ;
8895 if ( this . freeWorkers . length === 0 ) {
8996 this . queue . push ( { task, callback } ) ;
@@ -95,33 +102,13 @@ class WorkerPool {
95102 const worker = this . freeWorkers . pop ( ) ! ;
96103 if ( wasIdle ) this . $poolStatus . next ( 'working' ) ;
97104 worker [ taskInfoSymbol ] = callback ;
98- worker . postMessage ( task ) ;
105+ worker . postMessage ( { type : task . type , data : task . data } , task . transferList ) ;
99106 }
100107
101108 public async terminate ( force : boolean ) {
102109 this . terminating = true ;
103- if ( ! force ) {
104- // Prevent new tasks and wait for exising queue to drain
105- await new Promise < void > ( ( resolve ) => {
106- if (
107- this . freeWorkers . length === this . workers . size &&
108- this . queue . length === 0
109- ) {
110- return resolve ( ) ;
111- }
112- const subscription = this . $workerFreed . subscribe ( ( ) => {
113- // Wait for the queue to drain and the workers to free
114- if (
115- this . freeWorkers . length === this . workers . size &&
116- this . queue . length === 0
117- ) {
118- subscription . unsubscribe ( ) ;
119- resolve ( ) ;
120- }
121- } ) ;
122- } ) ;
123- }
124-
110+ // Prevent new tasks and wait for exising queue to drain
111+ if ( ! force ) await this . settled ( ) ;
125112 // Prevent terminations from creating new workers
126113 this . handleDestroySubscription . unsubscribe ( ) ;
127114 const workerTerminatePs : Array < Promise < number > > = [ ] ;
0 commit comments