diff --git a/openevolve/process_parallel.py b/openevolve/process_parallel.py index a2fd6592a9..8a7421dae3 100644 --- a/openevolve/process_parallel.py +++ b/openevolve/process_parallel.py @@ -511,6 +511,7 @@ async def run_evolution( # Early stopping tracking early_stopping_enabled = self.config.early_stopping_patience is not None + termination_reason = None if early_stopping_enabled: best_score = float("-inf") iterations_without_improvement = 0 @@ -686,6 +687,7 @@ async def run_evolution( logger.info( f"Target score {target_score} reached at iteration {completed_iteration}" ) + termination_reason = "target_score" break # Check early stopping @@ -731,6 +733,7 @@ async def run_evolution( f"No improvement for {iterations_without_improvement} iterations " f"(best score: {best_score:.4f})" ) + termination_reason = "early_stopping" break else: @@ -742,6 +745,7 @@ async def run_evolution( f"Task successfully solved with score {best_score:.4f}." ) self.early_stopping_triggered = True + termination_reason = "early_stopping" break except FutureTimeoutError: @@ -777,17 +781,40 @@ async def run_evolution( next_iteration += 1 break # Only submit one iteration per completion to maintain balance - # Handle shutdown - if self.shutdown_event.is_set(): - logger.info("Shutdown requested, canceling remaining evaluations...") + # Handle shutdown / early termination by canceling pending work quickly. + if self.shutdown_event.is_set() and termination_reason is None: + termination_reason = "shutdown" + + if termination_reason in {"target_score", "early_stopping", "shutdown"}: + if termination_reason == "shutdown": + logger.info("Shutdown requested, canceling remaining evaluations...") + else: + logger.info( + f"{termination_reason} triggered, canceling remaining pending evaluations..." + ) for future in pending_futures.values(): future.cancel() + # Determine completion reason if no explicit early termination was set. 
+            if termination_reason is None: +                if completed_iterations >= max_iterations: +                    termination_reason = "max_iterations" +                elif not pending_futures: +                    termination_reason = "no_pending_futures" +                elif self.shutdown_event.is_set(): +                    termination_reason = "shutdown" +                else: +                    termination_reason = "max_iterations" +     # Log completion reason -        if self.early_stopping_triggered: +        if termination_reason == "early_stopping": logger.info("✅ Evolution completed - Early stopping triggered due to convergence") -        elif self.shutdown_event.is_set(): +        elif termination_reason == "target_score": +            logger.info("✅ Evolution completed - Target score reached") +        elif termination_reason == "shutdown": logger.info("✅ Evolution completed - Shutdown requested") +        elif termination_reason == "no_pending_futures": +            logger.info("✅ Evolution completed - No pending futures remaining") else: logger.info("✅ Evolution completed - Maximum iterations reached") diff --git a/tests/test_process_parallel.py b/tests/test_process_parallel.py index 8cdd525b33..fe671e9e9e 100644 --- a/tests/test_process_parallel.py +++ b/tests/test_process_parallel.py @@ -163,6 +163,47 @@ def test_request_shutdown(self): # Verify shutdown event is set self.assertTrue(controller.shutdown_event.is_set()) + def test_target_score_fast_exit_cancels_pending_futures(self): + """Test that reaching target score cancels pending futures immediately.""" + + async def run_test(): + controller = ProcessParallelController(self.config, self.eval_file, self.database) + controller.executor = Mock() # run_evolution requires a started executor + + done_future = MagicMock(spec=Future) + done_future.done.return_value = True + done_future.result.return_value = SerializableResult( + child_program_dict={ + "id": "child_target_hit", + "code": "def evolved(): return 99", + "language": "python", + "parent_id": "test_0", + "generation": 1, + "metrics": {"combined_score": 0.95, "score": 0.95}, + "iteration_found": 1, + "metadata": {"changes": "target score hit", "island": 0}, + }, + parent_id="test_0", + iteration_time=0.1, + iteration=1, + target_island=0, + ) + + pending_future = MagicMock(spec=Future) + pending_future.done.return_value = False + pending_future.cancel.return_value = True + + with patch.object( + controller, "_submit_iteration", side_effect=[done_future, pending_future] + ) as mock_submit: + await controller.run_evolution(start_iteration=1, max_iterations=2, target_score=0.9) + + self.assertEqual(mock_submit.call_count, 2) + self.assertTrue(pending_future.cancel.called) + self.assertIn("child_target_hit", self.database.programs) + + asyncio.run(run_test()) + def test_serializable_result(self): """Test SerializableResult dataclass""" result = SerializableResult(