Skip to content

Commit 1275053

Browse files
author
Jicheng Lu
committed
migrate conv latest states
1 parent c4d2f5b commit 1275053

8 files changed

Lines changed: 215 additions & 1 deletion

File tree

src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,6 @@ Task<bool> SendMessage(string agentId,
7070
/// <param name="preLoad">if pre-loading, then keys are not filter by the search query</param>
7171
/// <returns></returns>
7272
Task<List<string>> GetConversationStateSearhKeys(string query, int convLimit = 100, int keyLimit = 10, bool preload = false);
73+
74+
Task<bool> MigrateLatestStates(int batchSize = 100, int errorLimit = 10);
7375
}

src/Infrastructure/BotSharp.Abstraction/Repositories/IBotSharpRepository.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ List<string> TruncateConversation(string conversationId, string messageId, bool
151151
=> throw new NotImplementedException();
152152
List<string> GetConversationStateSearchKeys(int messageLowerLimit = 2, int convUpperLimit = 100)
153153
=> throw new NotImplementedException();
154+
List<string> GetConversationsToMigrate(int batchSize = 100)
155+
=> throw new NotImplementedException();
156+
bool MigrateConvsersationLatestStates(string conversationId)
157+
=> throw new NotImplementedException();
154158
#endregion
155159

156160
#region LLM Completion Log
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using NetTopologySuite.Algorithm;
2+
using System.Diagnostics;
3+
4+
namespace BotSharp.Core.Conversations.Services;
5+
6+
public partial class ConversationService
7+
{
8+
public async Task<bool> MigrateLatestStates(int batchSize = 100, int errorLimit = 10)
9+
{
10+
var db = _services.GetRequiredService<IBotSharpRepository>();
11+
var isSuccess = true;
12+
var errorCount = 0;
13+
var batchNum = 0;
14+
var info = string.Empty;
15+
var error = string.Empty;
16+
17+
#if DEBUG
18+
Console.WriteLine($"\r\n#Start migrating Conversation Latest States...\r\n");
19+
#else
20+
_logger.LogInformation($"#Start migrating Conversation Latest States...");
21+
#endif
22+
var sw = Stopwatch.StartNew();
23+
24+
var convIds = db.GetConversationsToMigrate(batchSize);
25+
26+
while (!convIds.IsNullOrEmpty())
27+
{
28+
batchNum++;
29+
var innerSw = Stopwatch.StartNew();
30+
#if DEBUG
31+
Console.WriteLine($"\r\n#Start migrating Conversation Latest States (batch number: {batchNum})\r\n");
32+
#else
33+
_logger.LogInformation($"#Start migrating Conversation Latest States (batch number: {batchNum})");
34+
#endif
35+
36+
for (int i = 0; i < convIds.Count; i++)
37+
{
38+
var convId = convIds.ElementAt(i);
39+
try
40+
{
41+
var done = db.MigrateConvsersationLatestStates(convId);
42+
info = $"Conversation {convId} latest states have been migrated ({i + 1}/{convIds.Count})!";
43+
#if DEBUG
44+
Console.WriteLine($"\r\n{info}\r\n");
45+
#else
46+
_logger.LogInformation($"{info}");
47+
#endif
48+
}
49+
catch (Exception ex)
50+
{
51+
errorCount++;
52+
error = $"Conversation {convId} latest states fail to be migrated! ({i + 1}/{convIds.Count})\r\n{ex.Message}\r\n{ex.InnerException}";
53+
#if DEBUG
54+
Console.WriteLine($"\r\n{error}\r\n");
55+
#else
56+
_logger.LogError($"{error}");
57+
#endif
58+
}
59+
}
60+
61+
if (errorCount >= errorLimit)
62+
{
63+
error = $"\r\nErrors exceed limit => stop the migration!\r\n";
64+
#if DEBUG
65+
Console.WriteLine($"{error}");
66+
#else
67+
_logger.LogError($"{error}");
68+
#endif
69+
innerSw.Stop();
70+
isSuccess = false;
71+
break;
72+
}
73+
74+
innerSw.Stop();
75+
info = $"#Done migrating Conversation Latest States (batch number: {batchNum}) " +
76+
$"(Total time: {innerSw.Elapsed.Hours} hrs, {innerSw.Elapsed.Minutes} mins, {innerSw.Elapsed.Seconds} seconds)";
77+
#if DEBUG
78+
Console.WriteLine($"\r\n{info}\r\n");
79+
#else
80+
_logger.LogInformation($"{info}");
81+
#endif
82+
83+
await Task.Delay(100);
84+
convIds = db.GetConversationsToMigrate(batchSize);
85+
}
86+
87+
sw.Stop();
88+
info = $"#Done with migrating Conversation Latest States! " +
89+
$"(Total time: {sw.Elapsed.Days} days, {sw.Elapsed.Hours} hrs, {sw.Elapsed.Minutes} mins, {sw.Elapsed.Seconds} seconds)";
90+
#if DEBUG
91+
Console.WriteLine($"\r\n{info}\r\n");
92+
#else
93+
_logger.LogInformation($"{info}");
94+
#endif
95+
96+
return isSuccess;
97+
}
98+
}

src/Infrastructure/BotSharp.Core/Loggers/Services/LoggerService.Instruction.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public async Task<PagedItems<InstructionLogModel>> GetInstructionLogs(InstructLo
4141

4242
var items = logs.Items.Select(x =>
4343
{
44-
x.AgentId = !string.IsNullOrEmpty(x.AgentId) ? agents.FirstOrDefault(a => a.Id == x.AgentId)?.Name : null;
44+
x.AgentName = !string.IsNullOrEmpty(x.AgentId) ? agents.FirstOrDefault(a => a.Id == x.AgentId)?.Name : null;
4545

4646
if (!isAdmin)
4747
{

src/Infrastructure/BotSharp.Core/Repository/FileRepository/FileRepository.Conversation.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,56 @@ public List<string> GetConversationStateSearchKeys(int messageLowerLimit = 2, in
687687
}
688688

689689

690+
691+
public List<string> GetConversationsToMigrate(int batchSize = 100)
692+
{
693+
var baseDir = Path.Combine(_dbSettings.FileRepository, _conversationSettings.DataDir);
694+
if (!Directory.Exists(baseDir)) return [];
695+
696+
var convIds = new List<string>();
697+
var dirs = Directory.GetDirectories(baseDir);
698+
699+
foreach (var dir in dirs)
700+
{
701+
var latestStateFile = Path.Combine(dir, CONV_LATEST_STATE_FILE);
702+
if (File.Exists(latestStateFile)) continue;
703+
704+
var convId = dir.Split(Path.DirectorySeparatorChar).Last();
705+
if (string.IsNullOrEmpty(convId)) continue;
706+
707+
convIds.Add(convId);
708+
if (convIds.Count >= batchSize)
709+
{
710+
break;
711+
}
712+
}
713+
714+
return convIds;
715+
}
716+
717+
718+
public bool MigrateConvsersationLatestStates(string conversationId)
719+
{
720+
if (string.IsNullOrEmpty(conversationId)) return false;
721+
722+
var convDir = FindConversationDirectory(conversationId);
723+
if (string.IsNullOrEmpty(convDir))
724+
{
725+
return false;
726+
}
727+
728+
var stateFile = Path.Combine(convDir, STATE_FILE);
729+
var states = CollectConversationStates(stateFile);
730+
var latestStates = BuildLatestStates(states);
731+
732+
var latestStateFile = Path.Combine(convDir, CONV_LATEST_STATE_FILE);
733+
var stateStr = JsonSerializer.Serialize(latestStates, _options);
734+
File.WriteAllText(latestStateFile, stateStr);
735+
736+
return true;
737+
}
738+
739+
690740
#region Private methods
691741
private string? FindConversationDirectory(string conversationId)
692742
{
@@ -883,6 +933,11 @@ private Dictionary<string, JsonDocument> CollectConversationLatestStates(string
883933
private Dictionary<string, JsonDocument> BuildLatestStates(List<StateKeyValue> states)
884934
{
885935
var endNodes = new Dictionary<string, JsonDocument>();
936+
if (states.IsNullOrEmpty())
937+
{
938+
return endNodes;
939+
}
940+
886941
foreach (var pair in states)
887942
{
888943
var value = pair.Values?.LastOrDefault();

src/Infrastructure/BotSharp.OpenAPI/Controllers/ConversationController.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,16 @@ public async Task<List<string>> GetConversationStateKeys([FromQuery] string quer
563563
}
564564
#endregion
565565

566+
#region Migrate Latest States
567+
[HttpPost("/conversation/latest-state/migrate")]
568+
public async Task<bool> MigrateConversationLatestStates([FromBody] MigrateLatestStateRequest request)
569+
{
570+
var convService = _services.GetRequiredService<IConversationService>();
571+
var res = await convService.MigrateLatestStates(request.BatchSize, request.ErrorLimit);
572+
return res;
573+
}
574+
#endregion
575+
566576
#region Private methods
567577
private void SetStates(IConversationService conv, NewMessageModel input)
568578
{
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace BotSharp.OpenAPI.ViewModels.Conversations;
2+
3+
public class MigrateLatestStateRequest
4+
{
5+
public int BatchSize { get; set; } = 1000;
6+
public int ErrorLimit { get; set; } = 10;
7+
}

src/Plugins/BotSharp.Plugin.MongoStorage/Repository/MongoRepository.Conversation.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using BotSharp.Abstraction.Conversations.Models;
22
using BotSharp.Abstraction.Repositories.Filters;
3+
using MongoDB.Driver;
34
using System.Text.Json;
45

56
namespace BotSharp.Plugin.MongoStorage.Repository;
@@ -661,6 +662,37 @@ public List<string> GetConversationStateSearchKeys(int messageLowerLimit = 2, in
661662
return keys;
662663
}
663664

665+
666+
667+
public List<string> GetConversationsToMigrate(int batchSize = 100)
668+
{
669+
var convFilter = Builders<ConversationDocument>.Filter.Exists(x => x.LatestStates, false);
670+
var sortDef = Builders<ConversationDocument>.Sort.Ascending(x => x.CreatedTime);
671+
var convIds = _dc.Conversations.Find(convFilter).Sort(sortDef)
672+
.Limit(batchSize).ToEnumerable()
673+
.Select(x => x.Id).ToList();
674+
return convIds ?? [];
675+
}
676+
677+
public bool MigrateConvsersationLatestStates(string conversationId)
678+
{
679+
if (string.IsNullOrEmpty(conversationId)) return false;
680+
681+
var stateFilter = Builders<ConversationStateDocument>.Filter.Eq(x => x.ConversationId, conversationId);
682+
var foundStates = _dc.ConversationStates.Find(stateFilter).FirstOrDefault();
683+
if (foundStates?.States == null) return false;
684+
685+
var states = foundStates.States.ToList();
686+
var latestStates = BuildLatestStates(states);
687+
688+
var convFilter = Builders<ConversationDocument>.Filter.Eq(x => x.Id, conversationId);
689+
var convUpdate = Builders<ConversationDocument>.Update.Set(x => x.LatestStates, latestStates);
690+
_dc.Conversations.UpdateOne(convFilter, convUpdate);
691+
692+
return true;
693+
}
694+
695+
#region Private methods
664696
private string ConvertSnakeCaseToPascalCase(string snakeCase)
665697
{
666698
string[] words = snakeCase.Split('_');
@@ -682,6 +714,11 @@ private string ConvertSnakeCaseToPascalCase(string snakeCase)
682714
private Dictionary<string, BsonDocument> BuildLatestStates(List<StateMongoElement> states)
683715
{
684716
var endNodes = new Dictionary<string, BsonDocument>();
717+
if (states.IsNullOrEmpty())
718+
{
719+
return endNodes;
720+
}
721+
685722
foreach (var pair in states)
686723
{
687724
var value = pair.Values?.LastOrDefault();
@@ -703,4 +740,5 @@ private Dictionary<string, BsonDocument> BuildLatestStates(List<StateMongoElemen
703740

704741
return endNodes;
705742
}
743+
#endregion
706744
}

0 commit comments

Comments
 (0)