ASP.NET Core 7でFIFO方式のキューでバックグラウンド処理を実行する

2023年2月28日火曜日

ASP.NET C#

t f B! P L

ASP.NET Coreで、処理時間が長いバックグラウンド処理を手軽に実装する場合は、BackgroundService 基底クラスを継承したサービスクラスを作る。この BackgroundService を使用することで、フロント側のリクエストは即完了しつつ、時間がかかる処理を引き続きバックグランド側で実行できる。

そしてこの記事では、キューを使って、FIFO(先出し先入)方式でジョブの順序を管理し、BackgroundService で順にバックグラウンドタスクを実行する方法を紹介する。

基本は、以下の MS公式の引用ですが、多重度を指定してバックグラウンドタスクを実行するなど、一部カスタマイズしてみます。
ASP.NET Core でホステッド サービスを使用するバックグラウンド タスク

キューを管理するクラス

まず、キューを管理する IBackgroundTaskQueue インターフェイスおよび BackgroundTaskQueue を作成する。このクラスは QueueBackgroundWorkItemAsync でキュー(System.Threading.Channels.Channel)に登録し、DequeueAsync でキューを取り出す。

using System.Threading.Channels;

public interface IBackgroundTaskQueue
{
    /// <summary>
    /// キューイング
    /// </summary>
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    /// <summary>
    /// デキュー
    /// </summary>
    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    /// <summary>
    /// コンストラクタ
    /// </summary>
    public BackgroundTaskQueue(int capacity)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    /// <summary>
    /// キューイング
    /// </summary>
    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    /// <summary>
    /// デキュー(キューが登録されるまで待機するパターン)
    /// </summary>
    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);
        return workItem;
    }
}

デキュー後、バックグラウンド タスクを起動するクラス

先ほど作成した IBackgroundTaskQueue からデキューを行い、バックグラウンド タスクを起動する QueuedHostedService を作成する。

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        ILogger<QueuedHostedService> logger)
    {
        _taskQueue = taskQueue;
        _logger = logger;
    }

    public IBackgroundTaskQueue _taskQueue { get; }


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        const int maxConcurrency = 3; //最大同時実行数
        
        _logger.LogInformation(
            $"Queued Hosted Service is running.{Environment.NewLine}" +
            $"{Environment.NewLine}Tap W to add a work item to the " +
            $"background queue.{Environment.NewLine}");

        //最大同時実行数のTaskを作成し、並列でタスクを実行できるようにする
        IEnumerable<Task> tasks = Enumerable.Range(0, maxConcurrency).Select(async n => 
            await BackgroundProcessing(stoppingToken)
        );
        await Task.WhenAll(tasks);
    }

    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var workItem = await _taskQueue.DequeueAsync(stoppingToken);
            try
            {
                 await workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");

        await base.StopAsync(stoppingToken);
    }
}

この QueuedHostedService は、以下のような特徴を持っています。

  • バックグラウンドタスクの最大同時実行数が指定可能。(上の maxConcurrency 変数)
  • ホストアプリケーションが停止するまで、バックグラウンドで処理を実行し続けます。
  • ホストアプリケーションが再起動すると、QueuedHostedServiceは自動的に再開されます。
  • キューに格納されたタスクは、FIFO(先入れ先出しの方式)で実行されます。
  • キューに格納できるタスクの数には制限があり、空きができるまでキューの格納は待ちになる

また、以下の処理は実装されていません。

  • バックグラウンド処理で例外が発生した時のリトライ

Program.cs にサービスを登録

Program.csに上で作成した2のクラスを登録する。

builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(ctx =>
{
    //if (!int.TryParse(hostContext.Configuration["QueueCapacity"], out var queueCapacity))
    int queueCapacity = 100;   //キューに格納できるタスクの数の上限
    return new BackgroundTaskQueue(
        queueCapacity);
});

キューを登録してバックグラウンドタスクを実行する

最後に、キューを登録してバックグラウンドタスクを実行する起動側の処理を実装する。

上で作成したIBackgroundTaskQueue をDI(依存注入)して、QueueBackgroundWorkItemAsync メソッドにバックグラウンドで実行したい処理を渡す。

public class HomeController : Controller
{
    private readonly ILogger<HomeController> _logger;

    private readonly IBackgroundTaskQueue _queue;

    public HomeController(ILogger<HomeController> logger,
        IBackgroundTaskQueue queue)
    {
        _logger = logger;
        _queue = queue;    //IBackgroundTaskQueueの依存性を注入
    }

    public IActionResult Index() {
        //キューの登録
        //引数にバックグラウンドで行う長い処理を渡す
        _queue.QueueBackgroundWorkItemAsync(async (CancellationToken token) => {
            await Task.Delay(10000);  //何らかの長い処理
        });
        return View("Index");
    }
}

まとめ

ASP.NET Core 7でFIFO方式のキューでバックグラウンド処理を実行する方法を紹介した。

本格的にやるなら、専用のワーカープロセスや、Azureなどのキューサービスを使って実装した方がいいと思うが、Webサーバー以外にリソースが準備出来ない場合や、たいしたバックグラウンドの処理がない場合は今回紹介した方法が便利だろう。

スポンサーリンク
スポンサーリンク

このブログを検索

Profile

自分の写真
Webアプリエンジニア。 日々新しい技術を追い求めてブログでアウトプットしています。
プロフィール画像は、猫村ゆゆこ様に書いてもらいました。

仕事募集もしていたり、していなかったり。

QooQ