Skip to content

Commit 2bd71bd

Browse files
author
Jicheng Lu
committed
add stop conv streaming and thinking text
1 parent 9d4127a commit 2bd71bd

18 files changed

Lines changed: 762 additions & 363 deletions

File tree

Directory.Packages.props

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
<PackageVersion Include="A2A" Version="0.3.3-preview" />
77
<PackageVersion Include="CsvHelper" Version="33.1.0" />
88
<PackageVersion Include="FuzzySharp" Version="2.0.2" />
9-
<PackageVersion Include="Google_GenerativeAI" Version="3.6.3" />
10-
<PackageVersion Include="Google_GenerativeAI.Live" Version="3.6.3" />
9+
<PackageVersion Include="Google_GenerativeAI" Version="3.6.4" />
10+
<PackageVersion Include="Google_GenerativeAI.Live" Version="3.6.4" />
1111
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
1212
<PackageVersion Include="Polly" Version="8.4.2" />
1313
<PackageVersion Include="RabbitMQ.Client" Version="7.2.0" />
@@ -33,7 +33,7 @@
3333
<PackageVersion Include="Whisper.net.Runtime" Version="1.8.1" />
3434
<PackageVersion Include="NCrontab" Version="3.3.3" />
3535
<PackageVersion Include="Azure.AI.OpenAI" Version="2.7.0-beta.1" />
36-
<PackageVersion Include="OpenAI" Version="2.9.1" />
36+
<PackageVersion Include="OpenAI" Version="2.10.0" />
3737
<PackageVersion Include="MailKit" Version="4.14.1" />
3838
<PackageVersion Include="Microsoft.Data.Sqlite" Version="10.0.0" />
3939
<PackageVersion Include="MySql.Data" Version="9.5.0" />

src/Infrastructure/BotSharp.Abstraction/Conversations/Dtos/ChatResponseDto.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public class ChatResponseDto : InstructResult
4646
[JsonPropertyName("is_streaming")]
4747
public bool IsStreaming { get; set; }
4848

49+
[JsonPropertyName("meta_data")]
50+
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
51+
public Dictionary<string, string?>? MetaData { get; set; }
52+
4953
[JsonPropertyName("created_at")]
5054
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
5155
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System.Threading;
2+
3+
namespace BotSharp.Abstraction.Conversations;
4+
5+
/// <summary>
6+
/// Service to manage cancellation tokens for streaming chat completions.
7+
/// Allows stopping an active streaming response by conversation ID.
8+
/// </summary>
9+
public interface IConversationCancellationService
10+
{
11+
/// <summary>
12+
/// Register a new cancellation token source for the given conversation.
13+
/// Returns the CancellationToken to be used in streaming loops.
14+
/// </summary>
15+
CancellationToken RegisterConversation(string conversationId);
16+
17+
/// <summary>
18+
/// Cancel an active streaming operation for the given conversation.
19+
/// </summary>
20+
/// <returns>True if the conversation was found and cancelled, false otherwise.</returns>
21+
bool CancelStreaming(string conversationId);
22+
23+
/// <summary>
24+
/// Remove the cancellation token source for the given conversation.
25+
/// Should be called when streaming completes (either normally or via cancellation).
26+
/// </summary>
27+
void UnregisterConversation(string conversationId);
28+
29+
/// <summary>
30+
/// Get the cancellation token for the given conversation if one is registered.
31+
/// </summary>
32+
CancellationToken GetToken(string conversationId);
33+
}

src/Infrastructure/BotSharp.Core/Conversations/ConversationPlugin.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public void RegisterDI(IServiceCollection services, IConfiguration config)
4848
return settingService.Bind<GoogleApiSettings>("GoogleApi");
4949
});
5050

