Laravel のキューの dispatchAfterResponse() の仕組みのまとめ

PHPLaravel

PHP のフレームワーク Laravel が提供するキューのジョブオブジェクトに dispatchAfterResponse() というメソッドがあります。

少し前にこの仕組みについて調べたのですが、ウェブ上で参考になる情報があまり見つかりませんでした。 今回はこの dispatchAfterResponse() 周りの仕組みについてかんたんにまとめておきます。

対象バージョン

  • PHP 8.0
  • Laravel 9.x / 8.x / 7.x

記事執筆時点で最新の Larvel 9.x と 8.x 7.x が対象です。 以下に取り上げるコードはすべて Laravel 9.x のものです。

Laravel のキュージョブのメソッド dispatchAfterResponse() の仕組み

Laravel は組み込みでキューの仕組みを提供しており(正確には Laravel が依存する illuminate/queue がキューの機能を提供しており)、「時間のかかる処理をキューで処理する」ということが比較的かんたんに実現できます。

キューに追加されたタスクを処理する方法として、ウェブサーバーのプロセスとは別にキューワーカーのプロセスを動かしてそちらで処理するやり方が一般的ですが、 Laravel はウェブサーバーのリクエスト処理のプロセスの中でタスクを実行してしまう機能も用意しています。 それを利用するための API が今回取り上げる dispatchAfterResponse() です。

Laravel 公式ドキュメントのサンプル:

use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

dispatchAfterResponse() 周りの仕組みについて、 Laravel 公式ドキュメントでは次のように説明されています。

Dispatching After The Response Is Sent To Browser

Alternatively, the dispatchAfterResponse method delays dispatching a job until after the HTTP response is sent to the user's browser. This will still allow the user to begin using the application even though a queued job is still executing. This should typically only be used for jobs that take about a second, such as sending an email. Since they are processed within the current HTTP request, jobs dispatched in this fashion do not require a queue worker to be running in order for them to be processed:

拙訳:

(ウェブリクエストに対する)レスポンスを返した後のディスパッチ

別の方法として、メソッド dispatchAfterResponse を使うことで、 HTTP レスポンスをユーザーのブラウザに送信した後までジョブのディスパッチを遅らせることもできます。 この方法を使った場合、キュージョブが実行されている間にもユーザーがアプリケーションを使えるようになります。 通常この方法は所要時間が 1 秒ほどのジョブ(メール送信など)にのみ使うべきです。 この手法でディスパッチされたジョブは HTTP リクエストを処理したのと同じプロセスの中で実行されるため、キューワーカーを別に動かす必要がありません。

つまり、 dispatchAfterResponse() は「クライアントからのリクエストに応えたのと同じプロセスの中で、レスポンスを返しきった後にタスクを実行できる仕組み」ということです。

私が適切なリソースを見つけられていない可能性もあるのですが、記事執筆時点で公式ドキュメントでの dispatchAfterResponse() への言及はこれだけでした。 これだけだと実際にどう動くものなのかわかるようなわからないような感じなので、関係するコードを見ながら流れを追ってみます。

1. dispatchAfterResponse() がジョブを登録する

メソッド dispatchAfterResponse() は trait \Illuminate\Foundation\Bus\Dispatchable で定義されています:

use Illuminate\Contracts\Bus\Dispatcher;

// (省略)

    /**
     * Dispatch a command to its appropriate handler after the current process.
     *
     * @return mixed
     */
    public static function dispatchAfterResponse(...$arguments)
    {
        return app(Dispatcher::class)->dispatchAfterResponse(new static(...$arguments));
    }

ここで使われている関数 app() は laravel のサービスコンテナを利用するためのヘルパーです。 app() に引数としてインタフェースを渡すと、そのインタフェースに対応したクラスのインスタンスを生成して返してくれます。

app() のデフォルトの実装は次のとおりです:

use Illuminate\Container\Container;

// (省略)

    /**
     * Get the available container instance.
     *
     * @param  string|null  $abstract
     * @param  array  $parameters
     * @return mixed|\Illuminate\Contracts\Foundation\Application
     */
    function app($abstract = null, array $parameters = [])
    {
        if (is_null($abstract)) {
            return Container::getInstance();
        }

        return Container::getInstance()->make($abstract, $parameters);
    }

ここでは app()\Illuminate\Contracts\Bus\Dispatcher::class が渡されています。 これに対応したクラスは \Illuminate\Bus\Dispatcher なので、 app(Dispatcher::class)->dispatchAfterResponse(...) では結果として \Illuminate\Bus\Dispatcher:dispatchAfterResponse() を呼び出します。

