diff --git a/README.md b/README.md index 8745eb2b..ca3fffd6 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ An extension for running tasks asynchronously via queues. ## Requirements - PHP 8.1 or higher. +- PCNTL extension for signal handling _(optional, recommended for production use)_. ## Installation @@ -28,134 +29,125 @@ The package could be installed with [Composer](https://getcomposer.org): composer require yiisoft/queue ``` -## Ready for Yii Config +## Quick Start -If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults -in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should -change to start working with the queue: +### 1. Install an adapter -- Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation. -- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used - with the [queue provider](#different-queue-channels). -- Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`. -- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher). +For production use, you should install an adapter package that matches your message broker ([AMQP](https://github.com/yiisoft/queue-amqp), [Kafka](https://github.com/g41797/queue-kafka), [NATS](https://github.com/g41797/queue-nats), and [others](docs/guide/en/adapter-list.md)). +See the [adapter list](docs/guide/en/adapter-list.md) and follow the adapter-specific documentation. -## Differences to yii2-queue +> For development and testing, you can start without an external broker using the built-in [`SynchronousAdapter`](docs/guide/en/adapter-sync.md). +> This adapter processes messages immediately in the same process, so it won't provide true async execution, +> but it's useful for getting started and writing tests. -If you have experience with `yiisoft/yii2-queue`, you will find out that this package is similar. -Though, there are some key differences that are described in the "[migrating from yii2-queue](docs/guide/migrating-from-yii2-queue.md)" -article. +### 2. Configure the queue -## General usage +#### Configuration with [yiisoft/config](https://github.com/yiisoft/config) -Each queue task consists of two parts: +**If you use [yiisoft/app](https://github.com/yiisoft/app) or [yiisoft/app-api](https://github.com/yiisoft/app-api)** -1. A message is a class implementing `MessageInterface`. For simple cases you can use the default implementation, - `Yiisoft\Queue\Message\Message`. For more complex cases, you should implement the interface by your own. -2. A message handler is a callable called by a `Yiisoft\Queue\Worker\Worker`. The handler handles each queue message. +Add queue configuration to your application `$params` config. In [yiisoft/app](https://github.com/yiisoft/app)/[yiisoft/app-api](https://github.com/yiisoft/app-api) templates it's typically the `config/params.php` file. +_If your project structure differs, put it into any params config file that is loaded by [yiisoft/config](https://github.com/yiisoft/config)._ -For example, if you need to download and save a file, your message creation may look like the following: -- Message handler as the first parameter -- Message data as the second parameter +Minimal configuration example: ```php -$data = [ - 'url' => $url, - 'destinationFile' => $filename, +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + ], ]; -$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data); ``` -Then you should push it to the queue: +[Advanced configuration with `yiisoft/config`](docs/guide/en/configuration-with-config.md) -```php -$queue->push($message); -``` +#### Manual configuration + +For setting up all classes manually, see the [Manual configuration](docs/guide/en/configuration-manual.md) guide. + +### 3. Prepare a handler -Its handler may look like the following: +You need to create a handler class that will process the queue messages. The most simple way is to implement the `HandleInterface`. Let's create an example for remote file processing: ```php -class FileDownloader +use Yiisoft\Queue\Handler\HandleInterface; +use Yiisoft\Queue\Message\MessageInterface; + +final readonly class RemoteFileHandler implements HandleInterface { private string $absolutePath; - public function __construct(string $absolutePath) - { - $this->absolutePath = $absolutePath; - } + // These dependencies will be resolved on handler creation by the DI container + public function __construct( + private FileDownloader $downloader, + private FileProcessor $processor, + ) {} - public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void + // Every received message will be processed by this method + public function handle(MessageInterface $downloadMessage): void { $fileName = $downloadMessage->getData()['destinationFile']; - $path = "$this->absolutePath/$fileName"; - file_put_contents($path, file_get_contents($downloadMessage->getData()['url'])); + $localPath = $this->downloader->download($fileName); + $this->processor->process($localPath); } } ``` -The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`: +### 4. Send (produce/push) a message to a queue -```php -$worker = new \Yiisoft\Queue\Worker\Worker( - [], - $logger, - $injector, - $container -); -``` - -There is a way to run all the messages that are already in the queue, and then exit: +To send a message to the queue, you need to get the queue instance and call the `push()` method. Typically, with Yii Framework you'll get a `Queue` instance as a dependency of a service. ```php -$queue->run(); // this will execute all the existing messages -$queue->run(10); // while this will execute only 10 messages as a maximum before exit -``` - -If you don't want your script to exit immediately, you can use the `listen` method: -```php -$queue->listen(); +final readonly class Foo { + public function __construct(private QueueInterface $queue) {} + + public function bar(): void + { + $this->queue->push(new Message( + // The first parameter is the handler name that will process this concrete message + RemoteFileHandler::class, + // The second parameter is the data that will be passed to the handler. + // It should be serializable to JSON format + ['destinationFile' => 'https://example.com/file-path.csv'], + )); + } +} ``` -You can also check the status of a pushed message (the queue adapter you are using must support this feature): +### 5. Handle queue messages -```php -$queue->push($message); -$id = $message->getId(); +By default, Yii Framework uses [yiisoft/yii-console](https://github.com/yiisoft/yii-console) to run CLI commands. If you installed [yiisoft/app](https://github.com/yiisoft/app) or [yiisoft/app-api](https://github.com/yiisoft/app-api), you can run the queue worker with on of these two commands: -// Get status of the job -$status = $queue->status($id); +```bash +./yii queue:run # Handle all existing messages in the queue +./yii queue:listen # Start a daemon listening for new messages permanently +``` -// Check whether the job is waiting for execution. -$status->isWaiting(); +> In case you're using the `SynchronosAdapter` for development purposes, you should not use these commands, as you have no asynchronous processing available. The messages are processed immediately when pushed. -// Check whether a worker got the job from the queue and executes it. -$status->isReserved(); -// Check whether a worker has executed the job. -$status->isDone(); -``` ## Custom handler names -### Custom handler names By default, when you push a message to the queue, the message handler name is the fully qualified class name of the handler. This can be useful for most cases, but sometimes you may want to use a shorter name or arbitrary string as the handler name. This can be useful when you want to reduce the amount of data being passed or when you communicate with external systems. To use a custom handler name before message push, you can pass it as the first argument `Message` when creating it: + ```php new Message('handler-name', $data); ``` To use a custom handler name on message consumption, you should configure handler mapping for the `Worker` class: + ```php -$worker = new \Yiisoft\Queue\Worker\Worker( - ['handler-name' => FooHandler::class], - $logger, - $injector, - $container -); +$params['yiisoft/queue']['handlers'] = [ + 'handler-name' => FooHandler::class, +]; ``` ## Different queue channels @@ -168,7 +160,17 @@ channel-specific `Queue` creation is as simple as $queue = $provider->get('channel-name'); ``` -Out of the box, there are four implementations of the `QueueProviderInterface`: +You can also check if a channel exists before trying to get it: + +```php +if ($provider->has('channel-name')) { + $queue = $provider->get('channel-name'); +} +``` + +`QueueProviderInterface::get()` may throw `ChannelNotFoundException`, `InvalidQueueConfigException` or `QueueProviderException`. + +Out of the box, there are three implementations of the `QueueProviderInterface`: - `AdapterFactoryQueueProvider` - `PrototypeQueueProvider` @@ -210,26 +212,43 @@ order they are passed to the constructor. The first queue found will be returned ## Console execution -The exact way of task execution depends on the adapter used. Most adapters can be run using -console commands, which the component automatically registers in your application. +This package provides queue abstractions and includes a `SynchronousAdapter` for development and test environments. +To run a real queue backend, install one of the adapter packages listed in the [guide](docs/guide/en/adapter-list.md). + +The exact way of task execution depends on the adapter used. Most adapters can be run using console commands. +If you are using [yiisoft/config](https://github.com/yiisoft/config) with [yiisoft/yii-console](https://github.com/yiisoft/yii-console), the component automatically registers the commands. The following command obtains and executes tasks in a loop until the queue is empty: ```sh -yii queue:run +yii queue:run [channel1 [channel2 [...]]] --maximum=100 ``` The following command launches a daemon which infinitely queries the queue: ```sh -yii queue:listen +yii queue:listen [channel] ``` +The following command iterates through multiple channels and is meant to be used in development environment only: + +```sh +yii queue:listen:all [channel1 [channel2 [...]]] --pause=1 --maximum=0 +``` + +For long-running processes, graceful shutdown is controlled by `LoopInterface`. When `ext-pcntl` is available, +the default `SignalLoop` handles signals such as `SIGTERM`/`SIGINT`. + See the documentation for more details about adapter specific console commands and their options. The component can also track the status of a job which was pushed into queue. +For more details, see [Job status](docs/guide/en/job-status.md). + +## Debugging -For more details, see [the guide](docs/guide/en/README.md). +If you use [yiisoft/yii-debug](https://github.com/yiisoft/yii-debug), the package provides a `QueueCollector` that can +collect message pushes, `status()` calls and message processing by the worker. The defaults are already present in +[`config/params.php`](config/params.php). ## Middleware pipelines @@ -243,7 +262,6 @@ You can use any of these formats to define a middleware: - A ready-to-use middleware object: `new FooMiddleware()`. It must implement `MiddlewarePushInterface`, `MiddlewareConsumeInterface` or `MiddlewareFailureInterface` depending on the place you use it. - An array in the format of [yiisoft/definitions](https://github.com/yiisoft/definitions). - **Only if you use yiisoft/definitions and yiisoft/di**. - A `callable`: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the [yiisoft/injector](https://github.com/yiisoft/injector), so all the dependencies of your callable will be resolved. - A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class` @@ -284,8 +302,8 @@ You have three places to define push middlewares: 1. `PushMiddlewareDispatcher`. You can pass it either to the constructor, or to the `withMiddlewares()` method, which creates a completely new dispatcher object with only those middlewares, which are passed as arguments. -If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-push` key of the -`yiisoft/queue` array in the `params`. +If you use [yiisoft/config](https://github.com/yiisoft/config), you can add middleware to the `middlewares-push` key of the +[`yiisoft/queue`](https://github.com/yiisoft/queue) array in the `params`. 2. Pass middlewares to either `Queue::withMiddlewares()` or `Queue::withMiddlewaresAdded()` methods. The difference is that the former will completely replace an existing middleware stack, while the latter will add passed middlewares to the end of the existing stack. These middlewares will be executed after the common ones, passed directly to the @@ -298,16 +316,16 @@ along with them. ### Consume pipeline -You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In a pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Except push pipeline, you have only one place to define the middleware stack: in the `ConsumeMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-consume` key of the `yiisoft/queue` array in the `params`. +You can set a middleware pipeline for a message when it will be consumed from a queue server. This is useful to collect metrics, modify message data, etc. In a pair with a Push middleware you can deduplicate messages in the queue, calculate time from push to consume, handle errors (push to a queue again, redirect failed message to another queue, send a notification, etc.). Except push pipeline, you have only one place to define the middleware stack: in the `ConsumeMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](https://github.com/yiisoft/config), you can add middleware to the `middlewares-consume` key of the [`yiisoft/queue`](https://github.com/yiisoft/queue) array in the `params`. ### Error handling pipeline -Often when some job is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/queue` with a Failure middleware pipeline. They are triggered each time message processing via the Consume middleware pipeline is interrupted with any `Throwable`. The key differences from the previous two pipelines: +Often when some job is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in [yiisoft/queue](https://github.com/yiisoft/queue) with a Failure middleware pipeline. They are triggered each time message processing via the Consume middleware pipeline is interrupted with any `Throwable`. The key differences from the previous two pipelines: - You should set up the middleware pipeline separately for each queue channel. That means, the format should be `['channel-name' => [FooMiddleware::class]]` instead of `[FooMiddleware::class]`, like for the other two pipelines. There is also a default key, which will be used for those channels without their own one: `FailureMiddlewareDispatcher::DEFAULT_PIPELINE`. - The last middleware will throw the exception, which will come with the `FailureHandlingRequest` object. If you don't want the exception to be thrown, your middlewares should `return` a request without calling `$handler->handleFailure()`. -You can declare error handling a middleware pipeline in the `FailureMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](yiisoft/config), you can add middleware to the `middlewares-fail` key of the `yiisoft/queue` array in the `params`. +You can declare error handling a middleware pipeline in the `FailureMiddlewareDispatcher`, either in the constructor, or in the `withMiddlewares()` method. If you use [yiisoft/config](https://github.com/yiisoft/config), you can add middleware to the `middlewares-fail` key of the [`yiisoft/queue`](https://github.com/yiisoft/queue) array in the `params`. See [error handling docs](docs/guide/error-handling.md) for details. diff --git a/docs/guide/en/README.md b/docs/guide/en/README.md index 564b8ca9..4bc0b06b 100644 --- a/docs/guide/en/README.md +++ b/docs/guide/en/README.md @@ -2,10 +2,38 @@ An extension for running tasks asynchronously via queues. -## Guides and concept explanations +## Getting started + +- [Configuration with yiisoft/config](configuration-with-config.md) +- [Manual configuration](configuration-manual.md) - [Usage basics](usage.md) -- [Migrating from `yii2-queue`](migrating-from-yii2-queue.md) -- [Errors and retryable jobs](error-handling.md) - [Workers](worker.md) +- [Console commands](console-commands.md) + +## Adapters + - [Adapter list](adapter-list.md) +- [Synchronous adapter](adapter-sync.md) + +## Core concepts + +- [Queue channels](channels.md) +- [Message handler](message-handler.md) +- [Envelopes](envelopes.md) +- [Loops](loops.md) + +## Interoperability + +- [Producing messages from external systems](producing-messages-from-external-systems.md) + +## Reliability and visibility + +- [Errors and retryable jobs](error-handling.md) +- [Failure handling pipeline](failure-handling-pipeline.md) +- [Job status](job-status.md) +- [Yii Debug integration](debug-integration.md) + +## Migration from Yii2 + +- [Migrating from `yii2-queue`](migrating-from-yii2-queue.md) diff --git a/docs/guide/en/channels.md b/docs/guide/en/channels.md new file mode 100644 index 00000000..5fe2128f --- /dev/null +++ b/docs/guide/en/channels.md @@ -0,0 +1,118 @@ +# Queue channels + +A *queue channel* is a named queue configuration. + +In practice, a channel is a string (for example, `yii-queue`, `emails`, `critical`) that selects which queue backend (adapter) messages are pushed to and which worker consumes them. + +Having multiple channels is useful when you want to separate workloads, for example: + +- **Different priorities**: `critical` vs `low`. +- **Different message types**: `emails`, `reports`, `webhooks`. +- **Different backends / connections**: fast Redis queue for short jobs and a different backend for long-running jobs. + +The default channel name is `Yiisoft\Queue\QueueInterface::DEFAULT_CHANNEL` (`yii-queue`). + +## How channels are used in the code + +- A channel name is passed to `Yiisoft\Queue\Provider\QueueProviderInterface::get($channel)`. +- The provider returns a `Yiisoft\Queue\QueueInterface` instance bound to that channel. +- Internally, the provider creates an adapter instance and calls `AdapterInterface::withChannel($channel)`. + +In other words, a channel is the key that lets the application select a particular adapter instance/configuration. + +## Choosing a channel at runtime + +### In CLI + +These built-in commands accept channel names: + +- `queue:listen [channel]` listens to a single channel (defaults to `yii-queue`). +- `queue:run [channel1 [channel2 [...]]]` processes existing messages and exits. +- `queue:listen-all [channel1 [channel2 [...]]]` iterates over multiple channels (meant mostly for development). + +Examples: + +```sh +yii queue:listen emails +yii queue:run critical emails --maximum=100 +yii queue:listen-all critical emails --pause=1 --maximum=500 +``` + +### In PHP code + +When you have a `QueueProviderInterface`, request a queue by channel name: + +```php +/** @var \Yiisoft\Queue\Provider\QueueProviderInterface $provider */ + +$emailsQueue = $provider->get('emails'); +$emailsQueue->push(new \Yiisoft\Queue\Message\Message('send-email', ['to' => 'user@example.com'])); +``` + +## Configuration with yiisoft/config + +When using [yiisoft/config](https://github.com/yiisoft/config), channel configuration is stored in params under `yiisoft/queue.channels`. + +It is a map: + +- key: channel name +- value: adapter definition that should be resolved for that channel + +Minimal example (single channel): + +```php +use Yiisoft\Queue\Adapter\AdapterInterface; +use Yiisoft\Queue\QueueInterface; + +return [ + 'yiisoft/queue' => [ + 'channels' => [ + QueueInterface::DEFAULT_CHANNEL => AdapterInterface::class, + ], + ], +]; +``` + +Multiple channels example: + +```php +use Yiisoft\Queue\QueueInterface; + +return [ + 'yiisoft/queue' => [ + 'channels' => [ + QueueInterface::DEFAULT_CHANNEL => \Yiisoft\Queue\Adapter\AdapterInterface::class, + 'critical' => \Yiisoft\Queue\Adapter\AdapterInterface::class, + 'emails' => \Yiisoft\Queue\Adapter\AdapterInterface::class, + ], + ], +]; +``` + +The exact adapter definitions depend on which queue adapter package you use (Redis, AMQP, etc.). + +When using the default DI config from this package, the configured channel names are also used as the default channel list for `queue:run` and `queue:listen-all`. + +## Manual configuration (without yiisoft/config) + +For multiple channels without `yiisoft/config`, you can create a provider manually. + +`AdapterFactoryQueueProvider` accepts adapter definitions indexed by channel names and returns a `QueueInterface` for a channel on demand: + +```php +use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; + +$definitions = [ + 'channel1' => new \Yiisoft\Queue\Adapter\SynchronousAdapter($worker, $queue), + 'channel2' => static fn (\Yiisoft\Queue\Adapter\SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), +]; + +$provider = new AdapterFactoryQueueProvider( + $queue, + $definitions, + $container, +); + +$queueForChannel1 = $provider->get('channel1'); +$queueForChannel2 = $provider->get('channel2'); +``` diff --git a/docs/guide/en/configuration-manual.md b/docs/guide/en/configuration-manual.md new file mode 100644 index 00000000..90d42104 --- /dev/null +++ b/docs/guide/en/configuration-manual.md @@ -0,0 +1,116 @@ +# Manual Configuration (without [yiisoft/config](https://github.com/yiisoft/config)) + +This guide explains how to set up the queue component manually, without using [yiisoft/config](https://github.com/yiisoft/config). + +## Basic setup + +To use the queue, you need to create instances of the following classes: + +1. **Adapter** - handles the actual queue backend (e.g., `SynchronousAdapter`, or an adapter from external packages like Redis, AMQP, etc.) +2. **Worker** - processes messages from the queue +3. **Queue** - the main entry point for pushing messages + +### Example + +```php +use Yiisoft\Queue\Adapter\SynchronousAdapter; +use Yiisoft\Queue\Queue; +use Yiisoft\Queue\Worker\Worker; +use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; +use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume; +use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher; +use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailure; +use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush; +use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; +use Psr\Container\ContainerInterface; + +// You need a PSR-11 container for dependency injection +/** @var ContainerInterface $container */ + +// Define message handlers +$handlers = [ + 'file-download' => [FileDownloader::class, 'handle'], + FileDownloader::class => [FileDownloader::class, 'handle'], +]; + +// Create middleware dispatchers +$consumeMiddlewareDispatcher = new ConsumeMiddlewareDispatcher( + new MiddlewareFactoryConsume($container), +); + +$failureMiddlewareDispatcher = new FailureMiddlewareDispatcher( + new MiddlewareFactoryFailure($container), + [], +); + +$pushMiddlewareDispatcher = new PushMiddlewareDispatcher( + new MiddlewareFactoryPush($container), +); + +// Create worker +$worker = new Worker( + $handlers, + $container->get(\Psr\Log\LoggerInterface::class), + $container->get(\Yiisoft\Injector\Injector::class), + $container, + $consumeMiddlewareDispatcher, + $failureMiddlewareDispatcher, +); + +// Create queue with adapter +$queue = new Queue( + $worker, + $pushMiddlewareDispatcher, + $container->get(\Psr\EventDispatcher\EventDispatcherInterface::class), + new SynchronousAdapter($worker, /* queue instance will be set via withAdapter */), +); + +// Now you can push messages +$message = new \Yiisoft\Queue\Message\Message('file-download', ['url' => 'https://example.com/file.pdf']); +$queue->push($message); +``` + +## Using Queue Provider + +For multiple queue channels, use `AdapterFactoryQueueProvider`: + +```php +use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; +use Yiisoft\Queue\Adapter\SynchronousAdapter; + +$definitions = [ + 'channel1' => new SynchronousAdapter($worker, $queue), + 'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'), +]; + +$provider = new AdapterFactoryQueueProvider( + $queue, + $definitions, + $container, +); + +$queueForChannel1 = $provider->get('channel1'); +$queueForChannel2 = $provider->get('channel2'); +``` + +## Running the queue + +### Processing existing messages + +```php +$queue->run(); // Process all messages +$queue->run(10); // Process up to 10 messages +``` + +### Listening for new messages + +```php +$queue->listen(); // Run indefinitely +``` + +## Next steps + +- [Usage basics](usage.md) - learn how to create messages and handlers +- [Workers](worker.md) - understand handler formats +- [Error handling](error-handling.md) - configure retries and failure handling +- [Adapter list](adapter-list.md) - choose a production-ready adapter diff --git a/docs/guide/en/configuration-with-config.md b/docs/guide/en/configuration-with-config.md new file mode 100644 index 00000000..14d28e2f --- /dev/null +++ b/docs/guide/en/configuration-with-config.md @@ -0,0 +1,45 @@ +# Configuration with [yiisoft/config](https://github.com/yiisoft/config) + +If you are using [yiisoft/config](https://github.com/yiisoft/config) (i.e. installed with [yiisoft/app](https://github.com/yiisoft/app) or [yiisoft/app-api](https://github.com/yiisoft/app-api)), you'll find out this package has some defaults in the [`common`](../../../config/di.php) and [`params`](../../../config/params.php) configurations saving your time. + +## Where to put the configuration + +In [yiisoft/app](https://github.com/yiisoft/app)/[yiisoft/app-api](https://github.com/yiisoft/app-api) templates you typically add or adjust configuration in `config/params.php`. +If your project structure differs, put configuration into any params config file that is loaded by [yiisoft/config](https://github.com/yiisoft/config). + +## What you need to configure + +- Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation. +- And/or define channel-specific `AdapterInterface` implementations in the `channels` params key. See more about channels [here](./channels.md). +- Define [message handlers](./message-handlers.md) in the `handlers` params key to be used with the `QueueWorker`. +- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher). + +## Minimal configuration example + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + ], +]; +``` + +## Full configuration example + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + 'channels' => [ + \Yiisoft\Queue\QueueInterface::DEFAULT_CHANNEL => \Yiisoft\Queue\Adapter\AdapterInterface::class, + ], + 'middlewares-push' => [], + 'middlewares-consume' => [], + 'middlewares-fail' => [], + ], +]; +``` diff --git a/docs/guide/en/console-commands.md b/docs/guide/en/console-commands.md new file mode 100644 index 00000000..267ea96f --- /dev/null +++ b/docs/guide/en/console-commands.md @@ -0,0 +1,45 @@ +# Console commands + +Yii Queue provides several console commands for processing queued jobs. + +If you are using [yiisoft/config](https://github.com/yiisoft/config) and [yiisoft/yii-console](https://github.com/yiisoft/yii-console), the commands are registered automatically. + +If you are using [symfony/console](https://github.com/symfony/console) directly, you should register the commands manually. + +## 1. Run queued messages and exit + +The command `queue:run` obtains and executes tasks until the queue is empty, then exits. + +You can also narrow the scope of processed messages by specifying channel(s) and maximum number of messages to process: + +- Specify one or more channels to process. Messages from other channels will be ignored. Default is all registered channels (in case of using [yiisoft/config](https://github.com/yiisoft/config) and [yiisoft/yii-console](https://github.com/yiisoft/yii-console), otherwise pass the default channel list to the command constructor). +- Use `--maximum` to limit the number of messages processed. When set, command will exit either when all the messages are processed or when the maximum count is reached. + +The full command signature is: +```sh +yii queue:run [channel1 [channel2 [...]]] --maximum=100 +``` + +## 2. Listen for queued messages and process them continuously + +The following command launches a daemon, which infinitely consumes messages from a single channel of the queue. This command receives an optional `channel` argument to specify which channel to listen to, defaults to the default channel `yii-queue`. + +```sh +yii queue:listen [channel] +``` + +## 3. Listen to multiple channels + +The following command iterates through multiple channels and is meant to be used in development environment only, as it consumes a lot of CPU for iterating through channels. You can pass to it: + +- `channel` argument(s). Specify one or more channels to process. Messages from other channels will be ignored. Default is all registered channels (in case of using [yiisoft/config](https://github.com/yiisoft/config) and [yiisoft/yii-console](https://github.com/yiisoft/yii-console), otherwise pass the default channel list to the command constructor). +- `--maximum` option to limit the number of messages processed before switching to another channel. E.g. you set `--maximum` to 500 and right now you have 1000 messages in `channel1`. This command will consume only 500 of them, then it will switch to `channel2` to see if there are any messages there. Defaults to `0` (no limit). +- `--pause` option to specify the number of seconds to pause between checking channels when no messages are found. Defaults to `1`. + +The full command signature is: +```sh +yii queue:listen-all [channel1 [channel2 [...]]] --pause=1 --maximum=0 +``` + +For long-running processes, graceful shutdown is controlled by `LoopInterface`. When `ext-pcntl` is available, +the default `SignalLoop` handles signals such as `SIGTERM`/`SIGINT`. diff --git a/docs/guide/en/debug-integration.md b/docs/guide/en/debug-integration.md new file mode 100644 index 00000000..c6f785c6 --- /dev/null +++ b/docs/guide/en/debug-integration.md @@ -0,0 +1,53 @@ +# Yii Debug integration + +This package provides an integration with [yiisoft/yii-debug](https://github.com/yiisoft/yii-debug). + +When debug is enabled, it collects queue-related information and shows it in the Yii Debug panel. + +## What is collected + +The integration is based on `Yiisoft\Queue\Debug\QueueCollector`. + +It collects: + +- Pushed messages grouped by queue channel (including middleware definitions passed to `push()`). +- Job status checks performed via `QueueInterface::status()`. +- Messages processed by a worker grouped by queue channel. + +## How it works + +The integration is enabled by registering the collector and wrapping tracked services with proxy implementations. + +In this package defaults (see `config/params.php`), the following services are tracked: + +- `Yiisoft\Queue\Provider\QueueProviderInterface` is wrapped with `Yiisoft\Queue\Debug\QueueProviderInterfaceProxy`. + The proxy decorates returned queues with `Yiisoft\Queue\Debug\QueueDecorator` to collect `push()` and `status()` calls. +- `Yiisoft\Queue\Worker\WorkerInterface` is wrapped with `Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy` to collect message processing. + +Because of that, to see data in debug you should obtain `QueueProviderInterface` / `WorkerInterface` from the DI container. + +## Configuration + +If you use [yiisoft/config](https://github.com/yiisoft/config) and the configuration plugin, these defaults are loaded automatically from this package. + +Otherwise, you can configure it manually in your params configuration: + +```php +use Yiisoft\Queue\Debug\QueueCollector; +use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy; +use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy; +use Yiisoft\Queue\Provider\QueueProviderInterface; +use Yiisoft\Queue\Worker\WorkerInterface; + +return [ + 'yiisoft/yii-debug' => [ + 'collectors' => [ + QueueCollector::class, + ], + 'trackedServices' => [ + QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class], + WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class], + ], + ], +]; +``` diff --git a/docs/guide/en/envelopes.md b/docs/guide/en/envelopes.md new file mode 100644 index 00000000..5e46cc25 --- /dev/null +++ b/docs/guide/en/envelopes.md @@ -0,0 +1,62 @@ +# Envelopes + +An *envelope* is a message container that wraps another message and adds metadata. + +An envelope implements `Yiisoft\Queue\Message\EnvelopeInterface`, which itself extends `Yiisoft\Queue\Message\MessageInterface`. + +## How an envelope behaves + +An envelope acts like the wrapped message: + +- `getHandlerName()` is delegated to the wrapped message. +- `getData()` is delegated to the wrapped message. + +What an envelope adds is `getMetadata()`. + +## Metadata and envelope stacking + +Every envelope contributes its own metadata to the resulting metadata array. + +`Yiisoft\Queue\Message\Envelope` (base class) also maintains an envelope stack in message metadata under `EnvelopeInterface::ENVELOPE_STACK_KEY` (`"envelopes"`). + +When `getMetadata()` is called on an envelope, it returns: + +- the wrapped message metadata, +- plus an updated `"envelopes"` stack (previous stack + current envelope class), +- plus envelope-specific metadata. + +Because envelopes wrap other messages, multiple envelopes form a stack. + +## Creating envelopes + +To wrap a message into an envelope, envelope classes provide: + +- `EnvelopeInterface::fromMessage(MessageInterface $message): static` + +and, via `MessageInterface` inheritance, also support: + +- `Envelope::fromData(string $handlerName, mixed $data, array $metadata = []): static` + +## Restoring envelopes from metadata + +If metadata contains the `"envelopes"` key with an array of envelope class names, the serializer will try to rebuild the stack by wrapping the message with each envelope class in the given order. + +During this process: + +- The `"envelopes"` key is removed from the base message metadata (it is set to an empty array before creating the base message). +- Each envelope class from the list is applied to the message using `EnvelopeInterface::fromMessage(...)`. +- A value is applied only if it is a string, the class exists, and it implements `EnvelopeInterface`. Otherwise it is ignored. + +## Built-in envelopes + +### IdEnvelope + +`Yiisoft\Queue\Message\IdEnvelope` adds a message identifier into metadata under the `IdEnvelope::MESSAGE_ID_KEY` key (`"yii-message-id"`). + +This envelope is used to carry the adapter-provided message ID through the message lifecycle. + +### FailureEnvelope + +`Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope` stores failure-handling metadata under the `FailureEnvelope::FAILURE_META_KEY` key (`"failure-meta"`). + +The envelope merges failure metadata when building `getMetadata()`. diff --git a/docs/guide/en/error-handling.md b/docs/guide/en/error-handling.md index 2a17c11a..4c07cd67 100644 --- a/docs/guide/en/error-handling.md +++ b/docs/guide/en/error-handling.md @@ -1,12 +1,12 @@ # Error handling on message processing -Often when some message handling is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in `yiisoft/queue` with _Failure Handling Middleware Pipeline_. It is triggered each time message processing via Consume Middleware Pipeline is interrupted with any `Throwable`. +Often when some message handling is failing, we want to retry its execution a couple more times or redirect it to another queue channel. This can be done in [yiisoft/queue](https://github.com/yiisoft/queue) with _Failure Handling Middleware Pipeline_. It is triggered each time message processing via Consume Middleware Pipeline is interrupted with any `Throwable`. ## Configuration -Here below is configuration via `yiisoft/config`. If you don't use it, you should add a middleware definition list (in the `middlewares-fail` key here) to the `FailureMiddlewareDispatcher` by your own. +Here below is configuration via [yiisoft/config](https://github.com/yiisoft/config). If you don't use it, you should add a middleware definition list (in the `middlewares-fail` key here) to the `FailureMiddlewareDispatcher` by your own. -Configuration should be passed to the `yiisoft/queue.fail-strategy-pipelines` key of the `params` config to work with the `yiisoft/config`. You can define different failure handling pipelines for each queue channel. Let's see and describe an example: +Configuration should be passed to the `yiisoft/queue.fail-strategy-pipelines` key of the `params` config to work with the [yiisoft/config](https://github.com/yiisoft/config). You can define different failure handling pipelines for each queue channel. Let's see and describe an example: ```php 'yiisoft/queue' => [ @@ -42,7 +42,7 @@ Configuration should be passed to the `yiisoft/queue.fail-strategy-pipelines` ke Keys here except `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` are queue channel names, and values are lists of `FailureMiddlewareInterface` definitions. `FailureMiddlewareDispatcher::DEFAULT_PIPELINE` defines a default pipeline to apply to channels without an explicitly defined failure strategy pipeline. Each middleware definition must be one of: - A ready-to-use `MiddlewareFailureInterface` object like `new FooMiddleware()`. - A valid definition for the [yiisoft/definitions](https://github.com/yiisoft/definitions). It must describe an object, implementing the `MiddlewareFailureInterface`. -- A callable: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the `yiisoft/injector`, so all the dependencies of your callable will be resolved. You can also define a "callable-looking" array, where an object will be instantiated with a DI container: `[FooMiddleware::class, 'handle']`. +- A callable: `fn() => // do stuff`, `$object->foo(...)`, etc. It will be executed through the [yiisoft/injector](https://github.com/yiisoft/injector), so all the dependencies of your callable will be resolved. You can also define a "callable-looking" array, where an object will be instantiated with a DI container: `[FooMiddleware::class, 'handle']`. - A string for your DI container to resolve the middleware, e.g. `FooMiddleware::class`. In the example above failures will be handled this way (look the concrete middleware description below): diff --git a/docs/guide/en/failure-handling-pipeline.md b/docs/guide/en/failure-handling-pipeline.md new file mode 100644 index 00000000..b104e5ea --- /dev/null +++ b/docs/guide/en/failure-handling-pipeline.md @@ -0,0 +1,127 @@ +# Failure handling pipeline + +`yiisoft/queue` can deal with errors that happen while a worker is processing a message. This guide explains what exactly happens when something goes wrong, and when you should rely on the built-in failure handling vs. when the exception will be bubbled up. + +## When failure handling is triggered + +Failure handling is triggered only when message processing throws a `Throwable`. + +In practice it means: + +- The worker runs message processing in `Yiisoft\Queue\Worker\Worker::process()`. +- Your message handler is executed through the consume middleware pipeline. +- If any `Throwable` escapes that pipeline, the worker switches to the failure handling pipeline. + +## Pipeline overview (step-by-step) + +1. A message is processed via the consume pipeline + + The worker builds a `Yiisoft\Queue\Middleware\Consume\ConsumeRequest` and dispatches it through `ConsumeMiddlewareDispatcher`. The final consume handler invokes the resolved message handler. + +2. A `Throwable` is caught by the worker + + If any middleware or the message handler throws, `Worker::process()` catches it. + +3. Failure context is wrapped into a request object + + The worker creates a `Yiisoft\Queue\Middleware\FailureHandling\FailureHandlingRequest` containing: + + - the message + - the caught exception + - the queue instance (including its channel) + +4. A failure pipeline is selected by queue channel + + `FailureMiddlewareDispatcher::dispatch()` selects which pipeline to run: + + - It tries to use the pipeline configured for the current queue channel. + - If there is no pipeline for that channel (or it is empty), it falls back to `FailureMiddlewareDispatcher::DEFAULT_PIPELINE`. + +5. Failure middlewares are executed + + The dispatcher builds a lazy middleware stack (`MiddlewareFailureStack`) and invokes it. + + Each failure middleware implements `MiddlewareFailureInterface`: + + - It receives the `FailureHandlingRequest` and a continuation handler. + - It may “handle” the failure by producing a new request (for example, by pushing a retry message to some queue and returning `withMessage(...)` / `withQueue(...)`). + - If it decides not to handle the failure, it calls `$handler->handleFailure($request)` to continue the pipeline. + +6. If nothing handles the failure, the exception is rethrown + + The failure pipeline ends with `FailureFinalHandler`, which throws `$request->getException()`. + +7. The worker wraps and rethrows + + If the failure pipeline itself ends with an exception, `Worker::process()` wraps it into `Yiisoft\Queue\Exception\JobFailureException` (including message id from `IdEnvelope` metadata when available) and throws it. + +## What “handled failure” means + +A failure is considered “handled” if the failure pipeline returns a `FailureHandlingRequest` without throwing. + +In practice, built-in middlewares “handle” failures by re-queueing the message (same or different queue/channel), optionally with a delay, and returning the updated request. + +## Built-in failure handling components + +This package ships the following built-in failure handling components. + +### SendAgainMiddleware + +Class: `Yiisoft\Queue\Middleware\FailureHandling\Implementation\SendAgainMiddleware` + +Behavior: + +- Resends the message to a queue immediately. +- If `targetQueue` is `null`, it resends to the original queue. +- It stops applying itself after `maxAttempts` attempts. + +State tracking: + +- Uses `FailureEnvelope` metadata (`failure-meta`) to store the per-middleware attempt counter. +- The counter key is `failure-strategy-resend-attempts-{id}`. + +### ExponentialDelayMiddleware + +Class: `Yiisoft\Queue\Middleware\FailureHandling\Implementation\ExponentialDelayMiddleware` + +Behavior: + +- Resends the message with an exponentially increasing delay. +- Requires a `DelayMiddlewareInterface` implementation and an adapter that supports delayed delivery. +- Can resend to an explicitly provided queue or to the original queue. +- It stops applying itself after `maxAttempts` attempts. + +State tracking: + +- Uses `FailureEnvelope` metadata (`failure-meta`) to store attempts and the previous delay. +- The per-middleware keys are: + + - `failure-strategy-exponential-delay-attempts-{id}` + - `failure-strategy-exponential-delay-delay-{id}` + +### FailureEnvelope + +Class: `Yiisoft\Queue\Middleware\FailureHandling\FailureEnvelope` + +Behavior: + +- An envelope that stores failure-related metadata under the `failure-meta` key. +- Built-in failure middlewares use it to persist retry counters / delay parameters across retries. + +### FailureFinalHandler + +Class: `Yiisoft\Queue\Middleware\FailureHandling\FailureFinalHandler` + +Behavior: + +- Terminal failure handler. +- Throws the exception from the request when the failure pipeline does not handle the failure. + +### JobFailureException + +Class: `Yiisoft\Queue\Exception\JobFailureException` + +Behavior: + +- Thrown by the worker when failure handling does not resolve the issue. +- Wraps the original exception and includes the queue message id (if available) in the exception message. diff --git a/docs/guide/en/job-status.md b/docs/guide/en/job-status.md new file mode 100644 index 00000000..9e3224ca --- /dev/null +++ b/docs/guide/en/job-status.md @@ -0,0 +1,86 @@ +# Job status + +Yii Queue can report a job status by its message ID. + +The API surface is: + +- `QueueInterface::status(string|int $id): JobStatus` +- `AdapterInterface::status(string|int $id): JobStatus` + +Status tracking support depends on the adapter. If an adapter doesn't store IDs or doesn't keep status history, you might not be able to use `status()` reliably. + +## Getting a message ID + +`QueueInterface::push()` returns a `MessageInterface`. When the adapter supports IDs, the returned message is typically wrapped into an `IdEnvelope`, which stores the ID in message metadata. + +To read the ID: + +```php +use Yiisoft\Queue\Message\IdEnvelope; + +$pushedMessage = $queue->push($message); +$id = $pushedMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null; +``` + +If `$id` is `null`, the current adapter didn't provide an ID and you can't query a status. + +The ID type (`string` or `int`) and how long it stays queryable are adapter-specific. + +## Statuses + +Statuses are represented by the `Yiisoft\Queue\JobStatus` enum: + +- `JobStatus::WAITING` + The job exists in the queue and is waiting for execution. + +- `JobStatus::RESERVED` + A worker has taken the job for processing. + +- `JobStatus::DONE` + The job has been processed. + +In addition to enum cases, `JobStatus` provides a string key via `JobStatus::key()`: + +```php +$statusKey = $status->key(); // "waiting", "reserved" or "done" +``` + +## Querying a status + +```php +use Yiisoft\Queue\JobStatus; +use Yiisoft\Queue\Message\IdEnvelope; + +$pushedMessage = $queue->push($message); +$id = $pushedMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null; + +if ($id === null) { + throw new \RuntimeException('The adapter did not provide a message ID, status tracking is unavailable.'); +} + +$status = $queue->status($id); + +if ($status === JobStatus::WAITING) { + // The job is waiting for execution. +} + +if ($status === JobStatus::RESERVED) { + // A worker is processing the job right now. +} + +if ($status === JobStatus::DONE) { + // The job has been processed. +} +``` + +## Errors and edge cases + +- **Unknown ID** + If an adapter can't find the message by ID, it must throw `InvalidArgumentException`. + +- **Timing** + `RESERVED` can be transient: depending on the adapter, a job may move from `WAITING` to `RESERVED` and then to `DONE` quickly. + +- **Failures / retries** + Job failures and retries are handled by the worker and middleware pipelines and are described in [Errors and retryable jobs](./error-handling.md). + How failures affect job status is adapter-specific. diff --git a/docs/guide/en/loops.md b/docs/guide/en/loops.md index e69de29b..dd957df4 100644 --- a/docs/guide/en/loops.md +++ b/docs/guide/en/loops.md @@ -0,0 +1,118 @@ +# Loops + +Yii Queue uses `\Yiisoft\Queue\Cli\LoopInterface` to control long-running execution. + +The loop is checked: + +- After each processed message (via `Queue::run()` / `Queue::listen()`). +- On each iteration of `queue:listen-all`. + +When the loop says it **cannot continue**, consuming stops gracefully (as soon as the current message is finished). + +See also: + +- [Console commands](console-commands.md) +- [Workers](worker.md) + +## Loop interface + +The interface is minimal: + +```php +namespace Yiisoft\Queue\Cli; + +interface LoopInterface +{ + public function canContinue(): bool; +} +``` + +Adapters receive a callback that returns `bool`. When the callback returns `false`, the adapter should stop consuming. + +## Built-in implementations + +### `SignalLoop` + +`\Yiisoft\Queue\Cli\SignalLoop` is used by default when `ext-pcntl` is available. + +It supports: + +- Graceful shutdown on `SIGHUP`, `SIGINT`, `SIGTERM`. +- Pause/resume via `SIGTSTP` and `SIGCONT`. +- Optional soft memory limit (see below). + +### `SimpleLoop` + +`\Yiisoft\Queue\Cli\SimpleLoop` is used by default when `ext-pcntl` is **not** available. + +It supports: + +- Optional soft memory limit. + +## Soft memory limit + +Both built-in loops accept `memorySoftLimit` (in bytes): + +- `0` means “no limit”. +- When the current process memory usage reaches the limit, `canContinue()` returns `false`. + +This is useful for recycling long-running workers in process managers such as systemd or Supervisor. + +## Configuration + +### With `yiisoft/config` + +By default, `LoopInterface` is resolved to `SignalLoop` when `ext-pcntl` is available, otherwise to `SimpleLoop`. + +To set a soft memory limit, configure both loop implementations: + +```php +use Yiisoft\Queue\Cli\SignalLoop; +use Yiisoft\Queue\Cli\SimpleLoop; + +return [ + SignalLoop::class => [ + '__construct()' => [ + 'memorySoftLimit' => 256 * 1024 * 1024, + ], + ], + SimpleLoop::class => [ + '__construct()' => [ + 'memorySoftLimit' => 256 * 1024 * 1024, + ], + ], +]; +``` + +To force a specific implementation regardless of `ext-pcntl` availability, override `LoopInterface` binding: + +```php +use Yiisoft\Queue\Cli\LoopInterface; +use Yiisoft\Queue\Cli\SimpleLoop; + +return [ + LoopInterface::class => SimpleLoop::class, +]; +``` + +### Manual configuration (without `yiisoft/config`) + +Instantiate the loop you want and pass it to `Queue` (and, depending on adapter, to adapter constructor as well): + +```php +use Yiisoft\Queue\Cli\SignalLoop; + +$loop = new SignalLoop(memorySoftLimit: 256 * 1024 * 1024); +``` + +## Writing a custom loop + +Implement `LoopInterface` and encapsulate your own stopping conditions: + +- Time limits. +- Message count limits. +- External stop flags. +- Integration with your own signal / shutdown handling. + +The only requirement is that `canContinue()` returns `false` when the worker should stop. + diff --git a/docs/guide/en/message-handler.md b/docs/guide/en/message-handler.md new file mode 100644 index 00000000..01be06e1 --- /dev/null +++ b/docs/guide/en/message-handler.md @@ -0,0 +1,228 @@ +# Message handler + +A *message handler* is what processes a queue message. Internally, `Yiisoft\Queue\Worker\Worker` resolves a handler by the message handler name (`MessageInterface::getHandlerName()`) and then executes it through `Yiisoft\Injector\Injector`. + +Handler definitions are configured in: + +- `$params['yiisoft/queue']['handlers']` when using [yiisoft/config](https://github.com/yiisoft/config), or +- the `$handlers` argument of `Yiisoft\Queue\Worker\Worker` when creating it manually. + +## Supported handler definition formats + +`Worker` supports a limited set of formats. Below are the exact formats that are converted to a callable. + +### 1. HandlerInterface implementation (without mapping) + +If your handler is a dedicated class implementing `Yiisoft\Queue\Message\MessageHandlerInterface`, you can use the class name itself as the message handler name. + +This is the default and most convenient option when the producer and the consumer are the same application. + +In this setup, you usually don't need to configure handler mapping at all as long as your DI container can resolve the handler class. + +**Message**: + +```php +new \Yiisoft\Queue\Message\Message(\App\Queue\RemoteFileHandler::class, ['url' => '...']); +``` + +**Handler**: + +```php +final class RemoteFileHandler implements \Yiisoft\Queue\Message\MessageHandlerInterface +{ + public function handle(\Yiisoft\Queue\Message\MessageInterface $message): void + { + // Handle the message + } +} +``` + +**Config**: + +Not needed + +**Pros**: + +- Minimal configuration. +- Stable refactoring inside the same application (rename-safe if you rename the class and update the producer code). +- Easy to unit-test the handler as a normal class. + +**Cons**: + +- Couples produced messages to PHP class names. +- Requires producer and consumer to share the same naming contract (usually “same app”). + +**Use when**: + +- Producer and consumer are the same application. +- You control message creation code and can safely use FQCN as the handler name. + +### 2. Closure + +In this and all the cases below, you should use a proper handler name when pushing a `Message` instead of a handler class name in the example above: + +```php +new \Yiisoft\Queue\Message\Message('send-email', ['data' => '...']); +``` + +**Config**: + +Map handler name to a closure in `$params`: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'send-email' => static fn (\Yiisoft\Queue\Message\MessageInterface $message, \App\Foo $foo) => $foo->bar($message->getData()), + ], + ], +]; +``` + +**How it works**: + +- A `Closure` is accepted as-is. +- The worker executes it using `Injector`, so you may type-hint extra dependencies in the closure parameters. + +**Pros**: + +- Very simple for small tasks and quick prototypes. +- Easy to inject extra services via `Injector`. + +**Cons**: + +- Less reusable and harder to unit-test than a dedicated class. +- Easy to accidentally put non-trivial business logic into config. +- Harder to maintain and refactor as the logic grows. + +**Use when**: + +- You're prototyping async workflows and going to refactor it later into a proper handler class. +- You want a quick "glue" handler that delegates to services. + +### 3. Container ID string + +**Config**: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'file-download' => FileDownloader::class, + ], + ], +]; +``` + +**How it works**: + +The handler object is retrieved from the DI container. In this case the handler class should either + +- have the `__invoke()` method, which receives a message parameter, +- implement `Yiisoft\Queue\Message\MessageHandlerInterface` (then the `$handler->handle(...)` method is called). + +If the resolved service is neither callable nor a `MessageHandlerInterface`, the handler is treated as invalid. + +**Pros**: + +- Short and clean configuration. +- Supports invokable handlers and `MessageHandlerInterface` handlers. + +**Cons**: + +— + +**Use when**: + +- You already registered handlers in DI (recommended for production). +- You prefer invokable handlers (`__invoke`) or `MessageHandlerInterface`. + +### 4. Two-element array of strings: `[classOrServiceId, method]` + +**Config**: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'file-download' => [FileDownloader::class, 'handle'], + 'file-download2' => [$handler, 'handle'], + ], + ], +]; +``` + +**How it works**: + +- If the class exists: + - If the method is static, it is called statically: `[$className, $methodName]`. Dependencies may be passed *to the provided method* in case they are resolvable from the DI container. + - If the first element is an object instance, it is called as `$firstElement->$methodName(...)` with dependency injection applied *to the $methodName*. + - If the method is not static, the class must be resolvable from the DI container, and the worker calls `$container->get($className)->$methodName(...)`. DI container will also resolve dependencies declared in the *class constructor*. + +**Pros**: + +- Explicit method name, good for “classic” `handle()` methods. +- Supports static methods for pure, dependency-free handlers. + +**Cons**: + +- Harder to maintain and refactor than regular class definitions with either `__invoke` method or `MessageHandlerInterface` implementation. + +**Use when**: + +- You want to use static handlers (rare, but can be useful for pure transforms). +- You want to group different handlers in a single class for organizational purposes. + +## When mapping by short names is a better idea + +While FQCN-as-name is convenient inside a single application, mapping by a short name is often a better contract. That is true when messages are produced outside the current codebase, or when you want to create a stable public API for inter-service communication. + +**Typical cases**: + +- Another application pushes messages to the same broker. +- A different language/runtime produces messages. +- You want a stable public contract that is independent of your PHP namespaces and refactorings. + +In these cases you typically keep message handler names small and stable, and map them in config: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'file-download' => \App\Queue\RemoteFileHandler::class, + ], + ], +]; +``` + +This way external producers never need to know your internal PHP class names. + +## Common pitfalls and unsupported formats + +- A string definition is **not** treated as a function name. It is treated only as a DI container ID. +- A class-string that is not resolvable via `$container->has()` will not be auto-instantiated. +- [yiisoft/definitions](https://github.com/yiisoft/definitions) array format (like `['class' => ..., '__construct()' => ...]`) is **not** supported for handlers. + +## Recommended handler implementation styles + +- Prefer a dedicated handler class registered in DI. +- For maximal compatibility with the worker resolution rules either: + - Implement `MessageHandlerInterface` + - Make the handler invokable (`__invoke(MessageInterface $message): void`) + - Provide `[HandlerClass::class, 'handle']` and keep `handle(MessageInterface $message): void` as the entry point + +## Config location ([yiisoft/config](https://github.com/yiisoft/config)) + +When using [yiisoft/config](https://github.com/yiisoft/config), configure handlers under the [`yiisoft/queue`](https://github.com/yiisoft/queue) params key: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'handler-name' => [FooHandler::class, 'handle'], + ], + ], +]; +``` + +This config is consumed by the DI definitions from `config/di.php` where the `Worker` is constructed with `$params['yiisoft/queue']['handlers']`. diff --git a/docs/guide/en/producing-messages-from-external-systems.md b/docs/guide/en/producing-messages-from-external-systems.md new file mode 100644 index 00000000..647ba98c --- /dev/null +++ b/docs/guide/en/producing-messages-from-external-systems.md @@ -0,0 +1,132 @@ +# Producing messages from external systems + +This guide explains how to publish messages to a queue backend (RabbitMQ, Kafka, SQS, etc.) from *external producers* (including non-PHP services) so that `yiisoft/queue` consumers can correctly deserialize and process these messages. + +The key idea is simple: + +- The queue adapter reads a *raw payload* (usually a string) from the broker. +- The adapter passes that payload to a `Yiisoft\Queue\Message\MessageSerializerInterface` implementation. +- By default, `yiisoft/queue` config binds `MessageSerializerInterface` to `Yiisoft\Queue\Message\JsonMessageSerializer`. + +`JsonMessageSerializer` is only the default implementation. You can replace it with your own serializer by rebinding `Yiisoft\Queue\Message\MessageSerializerInterface` in your DI configuration. + +So, external systems should produce the **same payload format** that your consumer-side serializer expects (JSON described below is for the default `JsonMessageSerializer`). + +## 1. Handler name contract (most important part) + +`yiisoft/queue` resolves a handler by message handler name (`MessageInterface::getHandlerName()`). + +For external producers, you should not rely on PHP FQCN handler names. Prefer a stable short name and map it in the consumer application configuration (see [Message handler](message-handler.md)). + +Example mapping: + +```php +return [ + 'yiisoft/queue' => [ + 'handlers' => [ + 'file-download' => \App\Queue\RemoteFileHandler::class, + ], + ], +]; +``` + +External producer then always publishes `"name": "file-download"`. + +## 2. JSON payload format (JsonMessageSerializer) + +`Yiisoft\Queue\Message\JsonMessageSerializer` expects the message body to be a JSON object with these keys: + +- `name` (string, required) +- `data` (any JSON value, optional; defaults to `null`) +- `meta` (object, optional; defaults to `{}`) + +Minimal example: + +```json +{ + "name": "file-download", + "data": { + "url": "https://example.com/file.pdf", + "destinationFile": "/tmp/file.pdf" + } +} +``` + +Full example: + +```json +{ + "name": "file-download", + "data": { + "url": "https://example.com/file.pdf", + "destinationFile": "/tmp/file.pdf" + }, + "meta": { + "trace-id": "1f2c0e10b7b44c67", + "tenant-id": "acme" + } +} +``` + +### Notes about `meta` + +The `meta` key is used by `yiisoft/queue` for internal processing (like tracing and correlation) and should not be set by external systems. + +## 3. Data encoding rules + +- The payload must be UTF-8 JSON. +- `data` and `meta` must contain only JSON-encodable values: + - strings, numbers, booleans, null + - arrays + - objects (maps) + +If your broker stores bytes, publish the UTF-8 bytes of the JSON string. + +## 4. Publishing to a broker: what exactly to send + +`yiisoft/queue` itself does not define a network protocol. The exact “where” this JSON goes depends on the adapter: + +- Some adapters put this JSON into the broker message **body**. +- Some adapters may additionally use broker headers/attributes. + +For external producers you should: + +- Use the adapter documentation of your chosen backend (AMQP / Kafka / SQS / etc.) to know which queue/topic and routing settings to use. +- Ensure the **message body** is exactly the JSON described above (unless the adapter docs explicitly say otherwise). + +## 5. Examples (non-PHP) + +These examples show how to produce the JSON body. You still need to publish it with your broker-specific client. + +### Python (constructing JSON body) + +```python +import json + +payload = { + "name": "file-download", + "data": {"url": "https://example.com/file.pdf"} +} + +body = json.dumps(payload, ensure_ascii=False).encode("utf-8") +``` + +### Node.js (constructing JSON body) + +```js +const payload = { + name: 'file-download', + data: { url: 'https://example.com/file.pdf' }, +}; + +const body = Buffer.from(JSON.stringify(payload), 'utf8'); +``` + +### curl (for HTTP-based brokers / gateways) + +```sh +curl -X POST \ + -H 'Content-Type: application/json' \ + --data '{"name":"file-download","data":{"url":"https://example.com/file.pdf"}}' \ + https://your-broker-gateway.example.com/publish +``` diff --git a/docs/guide/en/usage.md b/docs/guide/en/usage.md index 9a663a88..b1a21455 100644 --- a/docs/guide/en/usage.md +++ b/docs/guide/en/usage.md @@ -1,5 +1,9 @@ # Usage basics +## Queue channels + +For a detailed explanation of what channels are and how to configure and use them (including CLI examples), see [Queue channels](channels.md). + ## Configuration You can configure it with a DI container in the following way: @@ -57,26 +61,42 @@ The exact way how a job is executed depends on the adapter used. Most adapters c console commands, which the component registers in your application. For more details, check the respective adapter documentation. +If you configured multiple channels, you can choose which channel to consume with console commands: + +```sh +yii queue:listen [channel] +yii queue:run [channel1 [channel2 [...]]] +yii queue:listen-all [channel1 [channel2 [...]]] +``` + ## Job status ```php -// Push a job into the queue and get a message ID. -$id = $queue->push(new SomeJob()); +use Yiisoft\Queue\JobStatus; +use Yiisoft\Queue\Message\IdEnvelope; + +$pushedMessage = $queue->push($message); +$id = $pushedMessage->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null; + +if ($id === null) { + throw new \RuntimeException('The adapter did not provide a message ID, status tracking is unavailable.'); +} -// Get job status. $status = $queue->status($id); // Check whether the job is waiting for execution. -$status->isWaiting(); +$status === JobStatus::WAITING; // Check whether a worker got the job from the queue and executes it. -$status->isReserved($id); +$status === JobStatus::RESERVED; // Check whether a worker has executed the job. -$status->isDone($id); +$status === JobStatus::DONE; ``` +For details and edge cases, see [Job status](job-status.md). + ## Limitations When using queues, it is important to remember that tasks are put into and obtained from the queue in separate diff --git a/src/Message/JsonMessageSerializer.php b/src/Message/JsonMessageSerializer.php index d9b3a92e..4635b632 100644 --- a/src/Message/JsonMessageSerializer.php +++ b/src/Message/JsonMessageSerializer.php @@ -55,7 +55,7 @@ public function unserialize(string $value): MessageInterface } $meta[EnvelopeInterface::ENVELOPE_STACK_KEY] = []; - $class = $payload['meta']['message-class'] ?? Message::class; + $class = $meta['message-class'] ?? Message::class; // Don't check subclasses when it's a default class: that's faster if ($class !== Message::class && !is_subclass_of($class, MessageInterface::class)) { $class = Message::class; diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 33c2dff6..e0ebc293 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -30,9 +30,11 @@ final class Worker implements WorkerInterface { + /** @var array Cache of resolved handlers */ private array $handlersCached = []; public function __construct( + /** @var array */ private readonly array $handlers, private readonly LoggerInterface $logger, private readonly Injector $injector, @@ -77,27 +79,24 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa private function getHandler(string $name): ?callable { + if ($name === '') { + return null; + } + if (!array_key_exists($name, $this->handlersCached)) { $definition = $this->handlers[$name] ?? null; - if ($definition === null && $this->container->has($name)) { - $handler = $this->container->get($name); - if ($handler instanceof MessageHandlerInterface) { - $this->handlersCached[$name] = $handler->handle(...); - - return $this->handlersCached[$name]; - } - - return null; + if ($definition === null) { + $definition = $name; } - $this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null); + $this->handlersCached[$name] = $this->prepare($definition); } return $this->handlersCached[$name]; } /** - * Checks if the handler is a DI container alias + * Creates a callable from a definition. * * @param array|callable|object|string|null $definition * @@ -107,52 +106,93 @@ private function getHandler(string $name): ?callable */ private function prepare(callable|object|array|string|null $definition): callable|null { + if ($definition === null) { + return null; + } + + if ($definition instanceof Closure) { + return $definition; + } + if (is_string($definition) && $this->container->has($definition)) { - return $this->container->get($definition); + /** @var object $result */ + $result = $this->container->get($definition); + + if (is_callable($result)) { + return $result; + } + + if ($result instanceof MessageHandlerInterface) { + return $result->handle(...); + } + + return null; } if ( is_array($definition) && array_keys($definition) === [0, 1] - && is_string($definition[0]) && is_string($definition[1]) ) { - [$className, $methodName] = $definition; + if (is_object($definition[0])) { + [$object, $methodName] = $definition; - if (!class_exists($className) && $this->container->has($className)) { - return [ - $this->container->get($className), - $methodName, - ]; - } + try { + $reflection = new ReflectionMethod($object, $methodName); + } catch (ReflectionException $e) { + $this->logger->error($e); - if (!class_exists($className)) { - $this->logger->error("$className doesn't exist."); + return null; + } - return null; + $callable = [$object, $methodName]; + if (!is_callable($callable)) { + $this->logger->error(sprintf('%s::%s is not a callable.', $reflection->getDeclaringClass()->getName(), $methodName)); + + return null; + } + + return $callable; } - try { - $reflection = new ReflectionMethod($className, $methodName); - } catch (ReflectionException $e) { - $this->logger->error($e->getMessage()); + if (is_string($definition[0])) { + [$className, $methodName] = $definition; + + if (!class_exists($className) && $this->container->has($className)) { + return [ + $this->container->get($className), + $methodName, + ]; + } + + if (!class_exists($className)) { + $this->logger->error("$className doesn't exist."); + + return null; + } + + try { + $reflection = new ReflectionMethod($className, $methodName); + } catch (ReflectionException $e) { + $this->logger->error($e->getMessage()); + + return null; + } + if ($reflection->isStatic()) { + return [$className, $methodName]; + } + if ($this->container->has($className)) { + return [ + $this->container->get($className), + $methodName, + ]; + } return null; } - if ($reflection->isStatic()) { - return [$className, $methodName]; - } - if ($this->container->has($className)) { - return [ - $this->container->get($className), - $methodName, - ]; - } - - return null; } - return $definition; + return null; } private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface diff --git a/tests/Unit/WorkerTest.php b/tests/Unit/WorkerTest.php index b8f466f6..fc2051b7 100644 --- a/tests/Unit/WorkerTest.php +++ b/tests/Unit/WorkerTest.php @@ -60,6 +60,10 @@ public static function jobExecutedDataProvider(): iterable FakeHandler::class, [FakeHandler::class => new FakeHandler()], ]; + yield 'definition-object' => [ + [new FakeHandler(), 'execute'], + [], + ]; yield 'definition-class' => [ [FakeHandler::class, 'execute'], [FakeHandler::class => new FakeHandler()],