51+
// Streaming cancellation
52+
services.AddSingleton<IConversationCancellationService, ConversationCancellationService>();
53+
5154
// Observer and observable
5255
services.AddSingleton<MessageHub<HubObserveData<RoleDialogModel>>>();
5356
services.AddScoped<ObserverSubscriptionContainer<HubObserveData<RoleDialogModel>>>();
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using System.Collections.Concurrent;
2+
3+
namespace BotSharp.Core.Conversations.Services;
4+
5+
public class ConversationCancellationService : IConversationCancellationService
6+
{
7+
private readonly ConcurrentDictionary<string, CancellationTokenSource> _cancellationTokenSources = new();
8+
private readonly ILogger _logger;
9+
10+
public ConversationCancellationService(
11+
ILogger<ConversationCancellationService> logger)
12+
{
13+
_logger = logger;
14+
}
15+
16+
public CancellationToken RegisterConversation(string conversationId)
17+
{
18+
// Cancel any existing streaming for this conversation
19+
if (_cancellationTokenSources.TryRemove(conversationId, out var existingCts))
20+
{
21+
existingCts.Cancel();
22+
existingCts.Dispose();
23+
_logger.LogWarning("Cancelled existing streaming session for conversation {ConversationId}", conversationId);
24+
}
25+
26+
var cts = new CancellationTokenSource();
27+
_cancellationTokenSources[conversationId] = cts;
28+
_logger.LogInformation("Registered streaming cancellation for conversation {ConversationId}", conversationId);
29+
return cts.Token;
30+
}
31+
32+
public bool CancelStreaming(string conversationId)
33+
{
34+
if (_cancellationTokenSources.TryGetValue(conversationId, out var cts))
35+
{
36+
cts.Cancel();
37+
_logger.LogInformation("Streaming cancelled for conversation {ConversationId}", conversationId);
38+
return true;
39+
}
40+
41+
_logger.LogWarning("No active streaming found for conversation {ConversationId}", conversationId);
42+
return false;
43+
}
44+
45+
public void UnregisterConversation(string conversationId)
46+
{
47+
if (_cancellationTokenSources.TryRemove(conversationId, out var cts))
48+
{
49+
cts.Dispose();
50+
_logger.LogDebug("Unregistered streaming cancellation for conversation {ConversationId}", conversationId);
51+
}
52+
}
53+
54+
public CancellationToken GetToken(string conversationId)
55+
{
56+
if (_cancellationTokenSources.TryGetValue(conversationId, out var cts))
57+
{
58+
return cts.Token;
59+
}
60+
61+
return CancellationToken.None;
62+
}
63+
}

src/Infrastructure/BotSharp.OpenAPI/Controllers/Conversation/ConversationController.cs

