1616 */
1717import { EventEmitter } from 'events' ;
1818
19- import { EventKey , EventListener , EventMap , TypedEmitter } from './utils/TypedEmitter.js' ;
19+ import { EventKey , EventMap , TypedEmitter } from './utils/TypedEmitter.js' ;
2020
2121/**
2222 * @internal
@@ -44,21 +44,19 @@ export class StreamablePromise<T, EM extends EventMap>
4444 } ) < EM >
4545 implements Promise < T >
4646{
47- private _promise : Promise < T > | null = null ;
48- private _promiseOns : [ string | symbol , EventListener < any [ ] > ] [ ] ;
47+ private _promise : Promise < T > ;
48+ protected _collectResults = true ;
4949
5050 /**
5151 * @internal
5252 */
5353 constructor ( promisefyFn : PromisifyFunc < T , EM > ) {
5454 super ( ) ;
5555
56- this . _promiseOns = [ ] ;
5756 this . _promise = new Promise ( ( resolve , reject ) => {
5857 promisefyFn (
5958 {
6059 on : < T extends EventKey < EM > > ( eventName : T , listener : EM [ T ] ) => {
61- this . _promiseOns . push ( [ eventName , listener ] ) ;
6260 void super . on ( eventName , listener ) ;
6361 } ,
6462 } ,
@@ -68,42 +66,30 @@ export class StreamablePromise<T, EM extends EventMap>
6866 } ) ;
6967 }
7068
71- private get promise ( ) : Promise < T > {
72- if ( ! this . _promise ) {
73- throw new Error ( 'Cannot await a promise that is already registered for events' ) ;
74- }
75- return this . _promise ;
76- }
77-
78- private _depromisify ( ) {
79- this . _promiseOns . forEach ( ( e ) => void this . off ( ...( e as [ never , never ] ) ) ) ;
80- this . _promise = null ;
81- }
82-
8369 then < TResult1 = T , TResult2 = never > (
8470 onfulfilled ?: ( ( value : T ) => TResult1 | PromiseLike < TResult1 > ) | undefined | null ,
8571 onrejected ?: ( ( reason : any ) => TResult2 | PromiseLike < TResult2 > ) | undefined | null
8672 ) : Promise < TResult1 | TResult2 > {
87- return this . promise . then < TResult1 , TResult2 > ( onfulfilled , onrejected ) ;
73+ return this . _promise . then < TResult1 , TResult2 > ( onfulfilled , onrejected ) ;
8874 }
8975
9076 catch < TResult = never > (
9177 onrejected ?: ( ( reason : any ) => TResult | PromiseLike < TResult > ) | undefined | null
9278 ) : Promise < T | TResult > {
93- return this . promise . catch < TResult > ( onrejected ) ;
79+ return this . _promise . catch < TResult > ( onrejected ) ;
9480 }
9581
9682 finally ( onfinally ?: ( ( ) => void ) | undefined | null ) : Promise < T > {
97- return this . promise . finally ( onfinally ) ;
83+ return this . _promise . finally ( onfinally ) ;
9884 }
9985
10086 override addListener < T extends EventKey < EM > > ( eventName : T , listener : EM [ T ] ) : this {
101- this . _depromisify ( ) ;
87+ this . _collectResults = false ;
10288 return super . on ( eventName , listener ) ;
10389 }
10490
10591 override on < T extends EventKey < EM > > ( eventName : T , listener : EM [ T ] ) : this {
106- this . _depromisify ( ) ;
92+ this . _collectResults = false ;
10793 return super . on ( eventName , listener ) ;
10894 }
10995
@@ -135,7 +121,9 @@ export class StreamableRowPromise<T, TRow, TMeta> extends StreamablePromise<
135121 const rows : TRow [ ] = [ ] ;
136122 let meta : TMeta | undefined ;
137123
138- void emitter . on ( 'row' , ( r ) => rows . push ( r ) ) ;
124+ void emitter . on ( 'row' , ( r ) => {
125+ if ( this . _collectResults ) rows . push ( r ) ;
126+ } ) ;
139127 void emitter . on ( 'meta' , ( m ) => ( meta = m ) ) ;
140128 void emitter . on ( 'error' , ( e ) => ( err = e ) ) ;
141129 void emitter . on ( 'end' , ( ) => {
@@ -167,7 +155,9 @@ export class StreamableReplicasPromise<T, TRep> extends StreamablePromise<
167155 let err : Error | undefined ;
168156 const replicas : TRep [ ] = [ ] ;
169157
170- void emitter . on ( 'replica' , ( r ) => replicas . push ( r ) ) ;
158+ void emitter . on ( 'replica' , ( r ) => {
159+ if ( this . _collectResults ) replicas . push ( r ) ;
160+ } ) ;
171161 void emitter . on ( 'error' , ( e ) => ( err = e ) ) ;
172162 void emitter . on ( 'end' , ( ) => {
173163 if ( err ) {
@@ -195,7 +185,9 @@ export class StreamableScanPromise<T, TRes> extends StreamablePromise<
195185 let err : Error | undefined ;
196186 const results : TRes [ ] = [ ] ;
197187
198- void emitter . on ( 'result' , ( r ) => results . push ( r ) ) ;
188+ void emitter . on ( 'result' , ( r ) => {
189+ if ( this . _collectResults ) results . push ( r ) ;
190+ } ) ;
199191 void emitter . on ( 'error' , ( e ) => ( err = e ) ) ;
200192 void emitter . on ( 'end' , ( ) => {
201193 if ( err ) {
0 commit comments