\Illuminate\Bus\Dispatcher:dispatchAfterResponse() の中身は次のとおりです:

    /**
     * Dispatch a command to its appropriate handler after the current process.
     *
     * @param  mixed  $command
     * @param  mixed  $handler
     * @return void
     */
    public function dispatchAfterResponse($command, $handler = null)
    {
        $this->container->terminating(function () use ($command, $handler) {
            $this->dispatchNow($command, $handler);
        });
    }

ここで、 $this->container\Illuminate\Foundation\Application のインスタンスを格納しています。 そのメソッド terminating() のコードは次のとおりで、プロパティ terminatingCallbacksarray )に引数で渡された $callback を追加するだけのシンプルな内容です:

    /**
     * Register a terminating callback with the application.
     *
     * @param  callable|string  $callback
     * @return $this
     */
    public function terminating($callback)
    {
        $this->terminatingCallbacks[] = $callback;

        return $this;
    }

最終的にこの terminatingCallbacks に格納された callable が 1 つずつ呼び出されるのですが、そのことは後ほど確認します。

\Illuminate\Bus\Dispatcher:dispatchAfterResponse() の中で terminating() に渡されている無名関数では dispatchNow() というメソッドが実行されるようになっています。

この dispatchNow() の中身は次のとおりです:

class Dispatcher implements QueueingDispatcher
{
    // (省略)

    /**
     * Dispatch a command to its appropriate handler in the current process without using the synchronous queue.
     *
     * @param  mixed  $command
     * @param  mixed  $handler
     * @return mixed
     */
    public function dispatchNow($command, $handler = null)
    {
        $uses = class_uses_recursive($command);

        if (in_array(InteractsWithQueue::class, $uses) &&
            in_array(Queueable::class, $uses) &&
            ! $command->job) {
            $command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
        }

        if ($handler || $handler = $this->getCommandHandler($command)) {
            $callback = function ($command) use ($handler) {
                $method = method_exists($handler, 'handle') ? 'handle' : '__invoke';

                return $handler->{$method}($command);
            };
        } else {
            $callback = function ($command) {
                $method = method_exists($command, 'handle') ? 'handle' : '__invoke';

                return $this->container->call([$command, $method]);
            };
        }

        return $this->pipeline->send($command)->through($this->pipes)->then($callback);
    }

    // (省略)
}

少し複雑ですが、要はここでは大元の dispatchAfterResponse() のレシーバのメソッド handle()__invoke() が後々呼び出されるように $this->pipeline を使って指示がされています。 ここで、 $this->pipeline はクラス \Illuminate\Pipeline\Pipeline のインスタンスです。

\Illuminate\Pipeline\Pipeline のメソッド send() through() then() の中身は枝葉にあたるのでここでは立ち入りませんが、興味のある方はコードを確認してみてください。

ここまででジョブ登録が完了です。 続いて登録されたジョブが実行されるところを見ていきます。

2. 登録されたジョブがリクエスト処理完了後に実行される

dispatchAfterResponse() で登録されたジョブは Laravel のリクエスト処理完了後に実行されます。 具体的には、 Laravel のフロントコントローラの最後のところで実行されます。 コードは次のようになっています:

public/index.php:

$app = require_once __DIR__.'/../bootstrap/app.php';

$kernel = $app->make(Kernel::class);

$response = $kernel->handle(
    $request = Request::capture()
)->send();

$kernel->terminate($request, $response);

クライアントからのリクエストの処理はこのうち send() のところで完了してしまっていて、 $kernel->terminate() はレスポンス送信後に実行されます。 まさにこの $kernel->terminate() の中で dispatchAfterResponse() で登録されたジョブが実行されることになります。

その前の send() までのところでどんなことが行われているのかをざっくり追いかけてみます。

$kernel->handle()$kernel は通常 \Illuminate\Foundation\Http\Kernel のインスタンスです。 そのメソッド handle() はクラス \Illuminate\Http\Response のインスタンスを返します。 そして、 \Illuminate\Http\Response は Symfony のクラス \Symfony\Component\HttpFoundation\Response を継承しています:

namespace Illuminate\Http;

// (省略)
use Symfony\Component\HttpFoundation\Response as SymfonyResponse;
// (省略)

class Response extends SymfonyResponse
{
    // (省略)
}

\Illuminate\Http\Response はメソッド send() をオーバーライドしていないため、 Laravel のフロントコントローラ index.php で呼ばれる send() は Symfony の方の Response のメソッドです。 その実装は次のとおりです:

class Response
{
    // (省略)

