feat: multiqueue implementation, support for custom settings for queues/exchange#48
feat: multiqueue implementation, support for custom settings for queues/exchange#48
Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements multi-queue support, allowing custom settings for queues and exchanges, and refactors the broker to use dedicated Queue and Exchange configuration objects instead of constructor parameters. Key changes include:
- Replaced multiple constructor parameters with
ExchangeandQueueconfiguration objects - Added support for multiple task queues with custom routing
- Introduced new exception types for better error handling
- Enhanced testing with dedicated test files for routing and startup scenarios
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| taskiq_aio_pika/broker.py | Refactored to use Queue/Exchange objects, added multi-queue support, enhanced routing logic |
| taskiq_aio_pika/queue.py | New Queue dataclass for queue configuration |
| taskiq_aio_pika/exchange.py | New Exchange dataclass for exchange configuration |
| taskiq_aio_pika/exceptions.py | New exception types for broker errors |
| tests/test_startup.py | New tests for startup/declaration behavior |
| tests/test_routing.py | New tests for multi-queue routing scenarios |
| tests/test_delay.py | Extracted delay tests from test_broker.py |
| tests/test_delay_with_plugin.py | Extracted plugin delay tests from test_broker.py |
| tests/utils.py | Extracted common test utility function |
| tests/conftest.py | Updated fixtures to use new Queue/Exchange objects |
| examples/*.py | Added usage examples for new features |
| README.md | Updated documentation for new API |
| pyproject.toml | Updated dependencies and linting rules |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Hi! Is this PR still being worked on? We ran into the exact problem this solves (multiple isolated queues on a single broker instance) and would love to see it merged. The current workaround of instantiating one |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 22 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if len(self._task_queues) == 1: | ||
| routing_key_name = ( | ||
| self._task_queues[0].routing_key or self._task_queues[0].name | ||
| ) |
There was a problem hiding this comment.
When len(self._task_queues) == 1, the routing key is determined from the single queue. However, if a queue is added later via with_queue() after messages have already been kicked, the routing logic will change from using a fixed routing key to requiring the queue_name label. This inconsistent behavior could cause confusion. Consider documenting this behavior or adding a warning when queues are added after the broker has been used for kicking messages.
| message.labels.get(self._label_for_routing), | ||
| ) | ||
| or "" | ||
| ) |
There was a problem hiding this comment.
When there are multiple task queues and no queue_name label is provided in the message, routing_key_name becomes an empty string (line 382). With a DIRECT exchange type, this will raise an IncorrectRoutingKeyError, but with TOPIC or FANOUT exchanges, an empty routing key might cause unexpected routing behavior. Consider adding validation or a more informative error message for this case regardless of exchange type.
| ) | |
| ) | |
| if not routing_key_name: | |
| raise IncorrectRoutingKeyError( | |
| "Routing key is empty. When multiple queues are configured " | |
| f"you must provide a routing key via the " | |
| f"'{self._label_for_routing}' label.", | |
| ) |
| per_queue_arguments["x-dead-letter-routing-key"] = ( | ||
| self._delay_queue.routing_key | ||
| or queues[0].routing_key | ||
| or queues[0].name | ||
| ) |
There was a problem hiding this comment.
There's a potential issue with the routing key fallback logic for the delay queue. When self._delay_queue.routing_key is None, the code falls back to queues[0].routing_key or queues[0].name. However, if there are no task queues initially and one gets added by line 261, this could lead to routing delayed messages to the wrong queue. Consider explicitly validating that routing_key is set for delay queues or documenting this behavior more clearly.
Fixes:
Probably will be also be useful for #43 because we will be able to pass custom routing key.