Lines changed: 67 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public async Task<IEnumerable<ChatResponseModel>> GetDialogs(
131131
Data = message.Data,
132132
Sender = UserDto.FromUser(user),
133133
Payload = message.Payload,
134+
MetaData = message.MetaData,
134135
HasMessageFiles = files.Any(x => x.MessageId.IsEqualTo(message.MessageId) && x.FileSource == FileSource.User)
135136
});
136137
}
@@ -146,6 +147,7 @@ public async Task<IEnumerable<ChatResponseModel>> GetDialogs(
146147
Text = !string.IsNullOrEmpty(message.SecondaryContent) ? message.SecondaryContent : message.Content,
147148
Function = message.FunctionName,
148149
Data = message.Data,
150+
MetaData = message.MetaData,
149151
Sender = new()
150152
{
151153
FirstName = agent?.Name ?? "Unkown",
@@ -397,18 +399,36 @@ public async Task<ChatResponseModel> SendMessage(
397399
await conv.SetConversationId(conversationId, input.States);
398400
SetStates(conv, input);
399401

402+
IConversationCancellationService? convCancellation = null;
403+
if (input.IsStreamingMessage)
404+
{
405+
convCancellation = _services.GetRequiredService<IConversationCancellationService>();
406+
convCancellation.RegisterConversation(conversationId);
407+
}
408+
400409
var response = new ChatResponseModel();
401-
await conv.SendMessage(agentId, inputMsg,
402-
replyMessage: input.Postback,
403-
async msg =>
404-
{
405-
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
406-
response.Function = msg.FunctionName;
407-
response.MessageLabel = msg.MessageLabel;
408-
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
409-
response.Instruction = msg.Instruction;
410-
response.Data = msg.Data;
411-
});
410+
try
411+
{
412+
await conv.SendMessage(agentId, inputMsg,
413+
replyMessage: input.Postback,
414+
async msg =>
415+
{
416+
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
417+
response.Function = msg.FunctionName;
418+
response.MessageLabel = msg.MessageLabel;
419+
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
420+
response.Instruction = msg.Instruction;
421+
response.Data = msg.Data;
422+
});
423+
}
424+
catch (OperationCanceledException) when (input.IsStreamingMessage)
425+
{
426+
response.Text = string.Empty;
427+
}
428+
finally
429+
{
430+
convCancellation?.UnregisterConversation(conversationId);
431+
}
412432

413433
var state = _services.GetRequiredService<IConversationStateService>();
414434
response.States = state.GetStates();
@@ -455,20 +475,20 @@ public async Task SendMessageSse([FromRoute] string agentId, [FromRoute] string
455475
Response.Headers.Append(Microsoft.Net.Http.Headers.HeaderNames.Connection, "keep-alive");
456476

457477
await conv.SendMessage(agentId, inputMsg,
458-
replyMessage: input.Postback,
459-
// responsed generated
460-
async msg =>
461-
{
462-
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
463-
response.MessageLabel = msg.MessageLabel;
464-
response.Function = msg.FunctionName;
465-
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
466-
response.Instruction = msg.Instruction;
467-
response.Data = msg.Data;
468-
response.States = state.GetStates();
469-
470-
await OnChunkReceived(Response, response);
471-
});
478+
replyMessage: input.Postback,
479+
// responsed generated
480+
async msg =>
481+
{
482+
response.Text = !string.IsNullOrEmpty(msg.SecondaryContent) ? msg.SecondaryContent : msg.Content;
483+
response.MessageLabel = msg.MessageLabel;
484+
response.Function = msg.FunctionName;
485+
response.RichContent = msg.SecondaryRichContent ?? msg.RichContent;
486+
response.Instruction = msg.Instruction;
487+
response.Data = msg.Data;
488+
response.States = state.GetStates();
489+
490+
await OnChunkReceived(Response, response);
491+
});
472492

473493
response.States = state.GetStates();
474494
response.MessageId = inputMsg.MessageId;
@@ -477,18 +497,13 @@ await conv.SendMessage(agentId, inputMsg,
477497
// await OnEventCompleted(Response);
478498
}
479499

480-
private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg)
500+
[HttpPost("/conversation/{conversationId}/stop-streaming")]
501+
public ConverstionCancellationResponse StopStreaming([FromRoute] string conversationId)
481502
{
482-
var indicator = new ChatResponseModel
483-
{
484-
ConversationId = conversationId,
485-
MessageId = msg.MessageId,
486-
Text = msg.Indication,
487-
Function = "indicating",
488-
Instruction = msg.Instruction,
489-
States = new Dictionary<string, string>()
490-
};
491-
await OnChunkReceived(Response, indicator);
503+
var streamingCancellation = _services.GetRequiredService<IConversationCancellationService>();
504+
var cancelled = streamingCancellation.CancelStreaming(conversationId);
505+
506+
return new ConverstionCancellationResponse { Success = cancelled };
492507
}
493508
#endregion
494509

@@ -515,6 +530,8 @@ private void SetStates(IConversationService conv, NewMessageModel input)
515530
{
516531
conv.States.SetState("sampling_factor", input.SamplingFactor, source: StateSource.External);
517532
}
533+
534+
conv.States.SetState("use_stream_message", input.IsStreamingMessage, source: StateSource.Application);
518535
}
519536

520537
private FileContentResult BuildFileResult(string file)
@@ -567,5 +584,19 @@ private JsonSerializerOptions InitJsonOptions(BotSharpOptions options)
567584

568585
return jsonOption;
569586
}
587+
588+
private async Task OnReceiveToolCallIndication(string conversationId, RoleDialogModel msg)
589+
{
590+
var indicator = new ChatResponseModel
591+
{
592+
ConversationId = conversationId,
593+
MessageId = msg.MessageId,
594+
Text = msg.Indication,
595+
Function = "indicating",
596+
Instruction = msg.Instruction,
597+
States = []
598+
};
599+
await OnChunkReceived(Response, indicator);
600+
}
570601
#endregion
571602
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
1+
using System.Text.Json.Serialization;
2+
13
namespace BotSharp.OpenAPI.ViewModels.Conversations;
24

35
public class NewMessageModel : IncomingMessageModel
46
{
57
public override string Channel { get; set; } = ConversationChannel.OpenAPI;
8+
9+
/// <summary>
10+
/// Indicates whether this message uses streaming completion.
11+
/// When true, the streaming can be stopped via the stop endpoint.
12+
/// </summary>
13+
[JsonPropertyName("is_streaming_msg")]
14+
public bool IsStreamingMessage { get; set; }
615
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
namespace BotSharp.OpenAPI.ViewModels.Conversations;
2+
3+
public class ConverstionCancellationResponse : ResponseBase
4+
{
5+
}

0 commit comments

Comments
 (0)