Skip to content

Commit 4dadd5c

Browse files
authored
Merge pull request #917 from yileicn/master
optimize realtime
2 parents d94b5cb + af01567 commit 4dadd5c

5 files changed

Lines changed: 96 additions & 87 deletions

File tree

src/Infrastructure/BotSharp.Abstraction/MLTasks/IRealTimeCompletion.cs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ public interface IRealTimeCompletion
1010

1111
Task Connect(RealtimeHubConnection conn,
1212
Action onModelReady,
13-
Action<string> onModelAudioDeltaReceived,
13+
Action<string, string> onModelAudioDeltaReceived,
1414
Action onModelAudioResponseDone,
1515
Action<string> onAudioTranscriptDone,
1616
Action<List<RoleDialogModel>> onModelResponseDone,

src/Infrastructure/BotSharp.Abstraction/Realtime/Models/RealtimeHubConnection.cs

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,9 +6,9 @@ public class RealtimeHubConnection
66
{
77
public string Event { get; set; } = null!;
88
public string StreamId { get; set; } = null!;
9-
public string? LastAssistantItem { get; set; } = null!;
9+
public string? LastAssistantItemId { get; set; } = null!;
1010
public long LatestMediaTimestamp { get; set; }
11-
public long? ResponseStartTimestamp { get; set; }
11+
public long? ResponseStartTimestampTwilio { get; set; }
1212
public string KeypadInputBuffer { get; set; } = string.Empty;
1313
public ConcurrentQueue<string> MarkQueue { get; set; } = new();
1414
public string CurrentAgentId { get; set; } = null!;
@@ -18,4 +18,17 @@ public class RealtimeHubConnection
1818
public Func<string, object> OnModelMessageReceived { get; set; } = null!;
1919
public Func<object> OnModelAudioResponseDone { get; set; } = null!;
2020
public Func<object> OnModelUserInterrupted { get; set; } = null!;
21+
22+
public void ResetResponseState()
23+
{
24+
MarkQueue.Clear();
25+
LastAssistantItemId = null;
26+
ResponseStartTimestampTwilio = null;
27+
}
28+
29+
public void ResetStreamState()
30+
{
31+
ResponseStartTimestampTwilio = null;
32+
LatestMediaTimestamp = 0;
33+
}
2134
}

src/Infrastructure/BotSharp.Core/Realtime/RealtimeHub.cs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,10 @@ await completer.Connect(conn,
103103
await completer.UpdateSession(conn, turnDetection: false);
104104

105105
// Add dialog history
106-
foreach (var item in dialogs)
107-
{
108-
await completer.InsertConversationItem(item);
109-
}
106+
//foreach (var item in dialogs)
107+
//{
108+
// await completer.InsertConversationItem(item);
109+
//}
110110

111111
if (dialogs.LastOrDefault()?.Role == AgentRole.Assistant)
112112
{
@@ -121,30 +121,25 @@ await completer.Connect(conn,
121121
await Task.Delay(1000 * 8);
122122
await completer.UpdateSession(conn, turnDetection: true);
123123
},
124-
onModelAudioDeltaReceived: async audioDeltaData =>
124+
onModelAudioDeltaReceived: async (audioDeltaData, itemId) =>
125125
{
126-
// If this is the first delta of a new response, set the start timestamp
127-
if (!conn.ResponseStartTimestamp.HasValue)
128-
{
129-
conn.ResponseStartTimestamp = conn.LatestMediaTimestamp;
130-
_logger.LogDebug($"Setting start timestamp for new response: {conn.ResponseStartTimestamp}ms");
131-
}
132-
133126
var data = conn.OnModelMessageReceived(audioDeltaData);
134127
await SendEventToUser(userWebSocket, data);
135128

136-
// Send mark messages to Media Streams so we know if and when AI response playback is finished
137-
if (!string.IsNullOrEmpty(conn.StreamId))
129+
// If this is the first delta of a new response, set the start timestamp
130+
if (!conn.ResponseStartTimestampTwilio.HasValue)
138131
{
139-
var markEvent = new
140-
{
141-
@event = "mark",
142-
streamSid = conn.StreamId,
143-
mark = new { name = "responsePart" }
144-
};
145-
await SendEventToUser(userWebSocket, markEvent);
146-
conn.MarkQueue.Enqueue("responsePart");
132+
conn.ResponseStartTimestampTwilio = conn.LatestMediaTimestamp;
133+
_logger.LogDebug($"Setting start timestamp for new response: {conn.ResponseStartTimestampTwilio}ms");
147134
}
135+
// Record last assistant item ID for interruption handling
136+
if (!string.IsNullOrEmpty(itemId))
137+
{
138+
conn.LastAssistantItemId = itemId;
139+
}
140+
141+
// Send mark messages to Media Streams so we know if and when AI response playback is finished
142+
await SendMark(userWebSocket, conn);
148143
},
149144
onModelAudioResponseDone: async () =>
150145
{
@@ -226,15 +221,28 @@ await completer.Connect(conn,
226221
onUserInterrupted: async () =>
227222
{
228223
// Reset states
229-
conn.MarkQueue.Clear();
230-
conn.LastAssistantItem = null;
231-
conn.ResponseStartTimestamp = null;
224+
conn.ResetResponseState();
232225

233226
var data = conn.OnModelUserInterrupted();
234227
await SendEventToUser(userWebSocket, data);
235228
});
236229
}
237230

231+
private async Task SendMark(WebSocket userWebSocket, RealtimeHubConnection conn)
232+
{
233+
if (!string.IsNullOrEmpty(conn.StreamId))
234+
{
235+
var markEvent = new
236+
{
237+
@event = "mark",
238+
streamSid = conn.StreamId,
239+
mark = new { name = "responsePart" }
240+
};
241+
await SendEventToUser(userWebSocket, markEvent);
242+
conn.MarkQueue.Enqueue("responsePart");
243+
}
244+
}
245+
238246
private async Task HandleUserDtmfReceived(IRealTimeCompletion completer, RealtimeHubConnection conn)
239247
{
240248
var routing = _services.GetRequiredService<IRoutingService>();

src/Plugins/BotSharp.Plugin.OpenAI/Providers/Realtime/RealTimeCompletionProvider.cs

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public RealTimeCompletionProvider(
4040

4141
public async Task Connect(RealtimeHubConnection conn,
4242
Action onModelReady,
43-
Action<string> onModelAudioDeltaReceived,
43+
Action<string,string> onModelAudioDeltaReceived,
4444
Action onModelAudioResponseDone,
4545
Action<string> onAudioTranscriptDone,
4646
Action<List<RoleDialogModel>> onModelResponseDone,
@@ -123,7 +123,7 @@ await SendEventToModel(new
123123

124124
private async Task ReceiveMessage(RealtimeHubConnection conn,
125125
Action onModelReady,
126-
Action<string> onModelAudioDeltaReceived,
126+
Action<string,string> onModelAudioDeltaReceived,
127127
Action onModelAudioResponseDone,
128128
Action<string> onAudioTranscriptDone,
129129
Action<List<RoleDialogModel>> onModelResponseDone,
@@ -176,21 +176,10 @@ private async Task ReceiveMessage(RealtimeHubConnection conn,
176176
else if (response.Type == "response.audio.delta")
177177
{
178178
var audio = JsonSerializer.Deserialize<ResponseAudioDelta>(receivedText);
179-
// Record last assistant item ID for interruption handling
180-
if (conn.ResponseStartTimestamp.HasValue)
181-
{
182-
conn.ResponseStartTimestamp = conn.LatestMediaTimestamp;
183-
}
184-
185-
if (!string.IsNullOrEmpty(conn.StreamId))
186-
{
187-
conn.LastAssistantItem = audio?.ItemId;
188-
}
189-
190-
if (audio != null && audio.Delta != null)
179+
if (audio?.Delta != null)
191180
{
192181
_logger.LogDebug($"{response.Type}: {receivedText}");
193-
onModelAudioDeltaReceived(audio.Delta);
182+
onModelAudioDeltaReceived(audio.Delta, audio.ItemId);
194183
}
195184
}
196185
else if (response.Type == "response.audio.done")
@@ -218,16 +207,16 @@ private async Task ReceiveMessage(RealtimeHubConnection conn,
218207
else if (response.Type == "input_audio_buffer.speech_started")
219208
{
220209
// Handle user interruption
221-
if (conn.MarkQueue.Count > 0 && conn.ResponseStartTimestamp != null)
210+
if (conn.MarkQueue.Count > 0 && conn.ResponseStartTimestampTwilio != null)
222211
{
223-
var elapsedTime = conn.LatestMediaTimestamp - conn.ResponseStartTimestamp;
212+
var elapsedTime = conn.LatestMediaTimestamp - conn.ResponseStartTimestampTwilio;
224213

225-
if (!string.IsNullOrEmpty(conn.LastAssistantItem))
214+
if (!string.IsNullOrEmpty(conn.LastAssistantItemId))
226215
{
227216
var truncateEvent = new
228217
{
229218
type = "conversation.item.truncate",
230-
item_id = conn.LastAssistantItem,
219+
item_id = conn.LastAssistantItemId,
231220
content_index = 0,
232221
audio_end_ms = elapsedTime
233222
};
@@ -336,13 +325,13 @@ public async Task UpdateSession(RealtimeHubConnection conn, bool turnDetection =
336325
ToolChoice = "auto",
337326
Tools = functions,
338327
Modalities = [ "text", "audio" ],
339-
Temperature = Math.Max(options.Temperature ?? 0f, 0.8f),
328+
Temperature = Math.Max(options.Temperature ?? 0f, 0.6f),
340329
MaxResponseOutputTokens = 512,
341330
TurnDetection = new RealtimeSessionTurnDetection
342331
{
343-
Threshold = 0.5f,
332+
Threshold = 0.8f,
344333
PrefixPadding = 300,
345-
SilenceDuration = 500
334+
SilenceDuration = 800
346335
}
347336
}
348337
};

src/Plugins/BotSharp.Plugin.Twilio/Services/Stream/TwilioStreamMiddleware.cs

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,44 @@ await hub.Listen(webSocket, (receivedText) =>
7171
{
7272
var response = JsonSerializer.Deserialize<StreamEventResponse>(receivedText);
7373
conn.StreamId = response.StreamSid;
74-
conn.Event = response.Event switch
75-
{
76-
"start" => "user_connected",
77-
"media" => "user_data_received",
78-
"stop" => "user_disconnected",
79-
_ => response.Event
80-
};
8174

82-
if (string.IsNullOrEmpty(conn.Event))
75+
switch (response.Event)
8376
{
84-
return conn;
77+
case "start":
78+
conn.Event = "user_connected";
79+
var startResponse = JsonSerializer.Deserialize<StreamEventStartResponse>(receivedText);
80+
conn.Data = JsonSerializer.Serialize(startResponse.Body.CustomParameters);
81+
conn.ResetStreamState();
82+
break;
83+
case "media":
84+
conn.Event = "user_data_received";
85+
var mediaResponse = JsonSerializer.Deserialize<StreamEventMediaResponse>(receivedText);
86+
conn.LatestMediaTimestamp = long.Parse(mediaResponse.Body.Timestamp);
87+
conn.Data = mediaResponse.Body.Payload;
88+
break;
89+
case "stop":
90+
conn.Event = "user_disconnected";
91+
break;
92+
case "mark":
93+
conn.Event = "mark";
94+
if (conn.MarkQueue.Count > 0) conn.MarkQueue.TryDequeue(out var _);
95+
break;
96+
case "dtmf":
97+
var dtmfResponse = JsonSerializer.Deserialize<StreamEventDtmfResponse>(receivedText);
98+
if (dtmfResponse.Body.Digit == "#")
99+
{
100+
conn.Event = "user_dtmf_received";
101+
conn.Data = conn.KeypadInputBuffer;
102+
conn.KeypadInputBuffer = string.Empty;
103+
}
104+
else
105+
{
106+
conn.KeypadInputBuffer += dtmfResponse.Body.Digit;
107+
}
108+
break;
109+
default:
110+
conn.Event = response.Event;
111+
break;
85112
}
86113

87114
conn.OnModelMessageReceived = message =>
@@ -105,34 +132,6 @@ await hub.Listen(webSocket, (receivedText) =>
105132
streamSid = response.StreamSid
106133
};
107134

108-
if (response.Event == "start")
109-
{
110-
var startResponse = JsonSerializer.Deserialize<StreamEventStartResponse>(receivedText);
111-
conn.LatestMediaTimestamp = 0;
112-
conn.ResponseStartTimestamp = null;
113-
conn.Data = JsonSerializer.Serialize(startResponse.Body.CustomParameters);
114-
}
115-
else if (response.Event == "media")
116-
{
117-
var mediaResponse = JsonSerializer.Deserialize<StreamEventMediaResponse>(receivedText);
118-
conn.LatestMediaTimestamp = long.Parse(mediaResponse.Body.Timestamp);
119-
conn.Data = mediaResponse.Body.Payload;
120-
}
121-
else if (response.Event == "dtmf")
122-
{
123-
var dtmfResponse = JsonSerializer.Deserialize<StreamEventDtmfResponse>(receivedText);
124-
if (dtmfResponse.Body.Digit == "#")
125-
{
126-
conn.Event = "user_dtmf_received";
127-
conn.Data = conn.KeypadInputBuffer;
128-
conn.KeypadInputBuffer = string.Empty;
129-
}
130-
else
131-
{
132-
conn.KeypadInputBuffer += dtmfResponse.Body.Digit;
133-
}
134-
}
135-
136135
return conn;
137136
});
138137
}

0 commit comments

Comments
 (0)