Skip to content

Commit d16c97e

Browse files
authored
Fix: restore token-by-token streaming for chains using createTextOnlyOutputParser (#6086)
fix: restore token-by-token streaming for chains using createTextOnlyOutputParser
1 parent 45e3ad6 commit d16c97e

1 file changed

Lines changed: 45 additions & 15 deletions

File tree

packages/components/src/utils.ts

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { ICommonObject, IDatabaseEntity, IFileUpload, IMessage, INodeData, IVari
1111
import { BaseChatModel } from '@langchain/core/language_models/chat_models'
1212
import { AES, enc } from 'crypto-js'
1313
import { AIMessage, AIMessageChunk, HumanMessage, BaseMessage } from '@langchain/core/messages'
14-
import { RunnableLambda } from '@langchain/core/runnables'
14+
import { Runnable, type RunnableConfig } from '@langchain/core/runnables'
1515
import { Document } from '@langchain/core/documents'
1616
import { getFileFromStorage } from './storageUtils'
1717
import { GetSecretValueCommand, SecretsManagerClient, SecretsManagerClientConfig } from '@aws-sdk/client-secrets-manager'
@@ -303,24 +303,54 @@ export const transformBracesWithColon = (input: string): string => {
303303
}
304304

305305
/**
306-
* Creates a RunnableLambda that extracts text content from a chat model response,
307-
* filtering out reasoning/thinking content blocks that reasoning models may return.
306+
* Extracts text content from an AIMessageChunk, filtering out reasoning/thinking
307+
* content blocks that reasoning models may return.
308+
*/
309+
const extractTextFromChunk = (response: AIMessageChunk): string => {
310+
if (typeof response.content === 'string') {
311+
return response.content
312+
}
313+
if (Array.isArray(response.content)) {
314+
return response.content
315+
.filter((block: any) => block.type === 'text' || block.type === 'text_delta')
316+
.map((block: any) => block.text ?? '')
317+
.join('')
318+
}
319+
return ''
320+
}
321+
322+
/**
323+
* Creates a streaming-compatible output parser that extracts text content from
324+
* chat model responses, filtering out reasoning/thinking content blocks.
325+
* https://github.com/FlowiseAI/Flowise/pull/5893#issuecomment-4045466531
308326
*/
309327
export const createTextOnlyOutputParser = () => {
310-
return new RunnableLambda({
311-
func: (response: AIMessageChunk) => {
312-
if (typeof response.content === 'string') {
313-
return response.content
314-
}
315-
if (Array.isArray(response.content)) {
316-
return response.content
317-
.filter((block: any) => block.type === 'text' || block.type === 'text_delta')
318-
.map((block: any) => block.text ?? '')
319-
.join('')
328+
return new TextOnlyOutputParser()
329+
}
330+
331+
/**
 * Runnable that maps AIMessageChunk -> string, keeping only text content.
 *
 * Unlike a plain RunnableLambda, this class overrides `transform`, so LangChain
 * can stream it chunk-by-chunk instead of buffering the whole model response
 * (restores token-by-token streaming per the commit message).
 */
class TextOnlyOutputParser extends Runnable<AIMessageChunk, string> {
    // Serialization name used by LangChain's lc_name convention.
    static lc_name() {
        return 'TextOnlyOutputParser'
    }

    lc_namespace = ['flowise', 'output_parsers']

    /** Non-streaming path: extract the text of a single complete response. */
    async invoke(input: AIMessageChunk, _options?: Partial<RunnableConfig>): Promise<string> {
        return extractTextFromChunk(input)
    }

    /**
     * Core streaming transform: yields the text of each incoming chunk as it
     * arrives. Empty strings (e.g. thinking-only chunks) are skipped so
     * downstream consumers only see real tokens.
     */
    async *_transform(generator: AsyncGenerator<AIMessageChunk>): AsyncGenerator<string> {
        for await (const chunk of generator) {
            const text = extractTextFromChunk(chunk)
            if (text) {
                yield text
            }
        }
    }

    /**
     * Public streaming entry point. Delegates to the base class's
     * `_transformStreamWithConfig` so callbacks/config propagation behave like
     * other LangChain runnables — NOTE(review): relies on that protected
     * Runnable helper's contract; confirm against the installed
     * @langchain/core version.
     */
    async *transform(generator: AsyncGenerator<AIMessageChunk>, options?: Partial<RunnableConfig>): AsyncGenerator<string> {
        yield* this._transformStreamWithConfig(generator, this._transform.bind(this), options)
    }
}
325355

326356
/**

0 commit comments

Comments
 (0)