* Renamed DescribeHandlerInfo to DescribedHandlerDescriptor

* Added AwaitResult and ReportResult methods to DescribedHandlerDescriptor
* Moved DontCollectAttribute and MightAwaitAttribute to Annotations namespace
* Rewrote UpdateRouter and UpdateHandlersPool
* Fixed Synchronous handlers routing bug
* Code cleanup
This commit is contained in:
2026-03-06 18:51:10 +04:00
parent 21ec3e5f11
commit a7fe90ac50
26 changed files with 221 additions and 140 deletions
@@ -27,7 +27,7 @@ namespace Telegrator.Hosting.Polling
{ {
logger.LogInformation("Starting receiving updates via long-polling"); logger.LogInformation("Starting receiving updates via long-polling");
_receiverOptions.AllowedUpdates = botHost.UpdateRouter.HandlersProvider.AllowedTypes.ToArray(); _receiverOptions.AllowedUpdates = botHost.UpdateRouter.HandlersProvider.AllowedTypes.ToArray();
ReactiveUpdateReceiver updateReceiver = new ReactiveUpdateReceiver(botClient, _receiverOptions); DefaultUpdateReceiver updateReceiver = new DefaultUpdateReceiver(botClient, _receiverOptions);
await updateReceiver.ReceiveAsync(_updateRouter, stoppingToken).ConfigureAwait(false); await updateReceiver.ReceiveAsync(_updateRouter, stoppingToken).ConfigureAwait(false);
} }
} }
@@ -1,4 +1,4 @@
namespace Telegrator.Attributes namespace Telegrator.Annotations
{ {
/// <summary> /// <summary>
/// Attribute that prevents a class from being automatically collected by the handler collection system. /// Attribute that prevents a class from being automatically collected by the handler collection system.
@@ -1,6 +1,6 @@
using Telegram.Bot.Types.Enums; using Telegram.Bot.Types.Enums;
namespace Telegrator.Attributes namespace Telegrator.Annotations
{ {
/// <summary> /// <summary>
/// Attribute that says if this handler can await some of await types, that is not listed by its handler base. /// Attribute that says if this handler can await some of await types, that is not listed by its handler base.
@@ -14,7 +14,10 @@ namespace Telegrator.Annotations.Targetted
/// Creates new instance of <see cref="WelcomeAttribute"/> /// Creates new instance of <see cref="WelcomeAttribute"/>
/// </summary> /// </summary>
/// <param name="onlyFirst"></param> /// <param name="onlyFirst"></param>
public WelcomeAttribute(bool onlyFirst = false) : base(new MessageChatTypeFilter(ChatType.Private), new CommandAlliasFilter("start"), Filter<Message>.If(ctx => !onlyFirst || ctx.Input.Id == 0)) public WelcomeAttribute(bool onlyFirst = false) : base(
new MessageChatTypeFilter(ChatType.Private),
new CommandAlliasFilter("start"),
Filter<Message>.If(ctx => !onlyFirst || ctx.Input.Id == 0))
{ } { }
} }
} }
+2 -2
View File
@@ -36,14 +36,14 @@ namespace Telegrator
/// <summary> /// <summary>
/// The handler info associated with the faulted handler. /// The handler info associated with the faulted handler.
/// </summary> /// </summary>
public readonly DescribedHandlerInfo HandlerInfo; public readonly DescribedHandlerDescriptor HandlerInfo;
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="HandlerFaultedException"/> class. /// Initializes a new instance of the <see cref="HandlerFaultedException"/> class.
/// </summary> /// </summary>
/// <param name="handlerInfo">The handler info.</param> /// <param name="handlerInfo">The handler info.</param>
/// <param name="inner">The inner exception.</param> /// <param name="inner">The inner exception.</param>
public HandlerFaultedException(DescribedHandlerInfo handlerInfo, Exception inner) public HandlerFaultedException(DescribedHandlerDescriptor handlerInfo, Exception inner)
: base(string.Format("Handler's \"{0}\" execution was faulted", handlerInfo.DisplayString), inner) : base(string.Format("Handler's \"{0}\" execution was faulted", handlerInfo.DisplayString), inner)
{ {
HandlerInfo = handlerInfo; HandlerInfo = handlerInfo;
@@ -42,8 +42,7 @@ namespace Telegrator.Filters.Components
public static AnonymousTypeFilter Compile<T>(IFilter<T> filter, Func<Update, T?> getFilterringTarget) where T : class public static AnonymousTypeFilter Compile<T>(IFilter<T> filter, Func<Update, T?> getFilterringTarget) where T : class
{ {
return new AnonymousTypeFilter( return new AnonymousTypeFilter(
filter.GetType().Name, filter.GetType().Name, getFilterringTarget,
getFilterringTarget,
(context, filterringTarget) => CanPassInternal(context, filter, filterringTarget)); (context, filterringTarget) => CanPassInternal(context, filter, filterringTarget));
} }
@@ -37,7 +37,7 @@ namespace Telegrator.Handlers.Building
/// </summary> /// </summary>
/// <param name="describedHandler">The handler information containing the update.</param> /// <param name="describedHandler">The handler information containing the update.</param>
/// <returns>An empty handler container.</returns> /// <returns>An empty handler container.</returns>
public IHandlerContainer CreateContainer(DescribedHandlerInfo describedHandler) public IHandlerContainer CreateContainer(DescribedHandlerDescriptor describedHandler)
{ {
HandlingUpdate = describedHandler.HandlingUpdate; HandlingUpdate = describedHandler.HandlingUpdate;
return new EmptyHandlerContainer(); return new EmptyHandlerContainer();
@@ -62,7 +62,7 @@ namespace Telegrator.Handlers.Components
/// </summary> /// </summary>
/// <param name="handlerInfo">The handler descriptor info.</param> /// <param name="handlerInfo">The handler descriptor info.</param>
/// <returns>The created handler container.</returns> /// <returns>The created handler container.</returns>
public virtual IHandlerContainer CreateContainer(DescribedHandlerInfo handlerInfo) public virtual IHandlerContainer CreateContainer(DescribedHandlerDescriptor handlerInfo)
{ {
return new HandlerContainer<TUpdate>(handlerInfo); return new HandlerContainer<TUpdate>(handlerInfo);
} }
@@ -106,7 +106,7 @@ namespace Telegrator.Handlers.Components
/// <param name="handlerInfo">The handler information.</param> /// <param name="handlerInfo">The handler information.</param>
/// <returns>A handler container for this branching handler.</returns> /// <returns>A handler container for this branching handler.</returns>
/// <exception cref="Exception">Thrown when the awaiting provider is not of the expected type.</exception> /// <exception cref="Exception">Thrown when the awaiting provider is not of the expected type.</exception>
public override IHandlerContainer CreateContainer(DescribedHandlerInfo handlerInfo) public override IHandlerContainer CreateContainer(DescribedHandlerDescriptor handlerInfo)
{ {
return new HandlerContainer<TUpdate>(handlerInfo); return new HandlerContainer<TUpdate>(handlerInfo);
} }
@@ -12,8 +12,8 @@ namespace Telegrator.Handlers.Components
/// <summary> /// <summary>
/// Creates a new <see cref="IHandlerContainer"/> for the specified awaiting provider and handler info. /// Creates a new <see cref="IHandlerContainer"/> for the specified awaiting provider and handler info.
/// </summary> /// </summary>
/// <param name="handlerInfo">The <see cref="DescribedHandlerInfo"/> for the handler.</param> /// <param name="handlerInfo">The <see cref="DescribedHandlerDescriptor"/> for the handler.</param>
/// <returns>A new <see cref="IHandlerContainer"/> instance.</returns> /// <returns>A new <see cref="IHandlerContainer"/> instance.</returns>
public IHandlerContainer CreateContainer(DescribedHandlerInfo handlerInfo); public IHandlerContainer CreateContainer(DescribedHandlerDescriptor handlerInfo);
} }
} }
@@ -26,7 +26,7 @@ public interface IUpdateHandlerBase : IDisposable
/// <param name="described"></param> /// <param name="described"></param>
/// <param name="cancellationToken">The cancellation token.</param> /// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<Result> Execute(DescribedHandlerInfo described, CancellationToken cancellationToken = default); Task<Result> Execute(DescribedHandlerDescriptor described, CancellationToken cancellationToken = default);
/// <summary> /// <summary>
/// Handles failed filters during handler describing. /// Handles failed filters during handler describing.
@@ -22,13 +22,13 @@ namespace Telegrator.Handlers.Components
public HandlerLifetimeToken LifetimeToken { get; } = new HandlerLifetimeToken(); public HandlerLifetimeToken LifetimeToken { get; } = new HandlerLifetimeToken();
/// <inheritdoc cref="Result.Ok"/> /// <inheritdoc cref="Result.Ok"/>
public Result Ok => Result.Ok(); public static Result Ok => Result.Ok();
/// <inheritdoc cref="Result.Fault"/> /// <inheritdoc cref="Result.Fault"/>
public Result Fault => Result.Fault(); public static Result Fault => Result.Fault();
/// <inheritdoc cref="Result.Next"/> /// <inheritdoc cref="Result.Next"/>
public Result Next => Result.Next(); public static Result Next => Result.Next();
/// <summary> /// <summary>
/// Executes the handler logic and marks the lifetime as ended after execution. /// Executes the handler logic and marks the lifetime as ended after execution.
@@ -36,7 +36,7 @@ namespace Telegrator.Handlers.Components
/// <param name="described"></param> /// <param name="described"></param>
/// <param name="cancellationToken">The cancellation token.</param> /// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public async Task<Result> Execute(DescribedHandlerInfo described, CancellationToken cancellationToken = default) public async Task<Result> Execute(DescribedHandlerDescriptor described, CancellationToken cancellationToken = default)
{ {
if (LifetimeToken.IsEnded) if (LifetimeToken.IsEnded)
throw new Exception(); throw new Exception();
@@ -125,7 +125,7 @@ namespace Telegrator.Handlers.Components
} }
} }
internal IHandlerContainer GetContainer(DescribedHandlerInfo handlerInfo) internal IHandlerContainer GetContainer(DescribedHandlerDescriptor handlerInfo)
{ {
if (this is IHandlerContainerFactory handlerDefainedContainerFactory) if (this is IHandlerContainerFactory handlerDefainedContainerFactory)
return handlerDefainedContainerFactory.CreateContainer(handlerInfo); return handlerDefainedContainerFactory.CreateContainer(handlerInfo);
+1 -1
View File
@@ -37,7 +37,7 @@ namespace Telegrator.Handlers
/// Initializes new instance of <see cref="HandlerContainer{TUpdate}"/> /// Initializes new instance of <see cref="HandlerContainer{TUpdate}"/>
/// </summary> /// </summary>
/// <param name="handlerInfo"></param> /// <param name="handlerInfo"></param>
public HandlerContainer(DescribedHandlerInfo handlerInfo) public HandlerContainer(DescribedHandlerDescriptor handlerInfo)
{ {
ActualUpdate = handlerInfo.HandlingUpdate.GetActualUpdateObject<TUpdate>(); ActualUpdate = handlerInfo.HandlingUpdate.GetActualUpdateObject<TUpdate>();
HandlingUpdate = handlerInfo.HandlingUpdate; HandlingUpdate = handlerInfo.HandlingUpdate;
+1 -1
View File
@@ -197,7 +197,7 @@ namespace Telegrator.Handlers
int? directMessageTopicId = null, int? directMessageTopicId = null,
SuggestedPostParameters? suggestedPostParameters = null, SuggestedPostParameters? suggestedPostParameters = null,
CancellationToken cancellationToken = default) CancellationToken cancellationToken = default)
=> await Container.Response( => await Container.Responce(
text, parseMode, replyParameters, text, parseMode, replyParameters,
replyMarkup, linkPreviewOptions, replyMarkup, linkPreviewOptions,
messageThreadId, entities, messageThreadId, entities,
@@ -1,8 +1,6 @@
using Telegram.Bot; using Telegram.Bot;
using Telegram.Bot.Polling;
using Telegram.Bot.Types; using Telegram.Bot.Types;
using Telegrator.Filters.Components; using Telegrator.Filters.Components;
using Telegrator.Handlers;
using Telegrator.Handlers.Components; using Telegrator.Handlers.Components;
namespace Telegrator.MadiatorCore.Descriptors namespace Telegrator.MadiatorCore.Descriptors
@@ -10,47 +8,49 @@ namespace Telegrator.MadiatorCore.Descriptors
/// <summary> /// <summary>
/// Contains information about a described handler, including its context, client, and execution logic. /// Contains information about a described handler, including its context, client, and execution logic.
/// </summary> /// </summary>
public class DescribedHandlerInfo public class DescribedHandlerDescriptor
{ {
private readonly ManualResetEventSlim ResetEvent = new ManualResetEventSlim(false);
/// <summary> /// <summary>
/// descriptor from that handler was described from /// descriptor from that handler was described from
/// </summary> /// </summary>
public readonly HandlerDescriptor From; public HandlerDescriptor From { get; }
/// <summary> /// <summary>
/// The update router associated with this handler. /// The update router associated with this handler.
/// </summary> /// </summary>
public readonly IUpdateRouter UpdateRouter; public IUpdateRouter UpdateRouter { get; }
/// <summary> /// <summary>
/// The awaiting provider to fetch new updates inside handler /// The awaiting provider to fetch new updates inside handler
/// </summary> /// </summary>
public readonly IAwaitingProvider AwaitingProvider; public IAwaitingProvider AwaitingProvider { get; }
/// <summary> /// <summary>
/// The Telegram bot client used for this handler. /// The Telegram bot client used for this handler.
/// </summary> /// </summary>
public readonly ITelegramBotClient Client; public ITelegramBotClient Client { get; }
/// <summary> /// <summary>
/// The handler instance being described. /// The handler instance being described.
/// </summary> /// </summary>
public readonly UpdateHandlerBase HandlerInstance; public UpdateHandlerBase HandlerInstance { get; }
/// <summary> /// <summary>
/// Extra data associated with the handler execution. /// Extra data associated with the handler execution.
/// </summary> /// </summary>
public readonly Dictionary<string, object> ExtraData; public Dictionary<string, object> ExtraData { get; }
/// <summary> /// <summary>
/// List of completed filters for this handler. /// List of completed filters for this handler.
/// </summary> /// </summary>
public readonly CompletedFiltersList CompletedFilters; public CompletedFiltersList CompletedFilters { get; }
/// <summary> /// <summary>
/// The update being handled. /// The update being handled.
/// </summary> /// </summary>
public readonly Update HandlingUpdate; public Update HandlingUpdate { get; }
/// <summary> /// <summary>
/// Lifetime token for the handler instance. /// Lifetime token for the handler instance.
@@ -63,7 +63,12 @@ namespace Telegrator.MadiatorCore.Descriptors
public string DisplayString { get; set; } public string DisplayString { get; set; }
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="DescribedHandlerInfo"/> class. /// The final execution result.
/// </summary>
public Result? Result { get; private set; }
/// <summary>
/// Initializes a new instance of the <see cref="DescribedHandlerDescriptor"/> class.
/// </summary> /// </summary>
/// <param name="fromDescriptor">descriptor from that handler was described from</param> /// <param name="fromDescriptor">descriptor from that handler was described from</param>
/// <param name="awaitingProvider"></param> /// <param name="awaitingProvider"></param>
@@ -72,7 +77,14 @@ namespace Telegrator.MadiatorCore.Descriptors
/// <param name="handlerInstance">The handler instance.</param> /// <param name="handlerInstance">The handler instance.</param>
/// <param name="filterContext">The filter execution context.</param> /// <param name="filterContext">The filter execution context.</param>
/// <param name="displayString">Optional display string.</param> /// <param name="displayString">Optional display string.</param>
public DescribedHandlerInfo(HandlerDescriptor fromDescriptor, IUpdateRouter updateRouter, IAwaitingProvider awaitingProvider, ITelegramBotClient client, UpdateHandlerBase handlerInstance, FilterExecutionContext<Update> filterContext, string? displayString) public DescribedHandlerDescriptor(
HandlerDescriptor fromDescriptor,
IUpdateRouter updateRouter,
IAwaitingProvider awaitingProvider,
ITelegramBotClient client,
UpdateHandlerBase handlerInstance,
FilterExecutionContext<Update> filterContext,
string? displayString)
{ {
From = fromDescriptor; From = fromDescriptor;
UpdateRouter = updateRouter; UpdateRouter = updateRouter;
@@ -85,6 +97,22 @@ namespace Telegrator.MadiatorCore.Descriptors
DisplayString = displayString ?? fromDescriptor.HandlerType.Name; DisplayString = displayString ?? fromDescriptor.HandlerType.Name;
} }
public async Task AwaitResult(CancellationToken cancellationToken)
{
await Task.Yield();
ResetEvent.Reset();
ResetEvent.Wait(cancellationToken);
}
public void ReportResult(Result result)
{
if (result != null)
throw new InvalidOperationException("Result already reported");
Result = result;
ResetEvent.Set();
}
/// <inheritdoc/> /// <inheritdoc/>
public override string ToString() public override string ToString()
=> DisplayString ?? From.HandlerType.Name; => DisplayString ?? From.HandlerType.Name;
@@ -1,6 +1,5 @@
using Telegram.Bot; using Telegram.Bot;
using Telegram.Bot.Polling; using Telegram.Bot.Polling;
using Telegrator.Polling;
namespace Telegrator.MadiatorCore namespace Telegrator.MadiatorCore
{ {
+5 -19
View File
@@ -5,14 +5,14 @@ namespace Telegrator.MadiatorCore
/// <summary> /// <summary>
/// Represents a delegate for when a handler is enqueued. /// Represents a delegate for when a handler is enqueued.
/// </summary> /// </summary>
/// <param name="args">The <see cref="DescribedHandlerInfo"/> for the enqueued handler.</param> /// <param name="args">The <see cref="DescribedHandlerDescriptor"/> for the enqueued handler.</param>
public delegate void HandlerEnqueued(DescribedHandlerInfo args); public delegate void HandlerEnqueued(DescribedHandlerDescriptor args);
/// <summary> /// <summary>
/// Represents a delegate for when a handler is executing. /// Represents a delegate for when a handler is executing.
/// </summary> /// </summary>
/// <param name="args">The <see cref="DescribedHandlerInfo"/> for the executing handler.</param> /// <param name="args">The <see cref="DescribedHandlerDescriptor"/> for the executing handler.</param>
public delegate void HandlerExecuting(DescribedHandlerInfo args); public delegate void HandlerExecuting(DescribedHandlerDescriptor args);
/// <summary> /// <summary>
/// Provides a pool for managing the execution and queuing of update handlers. /// Provides a pool for managing the execution and queuing of update handlers.
@@ -33,20 +33,6 @@ namespace Telegrator.MadiatorCore
/// Enqueues a collection of handlers for execution. /// Enqueues a collection of handlers for execution.
/// </summary> /// </summary>
/// <param name="handlers">The handlers to enqueue.</param> /// <param name="handlers">The handlers to enqueue.</param>
public Task Enqueue(IEnumerable<DescribedHandlerInfo> handlers); public Task Enqueue(params IEnumerable<DescribedHandlerDescriptor> handlers);
/*
/// <summary>
/// Enqueues a single handler for execution.
/// </summary>
/// <param name="handlerInfo">The handler to enqueue.</param>
public void Enqueue(DescribedHandlerInfo handlerInfo);
/// <summary>
/// Dequeues a handler using its lifetime token.
/// </summary>
/// <param name="token">The <see cref="HandlerLifetimeToken"/> of the handler to dequeue.</param>
public void Dequeue(HandlerLifetimeToken token);
*/
} }
} }
@@ -11,7 +11,7 @@ namespace Telegrator.Polling
/// </summary> /// </summary>
/// <param name="client">The Telegram bot client for making API requests.</param> /// <param name="client">The Telegram bot client for making API requests.</param>
/// <param name="options">Optional receiver options for configuring update polling behavior.</param> /// <param name="options">Optional receiver options for configuring update polling behavior.</param>
public class ReactiveUpdateReceiver(ITelegramBotClient client, ReceiverOptions? options) : IUpdateReceiver public class DefaultUpdateReceiver(ITelegramBotClient client, ReceiverOptions? options) : IUpdateReceiver
{ {
/// <summary> /// <summary>
/// Gets the receiver options for configuring update polling behavior. /// Gets the receiver options for configuring update polling behavior.
+21
View File
@@ -0,0 +1,21 @@
using Telegram.Bot.Polling;
using Telegram.Bot.Types;
namespace Telegrator.Polling;
/// <summary>
/// Requests new <see cref="Update"/>s and processes them using provided <see cref="IUpdateHandler"/> instance<
/// /summary>
public interface IUpdateReceiver
{
/// <summary>
/// Starts receiving <see cref="Update"/>s invoking <see cref="IUpdateHandler.HandleUpdateAsync"/> for each <see cref="Update"/>.
/// <para>
/// This method will block if awaited.
/// </para>
/// </summary>
/// <param name="updateHandler">The <see cref="IUpdateHandler"/> used for processing <see cref="Update"/>s</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> with which you can stop receiving</param>
/// <returns>A <see cref="Task"/> that will be completed when cancellation will be requested through <paramref name="cancellationToken"/></returns>
Task ReceiveAsync(IUpdateHandler updateHandler, CancellationToken cancellationToken = default);
}
+91 -60
View File
@@ -1,4 +1,6 @@
using Telegrator.Handlers.Components; using System.Security.Authentication.ExtendedProtection;
using System.Threading.Channels;
using Telegrator.Handlers.Components;
using Telegrator.Logging; using Telegrator.Logging;
using Telegrator.MadiatorCore; using Telegrator.MadiatorCore;
using Telegrator.MadiatorCore.Descriptors; using Telegrator.MadiatorCore.Descriptors;
@@ -14,12 +16,18 @@ namespace Telegrator.Polling
/// <summary> /// <summary>
/// Synchronization object for thread-safe operations. /// Synchronization object for thread-safe operations.
/// </summary> /// </summary>
protected object SyncObj = new object(); protected readonly object SyncObj = new object();
protected readonly Task ChannelReaderTask;
protected readonly Channel<DescribedHandlerDescriptor> ExecutionChannel;
/// <summary> /// <summary>
/// Semaphore for controlling the number of concurrently executing handlers. /// Semaphore for controlling the number of concurrently executing handlers.
/// </summary> /// </summary>
protected SemaphoreSlim ExecutingHandlersSemaphore = null!; protected readonly SemaphoreSlim? ExecutionLimiter;
protected readonly IUpdateRouter UpdateRouter;
/// <summary> /// <summary>
/// The bot configuration options. /// The bot configuration options.
@@ -45,72 +53,101 @@ namespace Telegrator.Polling
/// <summary> /// <summary>
/// Initializes a new instance of the <see cref="UpdateHandlersPool"/> class. /// Initializes a new instance of the <see cref="UpdateHandlersPool"/> class.
/// </summary> /// </summary>
/// <param name="router">The update handler that claims updates</param>
/// <param name="options">The bot configuration options.</param> /// <param name="options">The bot configuration options.</param>
/// <param name="globalCancellationToken">The global cancellation token.</param> /// <param name="globalCancellationToken">The global cancellation token.</param>
public UpdateHandlersPool(TelegratorOptions options, CancellationToken globalCancellationToken) public UpdateHandlersPool(IUpdateRouter router, TelegratorOptions options, CancellationToken globalCancellationToken)
{ {
UpdateRouter = router;
Options = options; Options = options;
GlobalCancellationToken = globalCancellationToken; GlobalCancellationToken = globalCancellationToken;
ExecutionChannel = Channel.CreateUnbounded<DescribedHandlerDescriptor>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = false
});
if (options.MaximumParallelWorkingHandlers != null) if (options.MaximumParallelWorkingHandlers != null)
{ ExecutionLimiter = new SemaphoreSlim(options.MaximumParallelWorkingHandlers.Value);
ExecutingHandlersSemaphore = new SemaphoreSlim(options.MaximumParallelWorkingHandlers.Value);
} GlobalCancellationToken.Register(() => ExecutionChannel.Writer.Complete());
ChannelReaderTask = ReadChannel();
} }
/// <inheritdoc/> /// <inheritdoc/>
public async Task Enqueue(IEnumerable<DescribedHandlerInfo> handlers) public async Task Enqueue(params IEnumerable<DescribedHandlerDescriptor> handlers)
{ {
Result? lastResult = null; try
foreach (DescribedHandlerInfo handlerInfo in handlers)
{ {
if (lastResult?.NextType != null) foreach (DescribedHandlerDescriptor handlerInfo in handlers)
{ {
if (lastResult.NextType != handlerInfo.From.HandlerType) if (handlerInfo.UpdateRouter != UpdateRouter)
continue; throw new InvalidOperationException("Tried to enqueue update handler info from other router.");
}
if (ExecutingHandlersSemaphore != null) await ExecutionChannel.Writer.WriteAsync(handlerInfo, GlobalCancellationToken);
}
}
catch (OperationCanceledException)
{
_ = 0xDEADBEEF;
}
}
private async Task ReadChannel()
{
try
{
await foreach (DescribedHandlerDescriptor handlerInfo in ExecutionChannel.Reader.ReadAllAsync(GlobalCancellationToken))
{ {
await ExecutingHandlersSemaphore.WaitAsync().ConfigureAwait(false); if (ExecutionLimiter != null)
} await ExecutionLimiter.WaitAsync(GlobalCancellationToken);
try // Как только слот получен, "отстреливаем" задачу в ThreadPool
// и идем на следующий круг цикла за новым обработчиком из канала.
_ = ProcessHandler(handlerInfo);
}
}
catch (ChannelClosedException)
{
// TODO: add logging
}
}
private async Task ProcessHandler(DescribedHandlerDescriptor handlerInfo)
{
try
{
Alligator.LogDebug("Described handler '{0}' (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
HandlerExecuting?.Invoke(handlerInfo);
using UpdateHandlerBase instance = handlerInfo.HandlerInstance;
Task<Result> task = instance.Execute(handlerInfo);
HandlerEnqueued?.Invoke(handlerInfo);
await task.ConfigureAwait(false);
Result lastResult = task.Result;
handlerInfo.ReportResult(lastResult);
ExecutionLimiter?.Release(1);
if (lastResult.RouteNext)
{ {
Alligator.LogDebug("Described handler '{0}' (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id); Alligator.LogTrace("Handler '{0}' requested route continuation (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
HandlerExecuting?.Invoke(handlerInfo);
using (UpdateHandlerBase instance = handlerInfo.HandlerInstance)
{
Task<Result> task = instance.Execute(handlerInfo);
HandlerEnqueued?.Invoke(handlerInfo);
await task.ConfigureAwait(false);
lastResult = task.Result;
ExecutingHandlersSemaphore?.Release(1);
}
if (lastResult.RouteNext)
{
Alligator.LogTrace("Handler '{0}' requested route continuation (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
}
} }
catch (NotImplementedException) }
{ catch (NotImplementedException)
_ = 0xBAD + 0xC0DE; {
} _ = 0xBAD + 0xC0DE;
catch (OperationCanceledException) }
{ catch (OperationCanceledException)
_ = 0xBAD + 0xC0DE; {
break; _ = 0xDEADBEEF;
} }
catch (Exception ex) catch (Exception ex)
{ {
Alligator.LogError("Failed to process handler '{0}' (Update {1})", exception: ex, handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id); Alligator.LogError("Failed to process handler '{0}' (Update {1})", exception: ex, handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
}
if (lastResult != null && !lastResult.RouteNext)
break;
} }
} }
@@ -122,15 +159,9 @@ namespace Telegrator.Polling
if (disposed) if (disposed)
return; return;
if (ExecutingHandlersSemaphore != null) // do not dispose UpdateRouter
{ ExecutionLimiter?.Dispose();
ExecutingHandlersSemaphore.Dispose();
ExecutingHandlersSemaphore = null!;
}
if (SyncObj != null)
SyncObj = null!;
GC.SuppressFinalize(this); GC.SuppressFinalize(this);
disposed = true; disposed = true;
} }
+34 -17
View File
@@ -1,4 +1,5 @@
using System.Text; using System.Runtime.InteropServices;
using System.Text;
using Telegram.Bot; using Telegram.Bot;
using Telegram.Bot.Polling; using Telegram.Bot.Polling;
using Telegram.Bot.Types; using Telegram.Bot.Types;
@@ -24,6 +25,7 @@ namespace Telegrator.Polling
private readonly IAwaitingProvider _awaitingProvider; private readonly IAwaitingProvider _awaitingProvider;
private readonly IUpdateHandlersPool _HandlersPool; private readonly IUpdateHandlersPool _HandlersPool;
private readonly ITelegramBotInfo _botInfo; private readonly ITelegramBotInfo _botInfo;
private readonly HandlerDescriptorList _handlingRoutes;
/// <inheritdoc/> /// <inheritdoc/>
public IHandlersProvider HandlersProvider => _handlersProvider; public IHandlersProvider HandlersProvider => _handlersProvider;
@@ -55,8 +57,9 @@ namespace Telegrator.Polling
_options = options; _options = options;
_handlersProvider = handlersProvider; _handlersProvider = handlersProvider;
_awaitingProvider = awaitingProvider; _awaitingProvider = awaitingProvider;
_HandlersPool = new UpdateHandlersPool(_options, _options.GlobalCancellationToken); _HandlersPool = new UpdateHandlersPool(this, _options, _options.GlobalCancellationToken);
_botInfo = botInfo; _botInfo = botInfo;
_handlingRoutes = new HandlerDescriptorList();
} }
/// <summary> /// <summary>
@@ -74,6 +77,7 @@ namespace Telegrator.Polling
_awaitingProvider = awaitingProvider; _awaitingProvider = awaitingProvider;
_HandlersPool = handlersPool; _HandlersPool = handlersPool;
_botInfo = botInfo; _botInfo = botInfo;
_handlingRoutes = new HandlerDescriptorList();
} }
/// <summary> /// <summary>
@@ -105,19 +109,32 @@ namespace Telegrator.Polling
try try
{ {
// Getting handlers in update awaiting pool Result? lastResult = null;
IEnumerable<DescribedHandlerInfo> handlers = GetHandlers(AwaitingProvider, botClient, update, cancellationToken); foreach (DescribedHandlerDescriptor handlerInfo in GetHandlers(AwaitingProvider, botClient, update, cancellationToken))
if (handlers.Any())
{ {
// Enqueuing found awiting handlers if (lastResult?.NextType != null)
await HandlersPool.Enqueue(handlers);
// Chicking if awaiting handlers has exclusive routing
if (Options.ExclusiveAwaitingHandlerRouting)
{ {
Alligator.LogTrace("Receiving Update ({0}) completed with only awaiting handlers", update.Id); if (lastResult.NextType != handlerInfo.From.HandlerType)
return; continue;
} }
// Enqueuing found awiting handlers
await HandlersPool.Enqueue(handlerInfo);
await handlerInfo.AwaitResult(cancellationToken).ConfigureAwait(false);
lastResult = handlerInfo.Result;
if (lastResult == null)
break; // Smth went horribly wrong, better to stop routing
if (lastResult != null && !lastResult.RouteNext)
break;
}
// Checking if awaiting handlers has exclusive routing
if (Options.ExclusiveAwaitingHandlerRouting)
{
Alligator.LogTrace("Receiving Update ({0}) completed with only awaiting handlers", update.Id);
return;
} }
// Queuing reagular handlers for execution // Queuing reagular handlers for execution
@@ -144,7 +161,7 @@ namespace Telegrator.Polling
/// <param name="update">The incoming Telegram update to process</param> /// <param name="update">The incoming Telegram update to process</param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
/// <returns>A collection of described handler information for the update</returns> /// <returns>A collection of described handler information for the update</returns>
protected virtual IEnumerable<DescribedHandlerInfo> GetHandlers(IHandlersProvider provider, ITelegramBotClient client, Update update, CancellationToken cancellationToken = default) protected virtual IEnumerable<DescribedHandlerDescriptor> GetHandlers(IHandlersProvider provider, ITelegramBotClient client, Update update, CancellationToken cancellationToken = default)
{ {
Alligator.LogTrace("Requested handlers for UpdateType.{0}", update.Type); Alligator.LogTrace("Requested handlers for UpdateType.{0}", update.Type);
if (!provider.TryGetDescriptorList(update.Type, out HandlerDescriptorList? descriptors)) if (!provider.TryGetDescriptorList(update.Type, out HandlerDescriptorList? descriptors))
@@ -172,13 +189,13 @@ namespace Telegrator.Polling
/// <param name="update">The incoming Telegram update to process</param> /// <param name="update">The incoming Telegram update to process</param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
/// <returns>A collection of described handler information</returns> /// <returns>A collection of described handler information</returns>
protected virtual IEnumerable<DescribedHandlerInfo> DescribeDescriptors(IHandlersProvider provider, HandlerDescriptorList descriptors, ITelegramBotClient client, Update update, CancellationToken cancellationToken = default) protected virtual IEnumerable<DescribedHandlerDescriptor> DescribeDescriptors(IHandlersProvider provider, HandlerDescriptorList descriptors, ITelegramBotClient client, Update update, CancellationToken cancellationToken = default)
{ {
Alligator.LogTrace("Describing descriptors of descriptorsList.HandlingType.{0} for Update ({1})", descriptors.HandlingType, update.Id); Alligator.LogTrace("Describing descriptors of descriptorsList.HandlingType.{0} for Update ({1})", descriptors.HandlingType, update.Id);
foreach (HandlerDescriptor descriptor in descriptors.Reverse()) foreach (HandlerDescriptor descriptor in descriptors.Reverse())
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
DescribedHandlerInfo? describedHandler = DescribeHandler(provider, descriptor, client, update, out bool breakRouting, cancellationToken); DescribedHandlerDescriptor? describedHandler = DescribeHandler(provider, descriptor, client, update, out bool breakRouting, cancellationToken);
if (breakRouting) if (breakRouting)
yield break; yield break;
@@ -202,7 +219,7 @@ namespace Telegrator.Polling
/// <param name="breakRouting"></param> /// <param name="breakRouting"></param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
/// <returns>The described handler info if validation passes; otherwise, null</returns> /// <returns>The described handler info if validation passes; otherwise, null</returns>
public virtual DescribedHandlerInfo? DescribeHandler(IHandlersProvider provider, HandlerDescriptor descriptor, ITelegramBotClient client, Update update, out bool breakRouting, CancellationToken cancellationToken = default) public virtual DescribedHandlerDescriptor? DescribeHandler(IHandlersProvider provider, HandlerDescriptor descriptor, ITelegramBotClient client, Update update, out bool breakRouting, CancellationToken cancellationToken = default)
{ {
breakRouting = false; breakRouting = false;
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
@@ -231,7 +248,7 @@ namespace Telegrator.Polling
} }
} }
return new DescribedHandlerInfo(descriptor, this, AwaitingProvider, client, handlerInstance, filterContext, descriptor.DisplayString); return new DescribedHandlerDescriptor(descriptor, this, AwaitingProvider, client, handlerInstance, filterContext, descriptor.DisplayString);
} }
/// <summary> /// <summary>
@@ -1,7 +1,6 @@
using System.Reflection; using System.Reflection;
using Telegram.Bot.Types.Enums; using Telegram.Bot.Types.Enums;
using Telegrator.Annotations; using Telegrator.Annotations;
using Telegrator.Attributes;
using Telegrator.Configuration; using Telegrator.Configuration;
using Telegrator.MadiatorCore; using Telegrator.MadiatorCore;
using Telegrator.MadiatorCore.Descriptors; using Telegrator.MadiatorCore.Descriptors;
@@ -1,7 +1,6 @@
using System.Reflection; using System.Reflection;
using Telegram.Bot.Types.Enums; using Telegram.Bot.Types.Enums;
using Telegrator.Annotations; using Telegrator.Annotations;
using Telegrator.Attributes;
using Telegrator.Configuration; using Telegrator.Configuration;
using Telegrator.Handlers.Components; using Telegrator.Handlers.Components;
using Telegrator.MadiatorCore; using Telegrator.MadiatorCore;
+2 -4
View File
@@ -1,9 +1,7 @@
using Telegram.Bot.Types; using Telegrator.Aspects;
using Telegrator.Aspects;
using Telegrator.Handlers.Components; using Telegrator.Handlers.Components;
using Telegrator.Filters.Components;
using Telegrator.MadiatorCore;
using Telegrator.Handlers.Diagnostics; using Telegrator.Handlers.Diagnostics;
using Telegrator.MadiatorCore;
namespace Telegrator namespace Telegrator
{ {
+2 -1
View File
@@ -27,7 +27,8 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Telegram.Bot" Version="22.6.2" /> <PackageReference Include="System.Threading.Channels" Version="10.0.3" />
<PackageReference Include="Telegram.Bot" Version="22.9.5.3" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
+1 -1
View File
@@ -97,7 +97,7 @@ namespace Telegrator
{ {
try try
{ {
await new ReactiveUpdateReceiver(this, receiverOptions) await new DefaultUpdateReceiver(this, receiverOptions)
.ReceiveAsync(UpdateRouter, cancellationToken) .ReceiveAsync(UpdateRouter, cancellationToken)
.ConfigureAwait(false); .ConfigureAwait(false);
} }