WIP: Make the UpdateRecevier not wait for each handler #4
@@ -1,4 +1,4 @@
|
|||||||
using Telegram.Bot;
|
using Telegram.Bot;
|
||||||
using Telegram.Bot.Polling;
|
using Telegram.Bot.Polling;
|
||||||
using Telegram.Bot.Requests;
|
using Telegram.Bot.Requests;
|
||||||
using Telegram.Bot.Types;
|
using Telegram.Bot.Types;
|
||||||
@@ -7,7 +7,7 @@ namespace Telegrator.Polling
|
|||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Reactive implementation of <see cref="IUpdateReceiver"/> for polling updates from Telegram.
|
/// Reactive implementation of <see cref="IUpdateReceiver"/> for polling updates from Telegram.
|
||||||
/// Provides custom update receiving logic with error handling and configuration options.
|
/// /// Provides custom update receiving logic with error handling and configuration options.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="client">The Telegram bot client for making API requests.</param>
|
/// <param name="client">The Telegram bot client for making API requests.</param>
|
||||||
/// <param name="options">Optional receiver options for configuring update polling behavior.</param>
|
/// <param name="options">Optional receiver options for configuring update polling behavior.</param>
|
||||||
@@ -67,7 +67,8 @@ namespace Telegrator.Polling
|
|||||||
}
|
}
|
||||||
catch (Exception exception2)
|
catch (Exception exception2)
|
||||||
{
|
{
|
||||||
await updateHandler.HandleErrorAsync(Client, exception2, HandleErrorSource.HandleUpdateError, cancellationToken).ConfigureAwait(false);
|
await updateHandler.HandleErrorAsync(Client, exception2,
|
||||||
|
HandleErrorSource.HandleUpdateError, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -61,57 +61,72 @@ namespace Telegrator.Polling
|
|||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public async Task Enqueue(IEnumerable<DescribedHandlerInfo> handlers)
|
public async Task Enqueue(IEnumerable<DescribedHandlerInfo> handlers)
|
||||||
{
|
{
|
||||||
Result? lastResult = null;
|
if (ExecutingHandlersSemaphore != null)
|
||||||
foreach (DescribedHandlerInfo handlerInfo in handlers)
|
|
||||||
{
|
{
|
||||||
if (lastResult?.NextType != null)
|
await ExecutingHandlersSemaphore.WaitAsync().ConfigureAwait(false);
|
||||||
{
|
}
|
||||||
if (lastResult.NextType != handlerInfo.From.HandlerType)
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ExecutingHandlersSemaphore != null)
|
|
||||||
{
|
|
||||||
await ExecutingHandlersSemaphore.WaitAsync().ConfigureAwait(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Offload the entire processing of this update's handlers to a background task.
|
||||||
|
// This allows the Receiver to continue polling for NEW updates immediately,
|
||||||
|
// while this update acts as a self-contained unit of work.
|
||||||
|
_ = Task.Run(async () =>
|
||||||
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Alligator.LogDebug("Described handler '{0}' (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
|
Result? lastResult = null;
|
||||||
HandlerExecuting?.Invoke(handlerInfo);
|
foreach (DescribedHandlerInfo handlerInfo in handlers)
|
||||||
|
|
||||||
using (UpdateHandlerBase instance = handlerInfo.HandlerInstance)
|
|
||||||
{
|
{
|
||||||
Task<Result> task = instance.Execute(handlerInfo);
|
if (lastResult?.NextType != null)
|
||||||
HandlerEnqueued?.Invoke(handlerInfo);
|
{
|
||||||
|
if (lastResult.NextType != handlerInfo.From.HandlerType)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
await task.ConfigureAwait(false);
|
try
|
||||||
lastResult = task.Result;
|
{
|
||||||
ExecutingHandlersSemaphore?.Release(1);
|
Alligator.LogDebug("Described handler '{0}' (Update {1})", handlerInfo.DisplayString,
|
||||||
}
|
handlerInfo.HandlingUpdate.Id);
|
||||||
|
HandlerExecuting?.Invoke(handlerInfo);
|
||||||
|
|
||||||
if (lastResult.RouteNext)
|
using (UpdateHandlerBase instance = handlerInfo.HandlerInstance)
|
||||||
{
|
{
|
||||||
Alligator.LogTrace("Handler '{0}' requested route continuation (Update {1})", handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
|
Task<Result> task = instance.Execute(handlerInfo);
|
||||||
|
HandlerEnqueued?.Invoke(handlerInfo);
|
||||||
|
|
||||||
|
await task.ConfigureAwait(false);
|
||||||
|
lastResult = task.Result;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastResult.RouteNext)
|
||||||
|
{
|
||||||
|
Alligator.LogTrace("Handler '{0}' requested route continuation (Update {1})",
|
||||||
|
handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (NotImplementedException)
|
||||||
|
{
|
||||||
|
_ = 0xBAD + 0xC0DE;
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
_ = 0xBAD + 0xC0DE;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
Alligator.LogError("Failed to process handler '{0}' (Update {1})", exception: ex,
|
||||||
|
handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (lastResult != null && !lastResult.RouteNext)
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (NotImplementedException)
|
finally
|
||||||
{
|
{
|
||||||
_ = 0xBAD + 0xC0DE;
|
ExecutingHandlersSemaphore?.Release(1);
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException)
|
}, GlobalCancellationToken);
|
||||||
{
|
|
||||||
_ = 0xBAD + 0xC0DE;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
Alligator.LogError("Failed to process handler '{0}' (Update {1})", exception: ex, handlerInfo.DisplayString, handlerInfo.HandlingUpdate.Id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (lastResult != null && !lastResult.RouteNext)
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
Reference in New Issue
Block a user