WIP: Make the UpdateRecevier not wait for each handler #4

Closed
XSilverTH wants to merge 3 commits from parallelhandler into master
2 changed files with 59 additions and 44 deletions
Showing only changes of commit de397b95f6 - Show all commits
+4 -4
View File
@@ -1,4 +1,4 @@
using Telegram.Bot;
using Telegram.Bot;
using Telegram.Bot.Polling;
using Telegram.Bot.Requests;
using Telegram.Bot.Types;
@@ -6,12 +6,12 @@ using Telegram.Bot.Types;
namespace Telegrator.Polling
{
/// <summary>
/// Reactive implementation of <see cref="IUpdateReceiver"/> for polling updates from Telegram.
/// Reactive update receiver for polling updates from Telegram.
/// Provides custom update receiving logic with error handling and configuration options.
/// </summary>
/// <param name="client">The Telegram bot client for making API requests.</param>
/// <param name="options">Optional receiver options for configuring update polling behavior.</param>
public class ReactiveUpdateReceiver(ITelegramBotClient client, ReceiverOptions? options) : IUpdateReceiver
public class ReactiveUpdateReceiver(ITelegramBotClient client, ReceiverOptions? options)
{
/// <summary>
/// Gets the receiver options for configuring update polling behavior.
@@ -63,7 +63,7 @@ namespace Telegrator.Polling
try
{
request.Offset = update.Id + 1;
_ = updateHandler.HandleUpdateAsync(Client, update, cancellationToken);
await updateHandler.HandleUpdateAsync(Client, update, cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
}
catch (Exception exception2)
{
+55 -40
View File
@@ -61,57 +61,72 @@ namespace Telegrator.Polling
/// <inheritdoc/>
public async Task Enqueue(IEnumerable<DescribedHandlerInfo> handlers)
{
Result? lastResult = null;
foreach (DescribedHandlerInfo handlerInfo in handlers)
if (ExecutingHandlersSemaphore != null)
{
if (lastResult?.NextType != null)
{
if (lastResult.NextType != handlerInfo.From.HandlerType)
continue;
}
if (ExecutingHandlersSemaphore != null)
{
await ExecutingHandlersSemaphore.WaitAsync().ConfigureAwait(false);
}
await ExecutingHandlersSemaphore.WaitAsync().ConfigureAwait(false);
}
// Offload the entire processing of this update's handlers to a background task.
// This allows the Receiver to continue polling for NEW updates immediately,
// while this update acts as a self-contained unit of work.
_ = Task.Run(async () =>
{
try
{
Alligator.LogDebug("Described handler '{0}' (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
HandlerExecuting?.Invoke(handlerInfo);
using (UpdateHandlerBase instance = handlerInfo.HandlerInstance)
Result? lastResult = null;
foreach (DescribedHandlerInfo handlerInfo in handlers)
{
Task<Result> task = instance.Execute(handlerInfo);
HandlerEnqueued?.Invoke(handlerInfo);
if (lastResult?.NextType != null)
{
if (lastResult.NextType != handlerInfo.From.HandlerType)
continue;
}
await task.ConfigureAwait(false);
lastResult = task.Result;
ExecutingHandlersSemaphore?.Release(1);
}
try
{
Alligator.LogDebug("Described handler '{0}' (Update {1})", handlerInfo.DisplayString,
handlerInfo.HandlingUpdate.Id);
HandlerExecuting?.Invoke(handlerInfo);
if (lastResult.RouteNext)
{
Alligator.LogTrace("Handler '{0}' requested route continuation (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
using (UpdateHandlerBase instance = handlerInfo.HandlerInstance)
{
Task<Result> task = instance.Execute(handlerInfo);
HandlerEnqueued?.Invoke(handlerInfo);
await task.ConfigureAwait(false);
lastResult = task.Result;
}
if (lastResult.RouteNext)
{
Alligator.LogTrace("Handler '{0}' requested route continuation (Update {1})",
handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
}
}
catch (NotImplementedException)
{
_ = 0xBAD + 0xC0DE;
}
catch (OperationCanceledException)
{
_ = 0xBAD + 0xC0DE;
break;
}
catch (Exception ex)
{
Alligator.LogError("Failed to process handler '{0}' (Update {1})", exception: ex,
handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
}
if (lastResult != null && !lastResult.RouteNext)
break;
}
}
catch (NotImplementedException)
finally
{
_ = 0xBAD + 0xC0DE;
ExecutingHandlersSemaphore?.Release(1);
}
catch (OperationCanceledException)
{
_ = 0xBAD + 0xC0DE;
break;
}
catch (Exception ex)
{
Alligator.LogError("Failed to process handler '{0}' (Update {1})", exception: ex, handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
}
if (lastResult != null && !lastResult.RouteNext)
break;
}
}, GlobalCancellationToken);
}
/// <summary>