Добавьте файлы проекта.

This commit is contained in:
2025-07-24 23:19:59 +04:00
commit 33d1f6218a
168 changed files with 15035 additions and 0 deletions
@@ -0,0 +1,85 @@
using Telegram.Bot;
using Telegram.Bot.Polling;
using Telegram.Bot.Requests;
using Telegram.Bot.Types;
namespace Telegrator.Polling
{
/// <summary>
/// Reactive implementation of <see cref="IUpdateReceiver"/> for polling updates from Telegram.
/// Provides custom update receiving logic with error handling and configuration options.
/// </summary>
/// <param name="client">The Telegram bot client for making API requests.</param>
/// <param name="options">Optional receiver options for configuring update polling behavior.</param>
public class ReactiveUpdateReceiver(ITelegramBotClient client, ReceiverOptions? options) : IUpdateReceiver
{
/// <summary>
/// Gets the receiver options for configuring update polling behavior.
/// </summary>
public readonly ReceiverOptions? Options = options;
/// <summary>
/// Gets the Telegram bot client for making API requests.
/// </summary>
public readonly ITelegramBotClient Client = client;
/// <summary>
/// Receives updates from Telegram using long polling.
/// Handles update processing, error handling, and cancellation.
/// </summary>
/// <param name="updateHandler">The update handler to process received updates.</param>
/// <param name="cancellationToken">The cancellation token to stop receiving updates.</param>
/// <returns>A task representing the asynchronous update receiving operation.</returns>
public async Task ReceiveAsync(IUpdateHandler updateHandler, CancellationToken cancellationToken)
{
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken).Token;
GetUpdatesRequest request = new GetUpdatesRequest()
{
AllowedUpdates = Options?.AllowedUpdates ?? [],
Limit = Options?.Limit.GetValueOrDefault(100),
Offset = Options?.Offset
};
if (Options?.DropPendingUpdates ?? false)
{
try
{
Update[] array = await Client.GetUpdates(-1, 1, 0, [], cancellationToken).ConfigureAwait(false);
request.Offset = array.Length != 0 ? array[^1].Id + 1 : 0;
}
catch (OperationCanceledException)
{
return;
}
}
while (!cancellationToken.IsCancellationRequested)
{
try
{
request.Timeout = (int)Client.Timeout.TotalSeconds;
foreach (Update update in await Client.SendRequest(request, cancellationToken).ConfigureAwait(false))
{
try
{
request.Offset = update.Id + 1;
await updateHandler.HandleUpdateAsync(Client, update, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
catch (Exception exception2)
{
await updateHandler.HandleErrorAsync(Client, exception2, HandleErrorSource.HandleUpdateError, cancellationToken).ConfigureAwait(false);
}
}
}
catch (OperationCanceledException)
{
return;
}
catch (Exception exception)
{
await updateHandler.HandleErrorAsync(Client, exception, HandleErrorSource.PollingError, cancellationToken).ConfigureAwait(false);
}
}
}
}
}
+238
View File
@@ -0,0 +1,238 @@
using System.Collections.Concurrent;
using Telegrator.Configuration;
using Telegrator.MadiatorCore;
using Telegrator.MadiatorCore.Descriptors;
namespace Telegrator.Polling
{
/// <summary>
/// Implementation of <see cref="IUpdateHandlersPool"/> that manages the execution of handlers.
/// Provides thread-safe queuing and execution of handlers with configurable concurrency limits.
/// </summary>
public class UpdateHandlersPool : IUpdateHandlersPool
{
/// <summary>
/// Synchronization object for thread-safe operations.
/// </summary>
protected object SyncObj = new object();
/// <summary>
/// Event that signals when awaiting handlers are queued.
/// </summary>
protected ManualResetEventSlim AwaitingHandlersQueuedEvent = null!;
/// <summary>
/// Semaphore for controlling the number of concurrently executing handlers.
/// </summary>
protected SemaphoreSlim ExecutingHandlersSemaphore = null!;
/// <summary>
/// Queue for storing awaiting handlers.
/// </summary>
protected readonly ConcurrentQueue<DescribedHandlerInfo> AwaitingHandlersQueue = [];
/// <summary>
/// Dictionary for tracking currently executing handlers.
/// </summary>
protected readonly ConcurrentDictionary<HandlerLifetimeToken, Task> ExecutingHandlersPool = [];
/// <summary>
/// The bot configuration options.
/// </summary>
protected readonly TelegramBotOptions Options;
/// <summary>
/// The global cancellation token for stopping all operations.
/// </summary>
protected readonly CancellationToken GlobalCancellationToken;
/// <summary>
/// Flag indicating whether the pool has been disposed.
/// </summary>
protected bool disposed = false;
/// <inheritdoc/>
public event HandlerEnqueued? HandlerEnqueued;
/// <inheritdoc/>
public event HandlerExecuting? HandlerExecuting;
/// <summary>
/// Initializes a new instance of the <see cref="UpdateHandlersPool"/> class.
/// </summary>
/// <param name="options">The bot configuration options.</param>
/// <param name="globalCancellationToken">The global cancellation token.</param>
public UpdateHandlersPool(TelegramBotOptions options, CancellationToken globalCancellationToken)
{
Options = options;
GlobalCancellationToken = globalCancellationToken;
if (options.MaximumParallelWorkingHandlers != null)
{
ExecutingHandlersSemaphore = new SemaphoreSlim(options.MaximumParallelWorkingHandlers ?? 0);
AwaitingHandlersQueuedEvent = new ManualResetEventSlim(false);
}
if (Options.MaximumParallelWorkingHandlers != null)
HandlersCheckpoint();
}
/// <inheritdoc/>
public void Enqueue(IEnumerable<DescribedHandlerInfo> handlers)
{
handlers.ForEach(Enqueue);
}
/// <inheritdoc/>
public void Enqueue(DescribedHandlerInfo handlerInfo)
{
if (Options.MaximumParallelWorkingHandlers == null)
{
Task.Run(async () => await ExecuteHandlerWrapper(handlerInfo));
return;
}
lock (SyncObj)
{
AwaitingHandlersQueue.Enqueue(handlerInfo);
HandlerEnqueued?.Invoke(handlerInfo);
AwaitingHandlersQueuedEvent.Set();
}
}
/// <inheritdoc/>
public void Dequeue(HandlerLifetimeToken token)
{
if (Options.MaximumParallelWorkingHandlers == null)
return;
lock (SyncObj)
{
ExecutingHandlersPool.TryRemove(token, out _);
ExecutingHandlersSemaphore.Release(1);
}
}
/// <summary>
/// Main checkpoint method that manages handler execution in a loop.
/// Continuously processes queued handlers while respecting concurrency limits.
/// </summary>
protected virtual async void HandlersCheckpoint()
{
await Task.Yield();
while (!GlobalCancellationToken.IsCancellationRequested)
{
if (!CanEnqueueHandler())
{
await ExecutingHandlersSemaphore.WaitAsync(GlobalCancellationToken);
if (!CanEnqueueHandler())
continue;
}
if (!TryDequeueHandler(out DescribedHandlerInfo? enqueuedHandler))
{
AwaitingHandlersQueuedEvent.Reset();
AwaitingHandlersQueuedEvent.Wait(GlobalCancellationToken);
if (!TryDequeueHandler(out enqueuedHandler))
continue;
}
if (enqueuedHandler == null)
continue;
ExecuteHandler(enqueuedHandler);
}
}
/// <summary>
/// Executes a handler by creating a lifetime token and tracking the execution.
/// </summary>
/// <param name="enqueuedHandler">The handler to execute.</param>
protected virtual void ExecuteHandler(DescribedHandlerInfo enqueuedHandler)
{
HandlerLifetimeToken lifetimeToken = enqueuedHandler.HandlerLifetime;
lifetimeToken.OnLifetimeEnded += Dequeue;
Task executingHandler = ExecuteHandlerWrapper(enqueuedHandler);
lock (SyncObj)
ExecutingHandlersPool.TryAdd(lifetimeToken, executingHandler);
HandlerExecuting?.Invoke(enqueuedHandler);
}
/// <summary>
/// Wrapper method that executes a handler and handles exceptions.
/// </summary>
/// <param name="enqueuedHandler">The handler to execute.</param>
/// <returns>A task representing the asynchronous execution.</returns>
/// <exception cref="HandlerFaultedException">Thrown when the handler execution fails.</exception>
protected virtual async Task ExecuteHandlerWrapper(DescribedHandlerInfo enqueuedHandler)
{
try
{
await enqueuedHandler.Execute(GlobalCancellationToken);
}
catch (OperationCanceledException)
{
return;
}
catch (Exception ex)
{
throw new HandlerFaultedException(enqueuedHandler, ex);
}
}
/// <summary>
/// Checks if a new handler can be enqueued based on the current execution count.
/// </summary>
/// <returns>True if a new handler can be enqueued; otherwise, false.</returns>
protected virtual bool CanEnqueueHandler()
{
lock (SyncObj)
{
return ExecutingHandlersPool.Count < Options.MaximumParallelWorkingHandlers;
}
}
/// <summary>
/// Attempts to dequeue a handler from the awaiting queue.
/// </summary>
/// <param name="enqueuedHandler">The dequeued handler, if successful.</param>
/// <returns>True if a handler was successfully dequeued; otherwise, false.</returns>
protected virtual bool TryDequeueHandler(out DescribedHandlerInfo? enqueuedHandler)
{
lock (SyncObj)
{
return AwaitingHandlersQueue.TryDequeue(out enqueuedHandler);
}
}
/// <summary>
/// Disposes of the handlers pool and releases all resources.
/// </summary>
public virtual void Dispose()
{
if (disposed)
return;
if (ExecutingHandlersSemaphore != null)
{
ExecutingHandlersSemaphore.Dispose();
ExecutingHandlersSemaphore = null!;
}
if (AwaitingHandlersQueuedEvent != null)
{
AwaitingHandlersQueuedEvent.Dispose();
AwaitingHandlersQueuedEvent = null!;
}
if (SyncObj != null)
SyncObj = null!;
GC.SuppressFinalize(this);
disposed = true;
}
}
}
+125
View File
@@ -0,0 +1,125 @@
using Telegram.Bot;
using Telegram.Bot.Polling;
using Telegram.Bot.Types;
using Telegrator.Polling;
using Telegrator.Configuration;
using Telegrator.Handlers.Components;
using Telegrator.MadiatorCore;
using Telegrator.MadiatorCore.Descriptors;
namespace Telegrator.Polling
{
/// <summary>
/// Implementation of <see cref="IUpdateRouter"/> that routes updates to appropriate handlers.
/// Manages the distribution of updates between regular handlers and awaiting handlers.
/// </summary>
public class UpdateRouter : IUpdateRouter
{
/// <summary>
/// The bot configuration options.
/// </summary>
private readonly TelegramBotOptions _options;
/// <summary>
/// The provider for regular handlers.
/// </summary>
private readonly IHandlersProvider _handlersProvider;
/// <summary>
/// The provider for awaiting handlers.
/// </summary>
private readonly IAwaitingProvider _awaitingProvider;
/// <summary>
/// The pool for managing handler execution.
/// </summary>
private readonly IUpdateHandlersPool _HandlersPool;
/// <inheritdoc/>
public IHandlersProvider HandlersProvider => _handlersProvider;
/// <inheritdoc/>
public IAwaitingProvider AwaitingProvider => _awaitingProvider;
/// <inheritdoc/>
public TelegramBotOptions Options => _options;
/// <inheritdoc/>
public IUpdateHandlersPool HandlersPool => _HandlersPool;
/// <inheritdoc/>
public IRouterExceptionHandler? ExceptionHandler { get; set; }
/// <inheritdoc/>
public IHandlerContainerFactory? DefaultContainerFactory { get; set; }
/// <summary>
/// Initializes a new instance of the <see cref="UpdateRouter"/> class.
/// </summary>
/// <param name="handlersProvider">The provider for regular handlers.</param>
/// <param name="awaitingProvider">The provider for awaiting handlers.</param>
/// <param name="options">The bot configuration options.</param>
public UpdateRouter(IHandlersProvider handlersProvider, IAwaitingProvider awaitingProvider, TelegramBotOptions options)
{
_options = options;
_handlersProvider = handlersProvider;
_awaitingProvider = awaitingProvider;
_HandlersPool = new UpdateHandlersPool(_options, _options.GlobalCancellationToken);
}
/// <summary>
/// Initializes a new instance of the <see cref="UpdateRouter"/> class with a custom handlers pool.
/// </summary>
/// <param name="handlersProvider">The provider for regular handlers.</param>
/// <param name="awaitingProvider">The provider for awaiting handlers.</param>
/// <param name="options">The bot configuration options.</param>
/// <param name="handlersPool">The custom handlers pool to use.</param>
public UpdateRouter(IHandlersProvider handlersProvider, IAwaitingProvider awaitingProvider, TelegramBotOptions options, IUpdateHandlersPool handlersPool)
{
_options = options;
_handlersProvider = handlersProvider;
_awaitingProvider = awaitingProvider;
_HandlersPool = handlersPool;
}
/// <summary>
/// Handles errors that occur during update processing.
/// </summary>
/// <param name="botClient">The Telegram bot client.</param>
/// <param name="exception">The exception that occurred.</param>
/// <param name="source">The source of the error.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous error handling operation.</returns>
public virtual Task HandleErrorAsync(ITelegramBotClient botClient, Exception exception, HandleErrorSource source, CancellationToken cancellationToken)
{
ExceptionHandler?.HandleException(botClient, exception, source, cancellationToken);
return Task.CompletedTask;
}
/// <summary>
/// Handles incoming updates by routing them to appropriate handlers.
/// </summary>
/// <param name="botClient">The Telegram bot client.</param>
/// <param name="update">The update to handle.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task representing the asynchronous update handling operation.</returns>
public virtual Task HandleUpdateAsync(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken)
{
// Queuing handlers for execution
foreach (DescribedHandlerInfo handler in GetHandlers(botClient, update, cancellationToken))
HandlersPool.Enqueue(handler);
return Task.CompletedTask;
}
private IEnumerable<DescribedHandlerInfo> GetHandlers(ITelegramBotClient botClient, Update update, CancellationToken cancellationToken)
{
// Getting handlers in update awaiting pool
IEnumerable<DescribedHandlerInfo> handlers = AwaitingProvider.GetHandlers(this, botClient, update, cancellationToken);
if (handlers.Any() && Options.ExclusiveAwaitingHandlerRouting)
return handlers;
return handlers.Concat(HandlersProvider.GetHandlers(this, botClient, update, cancellationToken));
}
}
}