Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,6 @@ Task<bool> SendMessage(string agentId,
Task<Conversation> GetConversationRecordOrCreateNew(string agentId);

bool IsConversationMode();

void SaveStates();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using BotSharp.Abstraction.Infrastructures.Events;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Runtime.InteropServices;

namespace BotSharp.Core.Crontab.Services;

Expand All @@ -22,22 +21,28 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

using (var scope = _services.CreateScope())
{
var publisher = scope.ServiceProvider.GetRequiredService<IEventPublisher>();
var subscriber = scope.ServiceProvider.GetRequiredService<IEventSubscriber>();
var cron = scope.ServiceProvider.GetRequiredService<ICrontabService>();
var crons = await cron.GetCrontable();
foreach (var item in crons)
{
_ = Task.Run(async () =>
{
// Clean unhandled messages
await publisher.RemoveAsync($"Crontab:{item.Title}", count: 100);

await subscriber.SubscribeAsync($"Crontab:{item.Title}",
"Crontab",
port: 0,
priorityEnabled: false, async (sender, args) =>
priorityEnabled: false,
async (sender, args) =>
{
var scope = _services.CreateScope();
cron = scope.ServiceProvider.GetRequiredService<ICrontabService>();
await cron.ScheduledTimeArrived(item);
}, stoppingToken: stoppingToken);
},
stoppingToken: stoppingToken);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using BotSharp.Abstraction.Models;

namespace BotSharp.Core.Rules.Engines;

public interface IRuleEngine
{
Task Triggered(IRuleTrigger trigger, string data);
Task Triggered(IRuleTrigger trigger, string data, List<MessageState>? states = null);
}
20 changes: 14 additions & 6 deletions src/Infrastructure/BotSharp.Core.Rules/Engines/RuleEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public RuleEngine(IServiceProvider services, ILogger<RuleEngine> logger)
_logger = logger;
}

public async Task Triggered(IRuleTrigger trigger, string data)
public async Task Triggered(IRuleTrigger trigger, string data, List<MessageState>? states = null)
{
// Pull all user defined rules
var agentService = _services.GetRequiredService<IAgentService>();
Expand All @@ -36,10 +36,11 @@ public async Task Triggered(IRuleTrigger trigger, string data)

// Trigger the agents
var instructService = _services.GetRequiredService<IInstructService>();
var convService = _services.GetRequiredService<IConversationService>();


foreach (var agent in preFilteredAgents)
{
var convService = _services.GetRequiredService<IConversationService>();
var conv = await convService.NewConversation(new Conversation
{
Channel = trigger.Channel,
Expand All @@ -49,18 +50,25 @@ public async Task Triggered(IRuleTrigger trigger, string data)

var message = new RoleDialogModel(AgentRole.User, data);

var states = new List<MessageState>
var allStates = new List<MessageState>
{
new("channel", trigger.Channel),
new("channel_id", trigger.EntityId)
new("channel", trigger.Channel)
};
convService.SetConversationId(conv.Id, states);

if (states != null)
{
allStates.AddRange(states);
}

convService.SetConversationId(conv.Id, allStates);

await convService.SendMessage(agent.Id,
message,
null,
msg => Task.CompletedTask);

convService.SaveStates();

/*foreach (var rule in agent.Rules)
{
var userSay = $"===Input data with Before and After values===\r\n{data}\r\n\r\n===Trigger Criteria===\r\n{rule.Criteria}\r\n\r\nJust output 1 or 0 without explanation: ";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace BotSharp.Core.Conversations.Services;

public partial class ConversationService : IConversationService
public partial class ConversationService
{
public async Task<bool> TruncateConversation(string conversationId, string messageId, string? newMessageId = null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace BotSharp.Core.Conversations.Services;

public partial class ConversationService : IConversationService
public partial class ConversationService
{
public async Task UpdateBreakpoint(bool resetStates = false, string? reason = null, params string[] excludedStates)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,9 @@ public bool IsConversationMode()
var agent = db.GetAgent(routingCtx.EntryAgentId, basicsOnly: true);
return agent?.MaxMessageCount;
}

public void SaveStates()
{
_state.Save();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ public async Task RemoveAsync(string channel, int count = 10)
var db = _redis.GetDatabase();

var entries = await db.StreamRangeAsync(channel, "-", "+", count: count, messageOrder: Order.Ascending);
var deletedCount = await db.StreamDeleteAsync(channel, entries.Select(x => x.Id).ToArray());
_logger.LogWarning($"Deleted {deletedCount} messages from Redis stream {channel}");
if (entries.Length > 0)
{
var deletedCount = await db.StreamDeleteAsync(channel, entries.Select(x => x.Id).ToArray());
_logger.LogWarning($"Deleted {deletedCount} messages from Redis stream {channel}");
}
}
}