diff --git a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs index 6a04e7964..ddf985b42 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Conversations/IConversationService.cs @@ -61,4 +61,6 @@ Task SendMessage(string agentId, Task GetConversationRecordOrCreateNew(string agentId); bool IsConversationMode(); + + void SaveStates(); } diff --git a/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabEventSubscription.cs b/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabEventSubscription.cs index fa3b372d3..099b93120 100644 --- a/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabEventSubscription.cs +++ b/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabEventSubscription.cs @@ -1,7 +1,6 @@ using BotSharp.Abstraction.Infrastructures.Events; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using System.Runtime.InteropServices; namespace BotSharp.Core.Crontab.Services; @@ -22,6 +21,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) using (var scope = _services.CreateScope()) { + var publisher = scope.ServiceProvider.GetRequiredService(); var subscriber = scope.ServiceProvider.GetRequiredService(); var cron = scope.ServiceProvider.GetRequiredService(); var crons = await cron.GetCrontable(); @@ -29,15 +29,20 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _ = Task.Run(async () => { + // Clean unhandled messages + await publisher.RemoveAsync($"Crontab:{item.Title}", count: 100); + await subscriber.SubscribeAsync($"Crontab:{item.Title}", "Crontab", port: 0, - priorityEnabled: false, async (sender, args) => + priorityEnabled: false, + async (sender, args) => { var scope = _services.CreateScope(); cron = scope.ServiceProvider.GetRequiredService(); await cron.ScheduledTimeArrived(item); - }, stoppingToken: stoppingToken); + }, + stoppingToken: stoppingToken); }); } } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/IRuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/IRuleEngine.cs index 45eae430c..2d4c11118 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Engines/IRuleEngine.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/IRuleEngine.cs @@ -1,6 +1,8 @@ +using BotSharp.Abstraction.Models; + namespace BotSharp.Core.Rules.Engines; public interface IRuleEngine { - Task Triggered(IRuleTrigger trigger, string data); + Task Triggered(IRuleTrigger trigger, string data, List? states = null); } diff --git a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs index feea122fa..f545522fe 100644 --- a/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs +++ b/src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs @@ -18,7 +18,7 @@ public RuleEngine(IServiceProvider services, ILogger logger) _logger = logger; } - public async Task Triggered(IRuleTrigger trigger, string data) + public async Task Triggered(IRuleTrigger trigger, string data, List? states = null) { // Pull all user defined rules var agentService = _services.GetRequiredService(); @@ -36,10 +36,11 @@ public async Task Triggered(IRuleTrigger trigger, string data) // Trigger the agents var instructService = _services.GetRequiredService(); - var convService = _services.GetRequiredService(); + foreach (var agent in preFilteredAgents) { + var convService = _services.GetRequiredService(); var conv = await convService.NewConversation(new Conversation { Channel = trigger.Channel, @@ -49,18 +50,25 @@ public async Task Triggered(IRuleTrigger trigger, string data) var message = new RoleDialogModel(AgentRole.User, data); - var states = new List + var allStates = new List { - new("channel", trigger.Channel), - new("channel_id", trigger.EntityId) + new("channel", trigger.Channel) }; - convService.SetConversationId(conv.Id, states); + + if (states != null) + { + allStates.AddRange(states); + } + + convService.SetConversationId(conv.Id, allStates); await convService.SendMessage(agent.Id, message, null, msg => Task.CompletedTask); + convService.SaveStates(); + /*foreach (var rule in agent.Rules) { var userSay = $"===Input data with Before and After values===\r\n{data}\r\n\r\n===Trigger Criteria===\r\n{rule.Criteria}\r\n\r\nJust output 1 or 0 without explanation: "; diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.TruncateMessage.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.TruncateMessage.cs index 1a75d7179..05d04f253 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.TruncateMessage.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.TruncateMessage.cs @@ -1,6 +1,6 @@ namespace BotSharp.Core.Conversations.Services; -public partial class ConversationService : IConversationService +public partial class ConversationService { public async Task TruncateConversation(string conversationId, string messageId, string? newMessageId = null) { diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.UpdateBreakpoint.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.UpdateBreakpoint.cs index 75a0bbfb1..203fa8851 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.UpdateBreakpoint.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.UpdateBreakpoint.cs @@ -2,7 +2,7 @@ namespace BotSharp.Core.Conversations.Services; -public partial class ConversationService : IConversationService +public partial class ConversationService { public async Task UpdateBreakpoint(bool resetStates = false, string? reason = null, params string[] excludedStates) { diff --git a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.cs b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.cs index 7e498afec..0c02918e2 100644 --- a/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.cs +++ b/src/Infrastructure/BotSharp.Core/Conversations/Services/ConversationService.cs @@ -216,4 +216,9 @@ public bool IsConversationMode() var agent = db.GetAgent(routingCtx.EntryAgentId, basicsOnly: true); return agent?.MaxMessageCount; } + + public void SaveStates() + { + _state.Save(); + } } diff --git a/src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisPublisher.cs b/src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisPublisher.cs index 5c49f328d..56cefcc10 100644 --- a/src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisPublisher.cs +++ b/src/Infrastructure/BotSharp.Core/Infrastructures/Events/RedisPublisher.cs @@ -182,7 +182,10 @@ public async Task RemoveAsync(string channel, int count = 10) var db = _redis.GetDatabase(); var entries = await db.StreamRangeAsync(channel, "-", "+", count: count, messageOrder: Order.Ascending); - var deletedCount = await db.StreamDeleteAsync(channel, entries.Select(x => x.Id).ToArray()); - _logger.LogWarning($"Deleted {deletedCount} messages from Redis stream {channel}"); + if (entries.Length > 0) + { + var deletedCount = await db.StreamDeleteAsync(channel, entries.Select(x => x.Id).ToArray()); + _logger.LogWarning($"Deleted {deletedCount} messages from Redis stream {channel}"); + } } }