    /**
     * Sends HTTP headers and content.
     *
     * @return $this
     */
    public function send(): static
    {
        $this->sendHeaders();
        $this->sendContent();

        if (\function_exists('fastcgi_finish_request')) {
            fastcgi_finish_request();
        } elseif (\function_exists('litespeed_finish_request')) {
            litespeed_finish_request();
        } elseif (!\in_array(\PHP_SAPI, ['cli', 'phpdbg'], true)) {
            static::closeOutputBuffers(0, true);
        }

        return $this;
    }

    // (省略)
}

ここで注目すべきは以下の部分です。

        if (\function_exists('fastcgi_finish_request')) {
            fastcgi_finish_request();

        } elseif (\function_exists('litespeed_finish_request')) {
            litespeed_finish_request();
        } elseif (!\in_array(\PHP_SAPI, ['cli', 'phpdbg'], true)) {
            static::closeOutputBuffers(0, true);
        }

ここでは実行環境次第で fastcgi_finish_request()litespeed_finish_request() が呼び出されるようになっています。

前者の fastcgi_finish_request() は、 FPM (FastCGI Process Manager) で PHP を動かしたときに利用できる関数です。 リクエストの処理が完了したことを宣言するためのもので、 fastcgi_finish_request() の呼び出し後は max_execution_time による制約がかからなくなります。

fastcgi_finish_request() については別記事に書いたので興味のある方はご覧になってみてください。

後者の litespeed_finish_request() については今回細かく追いかけていませんが、 fastcgi_finish_request() の LiteSpeed 版と捉えればよいかと思います。 LSAPI 7.3 で追加されたようです。

その後 send()$this を返すため、 Laravel のフロントコントローラ index.php の次の部分の $response\Illuminate\Http\Response のインスタンスとなります。

$response = $kernel->handle(
    $request = Request::capture()
)->send();

send() までの処理については以上です。

後は Laravel のフロントコントローラで残す処理は最後の terminate() だけです:

$kernel->terminate($request, $response);

上述のとおり dispatchAfterResponse() で登録されたジョブがここで実行されるのですが、その仕組みを以下追いかけてみます。

上でも述べましたが、この $kernel\Illuminate\Foundation\Http\Kernel のインスタンスです。 そのメソッド terminate() の中身は次のとおりです:

    /**
     * Call the terminate method on any terminable middleware.
     *
     * @param  \Illuminate\Http\Request  $request
     * @param  \Illuminate\Http\Response  $response
     * @return void
     */
    public function terminate($request, $response)
    {
        $this->terminateMiddleware($request, $response);

        $this->app->terminate();
    }

ここでは terminateMiddleware() という自身のメソッドを呼び出してあとは $this->app->terminate() を呼び出しているだけです。

terminateMiddleware() はメソッド terminate() を持つミドルウェア(いわゆる「 terminable middleware 」)を順に実行していくシンプルなものです。 本題から外れるのでこれ以上深追いはしません。 興味のある方は次のページを参照されるとよいかと思います。

本題の dispatchAfterResponse() に関わるのは $this->app->terminate() の方です。 この $this->app はフロントコントローラの次の箇所で生成された \Illuminate\Foundation\Application のインスタンスです。

$app = require_once __DIR__.'/../bootstrap/app.php';

そのメソッド terminate() の中身は次のとおりです:

    /**
     * Terminate the application.
     *
     * @return void
     */
    public function terminate()
    {
        $index = 0;

        while ($index < count($this->terminatingCallbacks)) {
            $this->call($this->terminatingCallbacks[$index]);

            $index++;
        }
    }

ここではプロパティ terminatingCallbacksarray )に格納されたコールバックが順に呼び出されています。

このプロパティ terminatingCallbacks はジョブ登録時に terminating() で操作していたものです。


    /**
     * Register a terminating callback with the application.
     *
     * @param  callable|string  $callback
     * @return $this
     */
    public function terminating($callback)
    {
        $this->terminatingCallbacks[] = $callback;

        return $this;
    }

登録されたジョブが実行されるところについては以上です。

つまり、 dispatchAfterResponse() については次のような仕組みになっているということでした。

  1. dispatchAfterResponse() から terminating() を経由して登録されたジョブ( callable )が
  2. \Illuminate\Foundation\Application のインスタンスのプロパティ terminatingCallbacks として管理され
  3. 最終的に terminate() で実行される

ということで、少し長くなりましたが Laravel のキューの dispatchAfterResponse() で登録されたジョブが実行される仕組みについてのまとめでした。


アバター
後藤隼人 ( ごとうはやと )

ソフトウェア開発やマーケティング支援などをしています。詳しくはこちら