* Added Result class to communicate with router from handler

* Removed "ExecuteOnlyFirstFoundHanlder" in sake of testing new Result pattern based routing system
* Removed obsolete option property "DescendDescriptorIndex"
* Changed router logic
* Changed handlers pool logic
This commit is contained in:
2025-08-02 02:32:38 +04:00
parent b8e4398b50
commit 16d11990ec
26 changed files with 347 additions and 115 deletions
+52
View File
@@ -0,0 +1,52 @@
using System.Collections;
using System.Collections.Concurrent;
namespace Telegrator.Polling
{
public class LimitedDictionary<TKey, TValue> : IEnumerable<KeyValuePair<TKey, TValue>>, IDisposable
{
private readonly int? _maximum;
private readonly SemaphoreSlim _semaphore = null!;
private readonly ConcurrentDictionary<TKey, TValue> _dict = [];
public LimitedDictionary(int? maximum)
{
_maximum = maximum;
if (maximum != null)
{
int value = maximum.Value;
_semaphore = new SemaphoreSlim(value, value);
}
}
public async Task<bool> Add(TKey key, TValue value, CancellationToken cancellationToken)
{
if (_semaphore != null)
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
return _dict.TryAdd(key, value);
}
public bool Remove(TKey key, out TValue result)
{
if (_dict.TryRemove(key, out result))
{
_semaphore?.Release(1);
return true;
}
return false;
}
public IEnumerator<KeyValuePair<TKey, TValue>> GetEnumerator() => _dict.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => _dict.GetEnumerator();
/// <inheritdoc/>
public void Dispose()
{
GC.SuppressFinalize(this);
_semaphore.Dispose();
}
}
}
+42
View File
@@ -0,0 +1,42 @@
using System.Collections.Concurrent;
namespace Telegrator.Polling
{
public class LimitedQueue<T>
{
private readonly int? _maximum;
private readonly ConcurrentQueue<T> _queue = [];
private readonly SemaphoreSlim _semaphore = null!;
public LimitedQueue(int? maximum)
{
_maximum = maximum;
if (maximum != null)
{
int value = maximum.Value;
_semaphore = new SemaphoreSlim(value, value);
}
}
public async Task Enqueue(T item, CancellationToken cancellationToken)
{
if (_maximum.HasValue)
await _semaphore.WaitAsync(cancellationToken);
_queue.Enqueue(item);
}
public bool Dequeue(out T result)
{
if (_queue.TryDequeue(out result))
{
if (_maximum.HasValue)
_semaphore?.Release(1);
return true;
}
return false;
}
}
}
+55 -5
View File
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using Telegrator.Handlers;
using Telegrator.MadiatorCore;
using Telegrator.MadiatorCore.Descriptors;
@@ -15,6 +16,7 @@ namespace Telegrator.Polling
/// </summary>
protected object SyncObj = new object();
/*
/// <summary>
/// Event that signals when awaiting handlers are queued.
/// </summary>
@@ -34,6 +36,13 @@ namespace Telegrator.Polling
/// Dictionary for tracking currently executing handlers.
/// </summary>
protected readonly ConcurrentDictionary<HandlerLifetimeToken, Task> ExecutingHandlersPool = [];
*/
//protected readonly ConcurrentDictionary<Type, LimitedQueue<DescribedHandlerInfo>> AwaitingHandlersQueue;
//protected readonly LimitedDictionary<HandlerLifetimeToken, Task> ExecutingHandlersPool;
protected SemaphoreSlim ExecutingHandlersSemaphore = null!;
/// <summary>
/// The bot configuration options.
@@ -65,26 +74,62 @@ namespace Telegrator.Polling
{
Options = options;
GlobalCancellationToken = globalCancellationToken;
//AwaitingHandlersQueue = new ConcurrentDictionary<Type, LimitedQueue<DescribedHandlerInfo>>();
//ExecutingHandlersPool = new LimitedDictionary<HandlerLifetimeToken, Task>(options.MaximumParallelWorkingHandlers);
if (options.MaximumParallelWorkingHandlers != null)
{
ExecutingHandlersSemaphore = new SemaphoreSlim(options.MaximumParallelWorkingHandlers ?? 0);
AwaitingHandlersQueuedEvent = new ManualResetEventSlim(false);
ExecutingHandlersSemaphore = new SemaphoreSlim(options.MaximumParallelWorkingHandlers.Value);
//AwaitingHandlersQueuedEvent = new ManualResetEventSlim(false);
}
/*
if (Options.MaximumParallelWorkingHandlers != null)
HandlersCheckpoint();
*/
}
/// <inheritdoc/>
public void Enqueue(IEnumerable<DescribedHandlerInfo> handlers)
public async Task Enqueue(IEnumerable<DescribedHandlerInfo> handlers)
{
handlers.ForEach(Enqueue);
}
//handlers.ForEach(Enqueue);
Result? lastResult = null;
foreach (DescribedHandlerInfo handlerInfo in handlers)
{
if (lastResult?.NextType != null)
{
if (lastResult.NextType != handlerInfo.HandlerInstance.GetType())
continue;
}
if (ExecutingHandlersSemaphore != null)
{
await ExecutingHandlersSemaphore.WaitAsync();
}
try
{
HandlerExecuting?.Invoke(handlerInfo);
lastResult = await handlerInfo.Execute(GlobalCancellationToken);
ExecutingHandlersSemaphore?.Release(1);
}
catch (OperationCanceledException)
{
break;
}
if (!lastResult.RouteNext)
break;
}
}
/*
/// <inheritdoc/>
public void Enqueue(DescribedHandlerInfo handlerInfo)
{
throw new NotImplementedException();
if (Options.MaximumParallelWorkingHandlers == null)
{
Task.Run(async () => await ExecuteHandlerWrapper(handlerInfo));
@@ -111,7 +156,9 @@ namespace Telegrator.Polling
ExecutingHandlersSemaphore.Release(1);
}
}
*/
/*
/// <summary>
/// Main checkpoint method that manages handler execution in a loop.
/// Continuously processes queued handlers while respecting concurrency limits.
@@ -206,6 +253,7 @@ namespace Telegrator.Polling
return AwaitingHandlersQueue.TryDequeue(out enqueuedHandler);
}
}
*/
/// <summary>
/// Disposes of the handlers pool and releases all resources.
@@ -221,11 +269,13 @@ namespace Telegrator.Polling
ExecutingHandlersSemaphore = null!;
}
/*
if (AwaitingHandlersQueuedEvent != null)
{
AwaitingHandlersQueuedEvent.Dispose();
AwaitingHandlersQueuedEvent = null!;
}
*/
if (SyncObj != null)
SyncObj = null!;
+12 -11
View File
@@ -96,7 +96,7 @@ namespace Telegrator.Polling
/// <param name="update">The update to handle.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous update handling operation.</returns>
public virtual Task HandleUpdateAsync(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken)
public virtual async Task HandleUpdateAsync(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken)
{
// Logging
Alligator.RouterWriteLine("Received Update ({0}) of type \"{1}\"", update.Id, update.Type);
@@ -109,31 +109,28 @@ namespace Telegrator.Polling
if (handlers.Any())
{
// Enqueuing found awiting handlers
HandlersPool.Enqueue(handlers);
await HandlersPool.Enqueue(handlers);
// Chicking if awaiting handlers has exclusive routing
if (Options.ExclusiveAwaitingHandlerRouting)
{
Alligator.RouterWriteLine("Receiving Update ({0}) completed with only awaiting handlers", update.Id);
return Task.CompletedTask;
return;
}
}
// Queuing reagular handlers for execution
HandlersPool.Enqueue(GetHandlers(HandlersProvider, this, botClient, update, cancellationToken));
await HandlersPool.Enqueue(GetHandlers(HandlersProvider, this, botClient, update, cancellationToken));
Alligator.RouterWriteLine("Receiving Update ({0}) finished", update.Id);
return Task.CompletedTask;
}
catch (OperationCanceledException)
{
Alligator.RouterWriteLine("Receiving Update ({0}) cancelled", update.Id);
return Task.CompletedTask;
}
catch (Exception ex)
{
Alligator.RouterWriteLine("Receiving Update ({0}) finished with exception {1}", update.Id, ex.Message);
ExceptionHandler?.HandleException(botClient, ex, HandleErrorSource.PollingError, cancellationToken);
return Task.CompletedTask;
}
}
@@ -162,10 +159,11 @@ namespace Telegrator.Polling
return [];
}
IEnumerable<DescribedHandlerInfo> described = DescribeDescriptors(provider, descriptors, updateRouter, client, update, cancellationToken);
Alligator.RouterWriteLine("Described total of {0} handlers for Update ({1}) from {2} provider", described.Count(), update.Id, provider.GetType().Name);
Alligator.RouterWriteLine("Described handlers : {0}", string.Join(", ", described));
return described;
//IEnumerable<DescribedHandlerInfo> described = DescribeDescriptors(provider, descriptors, updateRouter, client, update, cancellationToken);
//Alligator.RouterWriteLine("Described total of {0} handlers for Update ({1}) from {2} provider", described.Count(), update.Id, provider.GetType().Name);
//Alligator.RouterWriteLine("Described handlers : {0}", string.Join(", ", described));
return DescribeDescriptors(provider, descriptors, updateRouter, client, update, cancellationToken);
}
/// <summary>
@@ -192,8 +190,11 @@ namespace Telegrator.Polling
continue;
yield return describedHandler;
/*
if (Options.ExecuteOnlyFirstFoundHanlder)
break;
*/
}
}
finally