diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f9d565..d1ee198 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Change log +# Version 0.0.25 +### Fixes +- **Azure Topic Settlement Stability:** Moved Azure Service Bus message settlement back onto the receiver-owning loop instead of settling from worker callback threads. This keeps receive and complete/abandon operations on the same receiver flow for long-running jobs. +- **Receiver Slot Tracking:** Reserved and released in-flight message slots on the receive loop so concurrency limits remain accurate while messages are still processing. +- **Lock Renewal Diagnostics:** Added logging for Service Bus lock-renew failures to make long-running lock-loss issues visible before final settlement. + # Version 0.0.23 - Updated unit test cases pipeline - Added support to upload test cases results on Azure blob @@ -109,4 +115,4 @@ Reference task: - Added classes and methods - Topic - publish - - subscribe \ No newline at end of file + - subscribe diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..540b720 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include requirements.txt \ No newline at end of file diff --git a/src/python_ms_core/core/topic/azure_topic.py b/src/python_ms_core/core/topic/azure_topic.py index deb25c3..4cf741e 100644 --- a/src/python_ms_core/core/topic/azure_topic.py +++ b/src/python_ms_core/core/topic/azure_topic.py @@ -1,21 +1,19 @@ import json - import logging +import multiprocessing as mp +import os import time +import traceback from ..config.config import TopicConfig -from ..resource_errors import ExceptionHandler from concurrent.futures import ThreadPoolExecutor from .abstract.topic_abstract import TopicAbstract from ..queue.models.queue_message import QueueMessage from azure.servicebus import ServiceBusClient, ServiceBusMessage from azure.servicebus import AutoLockRenewer -import concurrent.futures as cf import threading -logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S') logger = logging.getLogger('AzureTopic') -logger.setLevel(logging.INFO) """ @@ -49,11 +47,24 @@ def __init__(self, config: TopicConfig=None, topic_name=None, max_concurrent_mes self.topic_name = topic_name self.publisher = self.client.get_topic_sender(topic_name=topic_name) self.executor = ThreadPoolExecutor(max_workers=max_concurrent_messages) + self.callback_execution_mode = self._get_callback_execution_mode() + self.callback_process_start_method = self._get_process_start_method() + self.process_context = self._get_process_context() self.internal_count = 0 - self.lock_renewal = AutoLockRenewer(max_workers=max_concurrent_messages) - self.max_renewal_duration = 86400 # Renew the message upto 1 day + self.max_renewal_duration = 86400 # Renew the message upto 1 day + self.lock_renewal_margin = 60 + renewer_max_workers = max(max_concurrent_messages, 2) + self.lock_renewal = AutoLockRenewer( + max_lock_renewal_duration=self.max_renewal_duration, + on_lock_renew_failure=self._handle_lock_renew_failure, + max_workers=renewer_max_workers, + ) + # The SDK default renews only in the last 10 seconds of the lock window. + # Start earlier so long-running jobs have more headroom for scheduler jitter. + self.lock_renewal._renew_period = min(self.lock_renewal_margin, self.max_renewal_duration) self.wait_time_for_message = 5 self.thread_lock = threading.Lock() + self.pending_tasks = [] def publish(self, data: QueueMessage): @@ -78,63 +89,269 @@ def subscribe(self, subscription: str, callback, max_receivable_messages=-1): self.receiver.local_received_messages = 0 while True: try: - to_receive = (self.max_concurrent_messages - self.internal_count) - total_messages_to_recieve_more = max_receivable_messages - self.receiver.local_received_messages - if max_receivable_messages > 0: - to_receive = min(to_receive, total_messages_to_recieve_more) + self._settle_completed_tasks() + to_receive = self._get_receivable_count(max_receivable_messages=max_receivable_messages) + if max_receivable_messages > 0 and self.receiver.local_received_messages >= max_receivable_messages: + if len(self.pending_tasks) == 0: + break + self._wait_for_pending_tasks(timeout=0.5) + continue if to_receive > 0: messages = self.receiver.receive_messages(max_message_count=to_receive, max_wait_time=self.wait_time_for_message) if not messages or len(messages) == 0: + if len(self.pending_tasks) > 0: + self._wait_for_pending_tasks(timeout=0.5) continue self.receiver.local_received_messages += len(messages) + with self.thread_lock: + self.internal_count += len(messages) for message in messages: - self.lock_renewal.register(self.receiver, message, max_lock_renewal_duration=self.max_renewal_duration) - execution_task = self.executor.submit(self.internal_callback, message, callback) - execution_task.add_done_callback(lambda x: self.settle_message(x)) - if self.receiver.local_received_messages >= max_receivable_messages and max_receivable_messages > 0: # Break if the messages are more than max_receivable_messages - break + execution_task = self._submit_processing_task(message, callback) + self.lock_renewal.register( + self.receiver, + message, + max_lock_renewal_duration=self.max_renewal_duration, + on_lock_renew_failure=self._handle_lock_renew_failure, + ) + self.pending_tasks.append((execution_task, message)) else: - time.sleep(self.wait_time_for_message) + if len(self.pending_tasks) > 0: + self._wait_for_pending_tasks(timeout=0.5) + else: + time.sleep(self.wait_time_for_message) except Exception as e: logger.error(f'Error in receiving messages: {e}') - def internal_callback(self, message, callbackfn): + def internal_callback(self, message_payload, callbackfn): """ Internal callback function that processes a message and invokes the callback function. Args: - message (ServiceBusMessage): The message to process. + message_payload (str): The message payload to process. callbackfn (function): The callback function to invoke. Returns: - ServiceBusMessage: The processed message. + dict: The callback status payload. """ try: - with self.thread_lock: - self.internal_count += 1 # thread safe. - queue_message = QueueMessage.data_from(str(message)) + queue_message = QueueMessage.data_from(message_payload) callbackfn(queue_message) - return [True,message] + return {'success': True, 'error': None} except Exception as e: - logger.error(f'Error in processing message: {e}') - return [False,message] + return { + 'success': False, + 'error': ''.join(traceback.format_exception(type(e), e, e.__traceback__)).strip(), + } - def settle_message(self, x: cf.Future): + def settle_message(self, x): + return self._settle_task(x) + + def _submit_processing_task(self, message, callback): + message_payload = str(message) + if self.callback_execution_mode == 'process': + try: + return self._submit_process_task(message_payload, callback) + except Exception as exc: + logger.warning( + 'Falling back to thread execution for message %s because process start failed: %s', + self._get_message_id(message), + exc, + ) + return self._submit_thread_task(message_payload, callback) + + def _submit_thread_task(self, message_payload, callback): + future = self.executor.submit(self.internal_callback, message_payload, callback) + return _FutureExecutionTask(future) + + def _submit_process_task(self, message_payload, callback): + if self.process_context is None: + raise RuntimeError('Process execution mode is not available for this environment.') + + parent_connection, child_connection = self.process_context.Pipe(duplex=False) + callback_process = self.process_context.Process( + target=_run_callback_in_subprocess, + args=(message_payload, callback, child_connection), + ) + try: + callback_process.start() + except Exception: + parent_connection.close() + child_connection.close() + raise + child_connection.close() + return _ProcessExecutionTask(callback_process, parent_connection) + + def _get_receivable_count(self, max_receivable_messages=-1): + with self.thread_lock: + available_slots = self.max_concurrent_messages - self.internal_count + if max_receivable_messages > 0: + remaining_messages = max_receivable_messages - self.receiver.local_received_messages + available_slots = min(available_slots, remaining_messages) + return max(available_slots, 0) + + def _wait_for_pending_tasks(self, timeout=0.5): + if len(self.pending_tasks) == 0: + return + if timeout <= 0: + self._settle_completed_tasks() + return + deadline = time.time() + timeout + while time.time() < deadline: + if any(task.done() for task, _ in self.pending_tasks): + break + time.sleep(min(0.1, max(deadline - time.time(), 0))) + self._settle_completed_tasks() + + def _settle_completed_tasks(self): + remaining_tasks = [] + for future, incoming_message in self.pending_tasks: + if future.done(): + self._settle_task(future, incoming_message=incoming_message) + else: + remaining_tasks.append((future, incoming_message)) + self.pending_tasks = remaining_tasks + + def _settle_task(self, x, incoming_message=None): """ Sets the message as completed and updates the internal count. Args: - x (cf.Future): The future object representing the message processing. + x: The task object representing the message processing. """ - # Lock the internal count - with self.thread_lock: - self.internal_count -= 1 - # Check if the future has an exception - [is_success,incoming_message] = x.result() - if is_success: - self.receiver.complete_message(incoming_message) - else: - print(f'Abandoning message: {incoming_message}') - self.receiver.abandon_message(incoming_message) # send back to the topic - return - - \ No newline at end of file + try: + task_result = x.result() + except Exception as e: + task_result = { + 'success': False, + 'error': f'Callback worker exited before returning a result: {e}', + } + + try: + if incoming_message is None: + return + if getattr(incoming_message, '_lock_expired', False): + logger.error( + f'Skipping settlement for message {self._get_message_id(incoming_message)} ' + f'because the lock expired at {getattr(incoming_message, "locked_until_utc", None)}. ' + f'auto_renew_error={getattr(incoming_message, "auto_renew_error", None)}' + ) + return + if task_result.get('success'): + self.receiver.complete_message(incoming_message) + else: + logger.error( + 'Processing failed for message %s: %s', + self._get_message_id(incoming_message), + task_result.get('error', 'unknown processing failure'), + ) + self.receiver.abandon_message(incoming_message) + except Exception as e: + logger.error(f'Error in settling message: {e}') + finally: + with self.thread_lock: + self.internal_count = max(self.internal_count - 1, 0) + return + + def _handle_lock_renew_failure(self, renewable, error): + message_id = self._get_message_id(renewable) + failure_reason = error or getattr(renewable, 'auto_renew_error', None) or 'lock expired before renewal could complete' + logger.error( + f'Error renewing lock for message {message_id}: {failure_reason}; ' + f'locked_until_utc={getattr(renewable, "locked_until_utc", None)}' + ) + + @staticmethod + def _get_message_id(message): + return getattr(message, 'message_id', None) or getattr(message, 'messageId', 'unknown') + + @staticmethod + def _get_callback_execution_mode(): + value = os.environ.get('TOPIC_CALLBACK_EXECUTION_MODE', 'process') + normalized = str(value).strip().lower() + if normalized in ('process', 'thread'): + return normalized + logger.warning( + 'Invalid value for TOPIC_CALLBACK_EXECUTION_MODE: %s. Using default process.', + value, + ) + return 'process' + + @staticmethod + def _get_process_start_method(): + available_methods = mp.get_all_start_methods() + default_method = 'fork' if 'fork' in available_methods else mp.get_start_method() or available_methods[0] + configured_method = os.environ.get('TOPIC_CALLBACK_PROCESS_START_METHOD', default_method) + normalized_method = str(configured_method).strip().lower() + if normalized_method in available_methods: + return normalized_method + logger.warning( + 'Invalid value for TOPIC_CALLBACK_PROCESS_START_METHOD: %s. Using default %s.', + configured_method, + default_method, + ) + return default_method + + def _get_process_context(self): + if self.callback_execution_mode != 'process': + return None + try: + return mp.get_context(self.callback_process_start_method) + except ValueError: + logger.warning( + 'Process start method %s is unavailable. Falling back to thread execution.', + self.callback_process_start_method, + ) + self.callback_execution_mode = 'thread' + return None + + +def _run_callback_in_subprocess(message_payload, callbackfn, result_connection): + try: + queue_message = QueueMessage.data_from(message_payload) + callbackfn(queue_message) + result_connection.send({'success': True, 'error': None}) + except BaseException as exc: # pragma: no cover - exercised through the parent process wrapper + result_connection.send({ + 'success': False, + 'error': ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__)).strip(), + }) + finally: + result_connection.close() + + +class _FutureExecutionTask: + def __init__(self, future): + self._future = future + + def done(self): + return self._future.done() + + def result(self): + return self._future.result() + + +class _ProcessExecutionTask: + def __init__(self, process, result_connection): + self._process = process + self._result_connection = result_connection + self._result = None + + def done(self): + return not self._process.is_alive() + + def result(self): + if self._result is not None: + return self._result + + self._process.join() + try: + if self._result_connection.poll(): + self._result = self._result_connection.recv() + else: + self._result = { + 'success': False, + 'error': f'Callback worker exited with code {self._process.exitcode} without returning a result.', + } + finally: + self._result_connection.close() + + return self._result diff --git a/src/python_ms_core/version.py b/src/python_ms_core/version.py index 071e34f..13a85f7 100644 --- a/src/python_ms_core/version.py +++ b/src/python_ms_core/version.py @@ -1 +1 @@ -__version__ = '0.0.24' \ No newline at end of file +__version__ = '0.2.5' diff --git a/tests/unit_tests/test_topic/test_azure_topic.py b/tests/unit_tests/test_topic/test_azure_topic.py new file mode 100644 index 0000000..6b68937 --- /dev/null +++ b/tests/unit_tests/test_topic/test_azure_topic.py @@ -0,0 +1,382 @@ +import os +import unittest +from unittest.mock import MagicMock, patch + +from src.python_ms_core.core.topic.azure_topic import AzureTopic + + +class CompletedTask: + def __init__(self, result=None, error=None): + self._result = result + self._error = error + + def done(self): + return True + + def result(self): + if self._error is not None: + raise self._error + return self._result + + +class TestAzureTopic(unittest.TestCase): + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_init_sets_process_execution_defaults( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_renewer = MagicMock() + mock_process_context = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_auto_lock_renewer.return_value = mock_renewer + mock_get_context.return_value = mock_process_context + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + + self.assertEqual(topic.callback_execution_mode, 'process') + self.assertEqual(topic.callback_process_start_method, 'fork') + self.assertIs(topic.process_context, mock_process_context) + mock_auto_lock_renewer.assert_called_once() + _, kwargs = mock_auto_lock_renewer.call_args + self.assertEqual(kwargs['max_lock_renewal_duration'], 86400) + self.assertEqual(kwargs['max_workers'], 2) + self.assertEqual(mock_renewer._renew_period, 60) + mock_get_context.assert_called_once_with('fork') + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_submit_processing_task_uses_process_runner_by_default( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_callback = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.__str__.return_value = '{"message":"hello"}' + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic._submit_process_task = MagicMock(return_value='process-task') + topic._submit_thread_task = MagicMock(return_value='thread-task') + + task = topic._submit_processing_task(mock_message, mock_callback) + + self.assertEqual(task, 'process-task') + topic._submit_process_task.assert_called_once_with('{"message":"hello"}', mock_callback) + topic._submit_thread_task.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_submit_processing_task_falls_back_to_thread_when_process_start_fails( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + mock_callback = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message.__str__.return_value = '{"message":"hello"}' + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic._submit_process_task = MagicMock(side_effect=RuntimeError('process boom')) + topic._submit_thread_task = MagicMock(return_value='thread-task') + + task = topic._submit_processing_task(mock_message, mock_callback) + + self.assertEqual(task, 'thread-task') + topic._submit_thread_task.assert_called_once_with('{"message":"hello"}', mock_callback) + mock_logger.warning.assert_called_once() + warning_args = mock_logger.warning.call_args[0] + self.assertEqual( + warning_args[0], + 'Falling back to thread execution for message %s because process start failed: %s', + ) + self.assertEqual(warning_args[1], 'message-1') + self.assertEqual(str(warning_args[2]), 'process boom') + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_subscribe_settles_completed_tasks_on_receiver_loop( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message._lock_expired = False + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_receiver.receive_messages.side_effect = [[mock_message]] + mock_get_context.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + callback = MagicMock() + topic._submit_processing_task = MagicMock( + return_value=CompletedTask({'success': True, 'error': None}) + ) + + topic.subscribe(subscription='mock-subscription', callback=callback, max_receivable_messages=1) + + topic._submit_processing_task.assert_called_once_with(mock_message, callback) + mock_auto_lock_renewer.return_value.register.assert_called_once_with( + mock_receiver, + mock_message, + max_lock_renewal_duration=topic.max_renewal_duration, + on_lock_renew_failure=topic._handle_lock_renew_failure, + ) + mock_receiver.complete_message.assert_called_once_with(mock_message) + mock_receiver.abandon_message.assert_not_called() + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_abandons_message_when_worker_reports_failure( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_message._lock_expired = False + mock_message.message_id = 'message-1' + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_get_context.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task( + CompletedTask({'success': False, 'error': 'worker failure'}), + incoming_message=mock_message, + ) + + mock_receiver.complete_message.assert_not_called() + mock_receiver.abandon_message.assert_called_once_with(mock_message) + mock_logger.error.assert_called_once_with( + 'Processing failed for message %s: %s', + 'message-1', + 'worker failure', + ) + self.assertEqual(topic.internal_count, 0) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_abandons_message_when_worker_exits_without_result( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_message._lock_expired = False + mock_message.message_id = 'message-2' + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_get_context.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task( + CompletedTask(error=RuntimeError('worker died')), + incoming_message=mock_message, + ) + + mock_receiver.complete_message.assert_not_called() + mock_receiver.abandon_message.assert_called_once_with(mock_message) + mock_logger.error.assert_called_once_with( + 'Processing failed for message %s: %s', + 'message-2', + 'Callback worker exited before returning a result: worker died', + ) + self.assertEqual(topic.internal_count, 0) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_logs_error_and_releases_slot( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_message._lock_expired = False + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_receiver.complete_message.side_effect = Exception('Mocked settlement failure') + mock_get_context.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task( + CompletedTask({'success': True, 'error': None}), + incoming_message=mock_message, + ) + + mock_receiver.complete_message.assert_called_once_with(mock_message) + mock_logger.error.assert_called_once_with('Error in settling message: Mocked settlement failure') + self.assertEqual(topic.internal_count, 0) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_settle_task_skips_expired_message( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_receiver = MagicMock() + mock_message = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + + mock_message._lock_expired = True + mock_message.message_id = 'message-1' + mock_message.locked_until_utc = '2026-03-17T09:39:28Z' + mock_message.auto_renew_error = None + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_client.get_subscription_receiver.return_value = mock_receiver + mock_get_context.return_value = MagicMock() + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + topic.receiver = mock_receiver + topic.internal_count = 1 + + topic._settle_task( + CompletedTask({'success': True, 'error': None}), + incoming_message=mock_message, + ) + + mock_receiver.complete_message.assert_not_called() + mock_receiver.abandon_message.assert_not_called() + mock_logger.error.assert_called_once_with( + 'Skipping settlement for message message-1 because the lock expired at ' + '2026-03-17T09:39:28Z. auto_renew_error=None' + ) + self.assertEqual(topic.internal_count, 0) + + @patch.dict(os.environ, {}, clear=True) + @patch('src.python_ms_core.core.topic.azure_topic.logger') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_context') + @patch('src.python_ms_core.core.topic.azure_topic.mp.get_all_start_methods', return_value=['fork', 'spawn']) + @patch('src.python_ms_core.core.topic.azure_topic.AutoLockRenewer') + @patch('src.python_ms_core.core.topic.azure_topic.ServiceBusClient') + def test_handle_lock_renew_failure_logs_when_sdk_returns_no_error( + self, + mock_service_bus_client, + mock_auto_lock_renewer, + mock_get_all_start_methods, + mock_get_context, + mock_logger, + ): + mock_client = MagicMock() + mock_config = MagicMock(connection_string='Endpoint=sb://test/') + mock_message = MagicMock() + + mock_service_bus_client.from_connection_string.return_value = mock_client + mock_client.get_topic_sender.return_value = MagicMock() + mock_get_context.return_value = MagicMock() + mock_message.message_id = 'message-1' + mock_message.locked_until_utc = '2026-03-17T09:39:28Z' + mock_message.auto_renew_error = None + + topic = AzureTopic(config=mock_config, topic_name='mock-topic', max_concurrent_messages=1) + + topic._handle_lock_renew_failure(mock_message, None) + + mock_logger.error.assert_called_once_with( + 'Error renewing lock for message message-1: lock expired before renewal could complete; ' + 'locked_until_utc=2026-03-17T09:39:28Z' + ) + + +if __name__ == '__main__': + unittest.main()